| """ | |
| GPT-5 medical answer generator for IP-Assist-Lite | |
| Canonical wrapper supporting both Responses API and Chat Completions | |
| - JSON-safe serialization via model_dump() | |
| - Tool forcing support for both APIs | |
| - Reasoning effort control | |
| """ | |
| from __future__ import annotations | |
| from typing import Any, Dict, List, Optional, Union | |
| import os | |
| import logging | |
| from dotenv import load_dotenv | |
| from openai import OpenAI | |
| import openai | |
| # Configure logging | |
| log_level = os.getenv("LOG_LEVEL", "INFO").upper() | |
| logging.basicConfig( | |
| level=getattr(logging, log_level, logging.INFO), | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
| ) | |
| logger = logging.getLogger(__name__) | |
| # Load environment variables from .env file | |
| load_dotenv() | |
| # Configuration from environment | |
| USE_RESPONSES_API = os.getenv("USE_RESPONSES_API", "1").strip() not in {"0","false","False"} | |
| REASONING_EFFORT = os.getenv("REASONING_EFFORT", "").strip() or None # e.g., "medium" | |
| # Allowed GPT‑5 model family | |
| ALLOWED_GPT5_MODELS = {"gpt-5", "gpt-5-mini", "gpt-5-nano"} | |
| ENABLE_GPT4_FALLBACK = os.getenv("ENABLE_GPT4_FALLBACK", "true").strip().lower() == "true" | |
| FALLBACK_MODEL = os.getenv("FALLBACK_MODEL", "gpt-4o-mini").strip() or "gpt-4o-mini" | |
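# Example .env (illustrative values; every variable here is optional except the
# API key — variable names match the os.getenv calls in this module):
#   OPENAI_API_KEY=sk-...
#   IP_GPT5_MODEL=gpt-5-mini
#   USE_RESPONSES_API=1
#   REASONING_EFFORT=medium
#   ENABLE_GPT4_FALLBACK=true
#   FALLBACK_MODEL=gpt-4o-mini
#   LOG_LEVEL=INFO
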
class GPT5Medical:
    def _compose_instructions(self, messages: List[Dict[str, str]]) -> str:
        """Compose a single instruction string from any system messages, with a sensible default."""
        sys_parts = [m.get("content", "") for m in messages if m.get("role") == "system"]
        base = (
            "You are a careful, thorough interventional pulmonology assistant. "
            "Use only the provided context when present; cite succinctly. "
            "Be specific on doses, contraindications, and steps when applicable."
        )
        if sys_parts:
            return "\n".join(sys_parts + [base]).strip()
        return base
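    # Example (illustrative): given
    #   [{"role": "system", "content": "Answer in bullet points."}, {"role": "user", ...}]
    # the composed instructions are "Answer in bullet points.\n<base guidance>":
    # caller-supplied system prompts precede, but do not replace, the default.
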
    def _extract_text(self, resp) -> Optional[str]:
        """SDK-agnostic text extractor for Responses API payloads."""
        # Debug: log the shape of the response object
        logger.debug(f"Response type: {type(resp)}")
        logger.debug(f"Response attributes: {dir(resp)}")
        # 1) Preferred shortcut (SDK helper)
        text = getattr(resp, "output_text", None)
        logger.debug(f"output_text attribute value: {text}")
        if isinstance(text, str) and text:
            logger.debug(f"✅ Found output_text attribute: {len(text)} chars")
            return text
        # 1b) Direct text attribute (seen on some GPT-5 responses)
        text = getattr(resp, "text", None)
        if isinstance(text, str) and text:
            logger.debug(f"✅ Found text attribute: {len(text)} chars")
            return text
        # 2) Robust parsing across possible payload shapes
        try:
            raw = resp.model_dump() if hasattr(resp, "model_dump") else resp.__dict__
            logger.debug(f"Response structure keys: {list(raw.keys()) if isinstance(raw, dict) else 'not a dict'}")
            # 2a) Flat 'text' field at the top level (some GPT-5 responses)
            if isinstance(raw, dict) and isinstance(raw.get("text"), str) and raw["text"]:
                text = raw["text"]
                logger.debug(f"✅ Found text field at top level: {len(text)} chars")
                return text
            # 2b) Newer Responses API: 'output' is a list of items; message items carry content blocks
            if isinstance(raw, dict) and isinstance(raw.get("output"), list):
                logger.debug(f"Found output list with {len(raw['output'])} items")
                collected = []
                for i, item in enumerate(raw["output"]):
                    itype = item.get("type")
                    logger.debug(f"  Item {i}: type={itype}")
                    # Reasoning items (GPT-5 specific): skip; we want the final output, not the trace
                    if itype == "reasoning":
                        reasoning_text = item.get("text", "")
                        if reasoning_text:
                            logger.debug(f"  Found reasoning text: {len(reasoning_text)} chars")
                        continue
                    # Direct output_text / text items
                    if itype in ("output_text", "text") and isinstance(item.get("text"), str):
                        collected.append(item["text"])
                        continue
                    # Message items with content blocks
                    if itype == "message":
                        content = item.get("content", []) or []
                        logger.debug(f"  Message has {len(content)} content blocks")
                        for block in content:
                            btype = block.get("type")
                            # Blocks may be 'output_text' or plain 'text'; prefer the text payload
                            if btype in ("output_text", "text") and isinstance(block.get("text"), str):
                                collected.append(block["text"])
                if collected:
                    result = "\n".join(t for t in collected if t)
                    logger.debug(f"✅ Extracted {len(result)} chars from output items")
                    return result
            # 2c) Chat Completions shape: choices[0].message.content
            if isinstance(raw, dict):
                logger.debug(f"Raw response dict keys: {list(raw.keys())}")
                if "choices" in raw and isinstance(raw["choices"], list) and raw["choices"]:
                    choice = raw["choices"][0]
                    if isinstance(choice, dict) and "message" in choice:
                        msg = choice["message"]
                        if isinstance(msg, dict) and isinstance(msg.get("content"), str):
                            content = msg["content"]
                            logger.debug(f"✅ Found text in choices[0].message.content: {len(content)} chars")
                            return content
        except Exception as e:
            logger.error(f"❌ Error extracting text: {e}")
        logger.warning("❌ No text extracted from response")
        return None
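    # For reference, the payload shapes handled above (illustrative, not
    # exhaustive; actual fields vary by SDK version):
    #   {"output_text": "..."}                              # SDK helper attribute
    #   {"text": "..."}                                     # flat top-level text
    #   {"output": [{"type": "message",
    #                "content": [{"type": "output_text", "text": "..."}]}]}
    #   {"choices": [{"message": {"content": "..."}}]}      # Chat Completions
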
    def __init__(self,
                 model: Optional[str] = None,
                 use_responses: Optional[bool] = None,
                 max_out: int = 4000,  # Increased to 4000 for comprehensive responses
                 reasoning_effort: Optional[str] = None):  # "low" | "medium" | "high"
        self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        # Normalize to the GPT-5 family if an alias like "gpt-5-turbo" is provided
        raw_model = (model or os.getenv("IP_GPT5_MODEL") or os.getenv("GPT5_MODEL") or "gpt-5-mini").strip()
        self.model = self._coerce_gpt5_model(raw_model)
        self.use_responses = use_responses if use_responses is not None else USE_RESPONSES_API
        self.max_out = max_out
        self.reasoning_effort = reasoning_effort or REASONING_EFFORT
        # Trace fields for UI/telemetry
        self.last_used_model: Optional[str] = None
        self.last_warning_banner: Optional[str] = None
    def _coerce_gpt5_model(self, name: str) -> str:
        """Map arbitrary names into the supported GPT-5 family.

        - Exact matches allowed: gpt-5, gpt-5-mini, gpt-5-nano
        - Unknown gpt-5 variants (e.g., gpt-5-turbo) -> gpt-5
        - Non-gpt-5 names are left as-is (fallback logic handles access errors)
        """
        if name in ALLOWED_GPT5_MODELS:
            return name
        if name.startswith("gpt-5"):
            # Coerce any unrecognized variant in the gpt-5 family to the base model
            return "gpt-5"
        return name
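    # Illustrative coercions (non-gpt-5 names are validated later by the API):
    #   "gpt-5-mini"  -> "gpt-5-mini"   (exact match, kept)
    #   "gpt-5-turbo" -> "gpt-5"        (unknown gpt-5 variant, coerced)
    #   "gpt-4o-mini" -> "gpt-4o-mini"  (non-gpt-5, passed through)
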
    def _normalize_messages_for_responses(self, messages: List[Dict[str, str]]):
        """Map Chat-style messages -> Responses input format (role + string content).

        The Responses API accepts a list of {role, content} where content is a string.
        """
        return [{"role": msg.get("role", "user"), "content": msg.get("content", "")} for msg in messages]
    def complete(self,
                 messages: List[Dict[str, str]],
                 tools: Optional[List[Dict[str, Any]]] = None,
                 tool_choice: Optional[Union[str, Dict[str, Any]]] = None,
                 temperature: Optional[float] = None) -> Dict[str, Any]:
        """Return a plain dict with "text", "tool_calls", "raw", and "used_model" keys.

        - Uses the Responses API by default; falls back to Chat Completions with the correct params.
        - Always returns JSON-serializable structures.
        """
        # Reset trace fields
        self.last_used_model = None
        self.last_warning_banner = None

        # Helper: extract tool calls from Responses API output
        def _extract_tool_calls_from_responses(resp_obj):
            tool_calls = []
            for item in getattr(resp_obj, "output", []) or []:
                if getattr(item, "type", None) == "tool_call":
                    tool_calls.append({
                        "name": getattr(item, "tool_name", ""),
                        "arguments": getattr(item, "arguments", ""),
                    })
            return tool_calls
        # Primary call: Responses API or Chat Completions for the requested model
        try:
            if self.use_responses:
                logger.info(f"🔵 Using Responses API for model: {self.model}")
                kwargs = {
                    "model": self.model,
                    # Extract any system messages into instructions; provide default guidance too
                    "instructions": self._compose_instructions(messages),
                    "input": self._normalize_messages_for_responses([m for m in messages if m.get("role") != "system"]),
                    "max_output_tokens": self.max_out,
                }
                # Only add reasoning effort for actual GPT-5 models
                if self.reasoning_effort and "gpt-5" in self.model:
                    kwargs["reasoning"] = {"effort": self.reasoning_effort}
                # Avoid passing the non-portable 'verbosity' param; SDKs may not support it yet
                if tools:
                    kwargs["tools"] = tools
                    kwargs["tool_choice"] = tool_choice or "auto"
                # Note: the GPT-5 Responses API doesn't support a temperature parameter;
                # sampling is controlled by the model's internal reasoning.
                # Log the actual input being sent
                logger.debug(f"Request model: {kwargs.get('model')}")
                logger.debug(f"Request instructions length: {len(kwargs.get('instructions', ''))}")
                logger.debug(f"Request input messages: {len(kwargs.get('input', []))} messages")
                for i, msg in enumerate(kwargs.get("input", [])[:2]):  # Log the first two messages
                    logger.debug(f"  Message {i}: role={msg.get('role')}, content_length={len(msg.get('content', ''))}")
                resp = self.client.responses.create(**kwargs)
                logger.debug(f"Response type: {type(resp)}")
                text = self._extract_text(resp)
                logger.info(f"📝 Extracted text length: {len(text) if text else 0} chars")
                # Flag suspiciously short responses
                if text and len(text) < 10:
                    logger.warning(f"⚠️ Very short response from GPT-5: '{text}'")
                tool_calls = _extract_tool_calls_from_responses(resp)
                from src.utils.serialization import to_jsonable
                self.last_used_model = getattr(resp, "model", self.model)
                # If Responses returned no text and no tool calls, try a Chat fallback (same model).
                # Ensure text is a string before calling strip().
                if not (isinstance(text, str) and text.strip()) and not tool_calls:
                    logger.warning("⚠️ Responses API returned empty text. Trying Chat Completions fallback...")
                    try:
                        chat_kwargs = {
                            "model": self.model,
                            "messages": messages,
                        }
                        if self.model.startswith("gpt-5"):
                            chat_kwargs["max_completion_tokens"] = self.max_out
                        else:
                            chat_kwargs["max_tokens"] = self.max_out
                        if tools:
                            chat_kwargs["tools"] = tools
                            chat_kwargs["tool_choice"] = tool_choice or "auto"
                        if temperature is not None:
                            chat_kwargs["temperature"] = temperature
                        logger.info(f"🔄 Trying Chat Completions with model: {self.model}")
                        chat_resp = self.client.chat.completions.create(**chat_kwargs)
                        msg = chat_resp.choices[0].message if chat_resp.choices else None
                        text = msg.content if msg else ""
                        if text:
                            logger.info(f"✅ Chat Completions returned {len(text)} chars")
                        else:
                            logger.warning("❌ Chat Completions also returned empty")
                        tool_calls = []
                        if msg and getattr(msg, "tool_calls", None):
                            for tc in msg.tool_calls:
                                tool_calls.append({
                                    "name": getattr(tc.function, "name", "") if hasattr(tc, "function") else "",
                                    "arguments": getattr(tc.function, "arguments", "") if hasattr(tc, "function") else "",
                                })
                        self.last_used_model = getattr(chat_resp, "model", self.model)
                        self.last_warning_banner = "Fetched content via Chat after empty Responses output."
                        return {
                            "text": text,
                            "tool_calls": tool_calls or None,
                            "raw": to_jsonable(chat_resp),
                            "used_model": self.last_used_model,
                        }
                    except Exception as e:
                        # Fall back to returning the original Responses object (even if empty)
                        logger.error(f"❌ Chat fallback failed: {e}")
                return {
                    "text": text,
                    "tool_calls": tool_calls or None,
                    "raw": to_jsonable(resp),
                    "used_model": self.last_used_model,
                }
            else:
                # Chat Completions path
                is_o1_model = self.model.startswith("o1")
                kwargs = {
                    "model": self.model,
                    "messages": messages,
                }
                # Use the correct token cap per model family
                if self.model.startswith("gpt-5"):
                    kwargs["max_completion_tokens"] = self.max_out
                else:
                    kwargs["max_tokens"] = self.max_out
                if not is_o1_model:
                    if tools:
                        kwargs["tools"] = tools
                        kwargs["tool_choice"] = tool_choice or "auto"
                    if temperature is not None:
                        kwargs["temperature"] = temperature
                else:
                    # o1 models reject system messages; rewrite them as tagged user messages
                    filtered_messages = []
                    for msg in messages:
                        if msg.get("role") == "system":
                            filtered_messages.append({"role": "user", "content": f"[System]: {msg['content']}"})
                        else:
                            filtered_messages.append(msg)
                    kwargs["messages"] = filtered_messages
                resp = self.client.chat.completions.create(**kwargs)
                msg = resp.choices[0].message if resp.choices else None
                text = msg.content if msg else ""
                tool_calls = []
                if msg and getattr(msg, "tool_calls", None):
                    for tc in msg.tool_calls:
                        tool_calls.append({
                            "name": getattr(tc.function, "name", "") if hasattr(tc, "function") else "",
                            "arguments": getattr(tc.function, "arguments", "") if hasattr(tc, "function") else "",
                        })
                from src.utils.serialization import to_jsonable
                self.last_used_model = getattr(resp, "model", self.model)
                return {
                    "text": text,
                    "tool_calls": tool_calls or None,
                    "raw": to_jsonable(resp),
                    "used_model": self.last_used_model,
                }
        except (openai.NotFoundError, openai.PermissionDeniedError) as e:
            logger.warning(f"⚠️ Model {self.model} not accessible: {e}")
            # Try falling back to a GPT-4 class model
            if ENABLE_GPT4_FALLBACK and self.model.startswith("gpt-5"):
                logger.info(f"🔄 Falling back to {FALLBACK_MODEL}")
                try:
                    fallback_kwargs = {
                        "model": FALLBACK_MODEL,
                        "messages": messages,
                    }
                    # Use the correct token cap per model family
                    if FALLBACK_MODEL.startswith("gpt-5"):
                        fallback_kwargs["max_completion_tokens"] = self.max_out
                    else:
                        fallback_kwargs["max_tokens"] = self.max_out
                    if temperature is not None:
                        fallback_kwargs["temperature"] = temperature
                    if tools:
                        fallback_kwargs["tools"] = tools
                        fallback_kwargs["tool_choice"] = tool_choice or "auto"
                    fallback_resp = self.client.chat.completions.create(**fallback_kwargs)
                    msg = fallback_resp.choices[0].message if fallback_resp.choices else None
                    # Guard against None content (e.g., tool-call-only replies)
                    text = (msg.content or "") if msg else ""
                    logger.info(f"✅ Fallback to {FALLBACK_MODEL} succeeded: {len(text)} chars")
                    tool_calls = []
                    if msg and getattr(msg, "tool_calls", None):
                        for tc in msg.tool_calls:
                            tool_calls.append({
                                "name": getattr(tc.function, "name", "") if hasattr(tc, "function") else "",
                                "arguments": getattr(tc.function, "arguments", "") if hasattr(tc, "function") else "",
                            })
                    from src.utils.serialization import to_jsonable
                    self.last_used_model = FALLBACK_MODEL
                    self.last_warning_banner = f"Using fallback model {FALLBACK_MODEL} (GPT-5 not accessible)"
                    return {
                        "text": text,
                        "tool_calls": tool_calls or None,
                        "raw": to_jsonable(fallback_resp),
                        "used_model": self.last_used_model,
                    }
                except Exception as e2:
                    logger.error(f"❌ Fallback also failed: {e2}")
                    raise e
            else:
                raise e
        except openai.AuthenticationError as e:
            logger.error(f"❌ Authentication error: {e}")
            # Hard failure: do not silently downgrade
            self.last_warning_banner = (
                "Authentication failed for the configured OPENAI_API_KEY. "
                "Verify the key and its project access in the OpenAI dashboard."
            )
            raise
        except (openai.RateLimitError, openai.APIConnectionError, openai.APIStatusError, TimeoutError) as e:
            # Transient errors: optionally fall back
            if ENABLE_GPT4_FALLBACK and FALLBACK_MODEL:
                try:
                    # Retry the primary path with the fallback model
                    if self.use_responses:
                        kwargs = {
                            "model": FALLBACK_MODEL,
                            "instructions": self._compose_instructions(messages),
                            "input": self._normalize_messages_for_responses([m for m in messages if m.get("role") != "system"]),
                            "max_output_tokens": self.max_out,
                        }
                        # Mirror the primary path: only pass reasoning effort to GPT-5 models
                        if self.reasoning_effort and FALLBACK_MODEL.startswith("gpt-5"):
                            kwargs["reasoning"] = {"effort": self.reasoning_effort}
                        if tools:
                            kwargs["tools"] = tools
                            kwargs["tool_choice"] = tool_choice or "auto"
                        kwargs["temperature"] = 0.2 if temperature is None else temperature
                        resp = self.client.responses.create(**kwargs)
                        text = self._extract_text(resp)
                        tool_calls = _extract_tool_calls_from_responses(resp)
                        from src.utils.serialization import to_jsonable
                        self.last_used_model = getattr(resp, "model", FALLBACK_MODEL)
                        self.last_warning_banner = (
                            f"Fell back to {FALLBACK_MODEL} due to transient error calling '{self.model}': {e.__class__.__name__}."
                        )
                        return {
                            "text": text,
                            "tool_calls": tool_calls or None,
                            "raw": to_jsonable(resp),
                            "used_model": self.last_used_model,
                        }
                    else:
                        is_o1_model = FALLBACK_MODEL.startswith("o1")
                        kwargs = {
                            "model": FALLBACK_MODEL,
                            "messages": messages,
                        }
                        if FALLBACK_MODEL.startswith("gpt-5"):
                            kwargs["max_completion_tokens"] = self.max_out
                        else:
                            kwargs["max_tokens"] = self.max_out
                        if not is_o1_model:
                            if tools:
                                kwargs["tools"] = tools
                                kwargs["tool_choice"] = tool_choice or "auto"
                            if temperature is not None:
                                kwargs["temperature"] = temperature
                        resp = self.client.chat.completions.create(**kwargs)
                        msg = resp.choices[0].message if resp.choices else None
                        text = (msg.content or "") if msg else ""
                        tool_calls = []
                        if msg and getattr(msg, "tool_calls", None):
                            for tc in msg.tool_calls:
                                tool_calls.append({
                                    "name": getattr(tc.function, "name", "") if hasattr(tc, "function") else "",
                                    "arguments": getattr(tc.function, "arguments", "") if hasattr(tc, "function") else "",
                                })
                        from src.utils.serialization import to_jsonable
                        self.last_used_model = getattr(resp, "model", FALLBACK_MODEL)
                        self.last_warning_banner = (
                            f"Fell back to {FALLBACK_MODEL} due to transient error calling '{self.model}': {e.__class__.__name__}."
                        )
                        return {
                            "text": text,
                            "tool_calls": tool_calls or None,
                            "raw": to_jsonable(resp),
                            "used_model": self.last_used_model,
                        }
                except Exception:
                    # Fall through to the generic re-raise below
                    pass
            # If fallback is disabled or also failed, re-raise the original error
            raise
        except Exception:
            # Unknown error: bubble up
            raise
    # Backward compatibility with the old interface
    def generate(self,
                 system: str,
                 user: str,
                 tools: Optional[List[Dict]] = None,
                 tool_choice: Optional[Dict] = None) -> Dict:
        """Legacy interface for backward compatibility."""
        messages = [
            {"role": "system", "content": system.strip()},
            {"role": "user", "content": user.strip()},
        ]
        result = self.complete(messages, tools, tool_choice)
        # Map to the old format
        raw = result.get("raw") or {}
        return {
            "text": result["text"],
            "tool_calls": result["tool_calls"],
            "usage": raw.get("usage", {}) if isinstance(raw, dict) else {},
        }
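    # Example (illustrative):
    #   gen = GPT5Medical()
    #   out = gen.generate(system="You are concise.", user="Define BAL in one sentence.")
    #   out["text"], out["usage"]  # answer text and token usage (when reported)
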
    def generate_response(self, prompt: str, messages: Optional[List[Dict]] = None) -> str:
        """Simple text generation for orchestrator use."""
        # Copy so we don't mutate the caller's message list
        msgs = list(messages) if messages else []
        msgs.append({"role": "user", "content": prompt})
        result = self.complete(msgs)
        # Track the used model for UI consumption
        self.last_used_model = result.get("used_model", self.model)
        return result["text"] or "Unable to generate response."
# Backward compatibility alias
GPT5MedicalGenerator = GPT5Medical

__all__ = [
    "GPT5Medical",
    "GPT5MedicalGenerator",  # Backward compatibility alias
]
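
# Minimal smoke test: an illustrative sketch, assuming OPENAI_API_KEY is set in
# the environment (or .env) and src.utils.serialization is importable from the
# working directory. Not part of the library API.
if __name__ == "__main__":
    gen = GPT5Medical()
    answer = gen.generate_response("List the absolute contraindications to flexible bronchoscopy.")
    print(f"[{gen.last_used_model}] {answer[:500]}")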