""" GPT-5 medical answer generator for IP-Assist-Lite Canonical wrapper supporting both Responses API and Chat Completions - JSON-safe serialization via model_dump() - Tool forcing support for both APIs - Reasoning effort control """ from __future__ import annotations from typing import Any, Dict, List, Optional, Union import os import logging from dotenv import load_dotenv from openai import OpenAI import openai # Configure logging log_level = os.getenv("LOG_LEVEL", "INFO").upper() logging.basicConfig( level=getattr(logging, log_level, logging.INFO), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Load environment variables from .env file load_dotenv() # Configuration from environment USE_RESPONSES_API = os.getenv("USE_RESPONSES_API", "1").strip() not in {"0","false","False"} REASONING_EFFORT = os.getenv("REASONING_EFFORT", "").strip() or None # e.g., "medium" # Allowed GPT‑5 model family ALLOWED_GPT5_MODELS = {"gpt-5", "gpt-5-mini", "gpt-5-nano"} ENABLE_GPT4_FALLBACK = os.getenv("ENABLE_GPT4_FALLBACK", "true").strip().lower() == "true" FALLBACK_MODEL = os.getenv("FALLBACK_MODEL", "gpt-4o-mini").strip() or "gpt-4o-mini" class GPT5Medical: def _compose_instructions(self, messages: List[Dict[str, str]]) -> str: """Compose a single instruction string from any system messages with a sensible default.""" sys_parts = [m.get("content", "") for m in messages if m.get("role") == "system"] base = ( "You are a careful, thorough interventional pulmonology assistant. " "Use only the provided context when present; cite succinctly. Be specific on doses, contraindications, and steps when applicable." ) if sys_parts: return "\n".join(sys_parts + [base]).strip() return base def _extract_text(self, resp) -> Optional[str]: """SDK-agnostic text extractor for Responses API.""" # Debug: Log all attributes of the response logger.debug(f"Response type: {type(resp)}") logger.debug(f"Response attributes: {dir(resp)}") # 1) Preferred shortcut (SDK helper) text = getattr(resp, "output_text", None) logger.debug(f"output_text attribute value: {text}") if text and isinstance(text, str): logger.debug(f"✅ Found output_text attribute: {len(text)} chars") return text # 1b) Check for direct text attribute (GPT-5 specific) text = getattr(resp, "text", None) if text and isinstance(text, str): logger.debug(f"✅ Found text attribute: {len(text)} chars") return text # 2) Robust parsing across possible shapes try: raw = resp.model_dump() if hasattr(resp, "model_dump") else resp.__dict__ logger.debug(f"Response structure keys: {list(raw.keys()) if isinstance(raw, dict) else 'not a dict'}") # 2a) Check for text field at top level (GPT-5 responses) if isinstance(raw, dict) and "text" in raw and raw["text"]: text = raw["text"] logger.debug(f"✅ Found text field at top level: {len(text)} chars") return text # Newer Responses API: output is a list of items; message items have content blocks if isinstance(raw, dict) and isinstance(raw.get("output"), list): logger.debug(f"Found output list with {len(raw.get('output', []))} items") collected = [] for i, item in enumerate(raw.get("output", [])): itype = item.get("type") logger.debug(f" Item {i}: type={itype}") # Handle reasoning type (GPT-5 specific) if itype == "reasoning": # Check if there's text in the reasoning reasoning_text = item.get("text", "") if reasoning_text: logger.debug(f" Found reasoning text: {len(reasoning_text)} chars") # For now, skip reasoning - we want the actual output continue # Direct output_text item if itype == 
"output_text" and isinstance(item.get("text"), str): collected.append(item.get("text", "")) continue # Direct text item if itype == "text" and isinstance(item.get("text"), str): collected.append(item.get("text", "")) continue # Message with content blocks if itype == "message": content = item.get("content", []) or [] logger.debug(f" Message has {len(content)} content blocks") for block in content: btype = block.get("type") # Blocks may be 'output_text' or other types; prefer text payload if btype in ("output_text", "text") and isinstance(block.get("text"), str): collected.append(block.get("text", "")) if collected: result = "\n".join([t for t in collected if t]) logger.debug(f"✅ Extracted {len(result)} chars from output items") return result # Fallbacks: sometimes SDK may return a flat 'text' at top level if isinstance(raw, dict) and isinstance(raw.get("text"), str): text = raw.get("text") logger.debug(f"✅ Found text at top level: {len(text)} chars") return text # Additional fallback: if raw is a dict with data, try to extract it if isinstance(raw, dict): # Log what we actually got for debugging logger.debug(f"Raw response dict keys: {list(raw.keys())}") # Check for common response patterns if "choices" in raw and isinstance(raw["choices"], list) and raw["choices"]: # Chat completions format choice = raw["choices"][0] if isinstance(choice, dict) and "message" in choice: msg = choice["message"] if isinstance(msg, dict) and "content" in msg: content = msg["content"] if isinstance(content, str): logger.debug(f"✅ Found text in choices[0].message.content: {len(content)} chars") return content except Exception as e: logger.error(f"❌ Error extracting text: {e}") logger.warning("❌ No text extracted from response") return None def __init__(self, model: Optional[str] = None, use_responses: Optional[bool] = None, max_out: int = 4000, # Increased to 4000 for comprehensive responses reasoning_effort: Optional[str] = None): # "low"|"medium"|"high" self.client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) # Normalize to GPT‑5 family if an alias like "gpt-5-turbo" is provided raw_model = (model or os.getenv("IP_GPT5_MODEL") or os.getenv("GPT5_MODEL") or "gpt-5-mini").strip() self.model = self._coerce_gpt5_model(raw_model) self.use_responses = use_responses if use_responses is not None else USE_RESPONSES_API self.max_out = max_out self.reasoning_effort = reasoning_effort or REASONING_EFFORT # Trace fields for UI/telemetry self.last_used_model: Optional[str] = None self.last_warning_banner: Optional[str] = None def _coerce_gpt5_model(self, name: str) -> str: """Map arbitrary names into the supported GPT‑5 family. - Exact matches allowed: gpt-5, gpt-5-mini, gpt-5-nano - Unknown gpt‑5 variants (e.g., gpt-5-turbo) → gpt-5 - Non gpt‑5 names left as-is (fallback logic will handle access errors) """ if name in ALLOWED_GPT5_MODELS: return name if name.startswith("gpt-5"): # Coerce any unrecognized variant in the gpt‑5 family to the base model return "gpt-5" return name def _normalize_messages_for_responses(self, messages: List[Dict[str, str]]): """Map Chat-style messages -> Responses input format (role + string content). The Responses API accepts a list of {role, content} where content is a string. 
""" return [{"role": msg.get("role", "user"), "content": msg.get("content", "")} for msg in messages] def complete(self, messages: List[Dict[str, str]], tools: Optional[List[Dict[str, Any]]] = None, tool_choice: Optional[Union[str, Dict[str, Any]]] = None, temperature: Optional[float] = None) -> Dict[str, Any]: """Return a plain dict with `.text`, `.tool_calls`, `.raw` fields. - Uses Responses API by default; falls back to Chat Completions with correct params. - Always returns JSON-serializable structures. """ # Reset trace fields self.last_used_model = None self.last_warning_banner = None # Helper: extract tool calls from Responses API output def _extract_tool_calls_from_responses(resp_obj): tool_calls = [] for item in getattr(resp_obj, "output", []) or []: if getattr(item, "type", None) == "tool_call": tool_calls.append({ "name": getattr(item, "tool_name", ""), "arguments": getattr(item, "arguments", ""), }) return tool_calls # Primary call: Responses API or Chat Completions for the requested model try: if self.use_responses: logger.info(f"🔵 Using Responses API for model: {self.model}") kwargs = { "model": self.model, # Extract any system messages into instructions; provide default guidance too "instructions": self._compose_instructions(messages), "input": self._normalize_messages_for_responses([m for m in messages if m.get("role") != "system"]), "max_output_tokens": self.max_out, } # Only add reasoning effort for actual GPT-5 models if self.reasoning_effort and "gpt-5" in self.model: kwargs["reasoning"] = {"effort": self.reasoning_effort} # Avoid passing non-portable 'verbosity' param; SDKs may not support it yet if tools: kwargs["tools"] = tools kwargs["tool_choice"] = tool_choice or "auto" # Note: GPT-5 Responses API doesn't support temperature parameter # Temperature is controlled by the model's internal reasoning # Log the actual input being sent logger.debug(f"Request model: {kwargs.get('model')}") logger.debug(f"Request instructions length: {len(kwargs.get('instructions', ''))}") logger.debug(f"Request input messages: {len(kwargs.get('input', []))} messages") if kwargs.get('input'): for i, msg in enumerate(kwargs.get('input', [])[:2]): # Log first 2 messages logger.debug(f" Message {i}: role={msg.get('role')}, content_length={len(msg.get('content', ''))}") resp = self.client.responses.create(**kwargs) logger.debug(f"Response type: {type(resp)}") text = self._extract_text(resp) logger.info(f"📝 Extracted text length: {len(text) if text else 0} chars") # Log the actual text if it's suspiciously short if text and len(text) < 10: logger.warning(f"⚠️ Very short response from GPT-5: '{text}'") tool_calls = _extract_tool_calls_from_responses(resp) from src.utils.serialization import to_jsonable self.last_used_model = getattr(resp, "model", self.model) # If Responses returned no text and no tool calls, try a Chat fallback (same model) # Ensure text is a string before calling strip() if not (text and isinstance(text, str) and text.strip()) and not tool_calls: logger.warning(f"⚠️ Responses API returned empty text. 

    def _normalize_messages_for_responses(self, messages: List[Dict[str, str]]):
        """Map Chat-style messages -> Responses input format (role + string content).

        The Responses API accepts a list of {role, content} where content is a string.
        """
        return [{"role": msg.get("role", "user"), "content": msg.get("content", "")}
                for msg in messages]

    def complete(self,
                 messages: List[Dict[str, str]],
                 tools: Optional[List[Dict[str, Any]]] = None,
                 tool_choice: Optional[Union[str, Dict[str, Any]]] = None,
                 temperature: Optional[float] = None) -> Dict[str, Any]:
        """Return a plain dict with `text`, `tool_calls`, and `raw` keys.

        - Uses the Responses API by default; falls back to Chat Completions with the correct params.
        - Always returns JSON-serializable structures.
        """
        # Reset trace fields
        self.last_used_model = None
        self.last_warning_banner = None

        # Helper: extract tool calls from Responses API output
        def _extract_tool_calls_from_responses(resp_obj):
            tool_calls = []
            for item in getattr(resp_obj, "output", []) or []:
                if getattr(item, "type", None) == "tool_call":
                    tool_calls.append({
                        "name": getattr(item, "tool_name", ""),
                        "arguments": getattr(item, "arguments", ""),
                    })
            return tool_calls

        # Primary call: Responses API or Chat Completions for the requested model
        try:
            if self.use_responses:
                logger.info(f"🔵 Using Responses API for model: {self.model}")
                kwargs = {
                    "model": self.model,
                    # Extract any system messages into instructions; provide default guidance too
                    "instructions": self._compose_instructions(messages),
                    "input": self._normalize_messages_for_responses(
                        [m for m in messages if m.get("role") != "system"]
                    ),
                    "max_output_tokens": self.max_out,
                }
                # Only add reasoning effort for actual GPT-5 models
                if self.reasoning_effort and "gpt-5" in self.model:
                    kwargs["reasoning"] = {"effort": self.reasoning_effort}
                # Avoid passing the non-portable 'verbosity' param; SDKs may not support it yet
                if tools:
                    kwargs["tools"] = tools
                    kwargs["tool_choice"] = tool_choice or "auto"
                # Note: the GPT-5 Responses API doesn't support the temperature parameter;
                # temperature is controlled by the model's internal reasoning

                # Log the actual input being sent
                logger.debug(f"Request model: {kwargs.get('model')}")
                logger.debug(f"Request instructions length: {len(kwargs.get('instructions', ''))}")
                logger.debug(f"Request input messages: {len(kwargs.get('input', []))} messages")
                if kwargs.get("input"):
                    for i, msg in enumerate(kwargs.get("input", [])[:2]):  # Log first 2 messages
                        logger.debug(f"  Message {i}: role={msg.get('role')}, content_length={len(msg.get('content', ''))}")

                resp = self.client.responses.create(**kwargs)
                logger.debug(f"Response type: {type(resp)}")

                text = self._extract_text(resp)
                logger.info(f"📝 Extracted text length: {len(text) if text else 0} chars")

                # Log the actual text if it's suspiciously short
                if text and len(text) < 10:
                    logger.warning(f"⚠️ Very short response from GPT-5: '{text}'")

                tool_calls = _extract_tool_calls_from_responses(resp)
                from src.utils.serialization import to_jsonable
                self.last_used_model = getattr(resp, "model", self.model)

                # If Responses returned no text and no tool calls, try a Chat fallback (same model).
                # Ensure text is a string before calling strip()
                if not (text and isinstance(text, str) and text.strip()) and not tool_calls:
                    logger.warning("⚠️ Responses API returned empty text. Trying Chat Completions fallback...")
                    try:
                        chat_kwargs = {
                            "model": self.model,
                            "messages": messages,
                        }
                        if self.model and (self.model.startswith("gpt-5") or self.model in ALLOWED_GPT5_MODELS):
                            chat_kwargs["max_completion_tokens"] = self.max_out
                        else:
                            chat_kwargs["max_tokens"] = self.max_out
                        if tools:
                            chat_kwargs["tools"] = tools
                            chat_kwargs["tool_choice"] = tool_choice or "auto"
                        if temperature is not None:
                            chat_kwargs["temperature"] = temperature
                        logger.info(f"🔄 Trying Chat Completions with model: {self.model}")
                        chat_resp = self.client.chat.completions.create(**chat_kwargs)
                        msg = chat_resp.choices[0].message if chat_resp.choices else None
                        text = msg.content if msg else ""
                        if text:
                            logger.info(f"✅ Chat Completions returned {len(text)} chars")
                        else:
                            logger.warning("❌ Chat Completions also returned empty")
                        tool_calls = []
                        if msg and getattr(msg, "tool_calls", None):
                            for tc in msg.tool_calls:
                                tool_calls.append({
                                    "name": getattr(tc.function, "name", "") if hasattr(tc, "function") else "",
                                    "arguments": getattr(tc.function, "arguments", "") if hasattr(tc, "function") else "",
                                })
                        self.last_used_model = getattr(chat_resp, "model", self.model)
                        self.last_warning_banner = "Fetched content via Chat after empty Responses output."
                        return {
                            "text": text,
                            "tool_calls": tool_calls or None,
                            "raw": to_jsonable(chat_resp),
                            "used_model": self.last_used_model,
                        }
                    except Exception as e:
                        logger.error(f"❌ Chat fallback failed: {e}")
                        # Fall back to returning the original Responses object (even if empty)
                        pass

                return {
                    "text": text,
                    "tool_calls": tool_calls or None,
                    "raw": to_jsonable(resp),
                    "used_model": self.last_used_model,
                }
tool_choice or "auto" fallback_resp = self.client.chat.completions.create(**fallback_kwargs) msg = fallback_resp.choices[0].message if fallback_resp.choices else None text = msg.content if msg else "" logger.info(f"✅ Fallback to {FALLBACK_MODEL} succeeded: {len(text)} chars") tool_calls = [] if msg and getattr(msg, "tool_calls", None): for tc in msg.tool_calls: tool_calls.append({ "name": getattr(tc.function, "name", "") if hasattr(tc, "function") else "", "arguments": getattr(tc.function, "arguments", "") if hasattr(tc, "function") else "", }) from src.utils.serialization import to_jsonable self.last_used_model = FALLBACK_MODEL self.last_warning_banner = f"Using fallback model {FALLBACK_MODEL} (GPT-5 not accessible)" return { "text": text, "tool_calls": tool_calls or None, "raw": to_jsonable(fallback_resp), "used_model": self.last_used_model, } except Exception as e2: logger.error(f"❌ Fallback also failed: {e2}") raise e else: raise e except openai.AuthenticationError as e: logger.error(f"❌ Authentication error: {e}") # Hard failures: do not silently downgrade self.last_warning_banner = ( f"Selected model '{self.model}' is unavailable for this API key/project. " "Verify model access in the OpenAI dashboard or choose a different model." ) raise except (openai.RateLimitError, openai.APIConnectionError, openai.APIStatusError, TimeoutError) as e: # Transient errors: optionally fallback if ENABLE_GPT4_FALLBACK and FALLBACK_MODEL: try: # Retry primary path with fallback model if self.use_responses: kwargs = { "model": FALLBACK_MODEL, "instructions": self._compose_instructions(messages), "input": self._normalize_messages_for_responses([m for m in messages if m.get("role") != "system"]), "max_output_tokens": self.max_out, } if self.reasoning_effort: kwargs["reasoning"] = {"effort": self.reasoning_effort} if tools: kwargs["tools"] = tools kwargs["tool_choice"] = tool_choice or "auto" kwargs["temperature"] = 0.2 if temperature is None else temperature resp = self.client.responses.create(**kwargs) text = self._extract_text(resp) tool_calls = _extract_tool_calls_from_responses(resp) from src.utils.serialization import to_jsonable self.last_used_model = getattr(resp, "model", FALLBACK_MODEL) self.last_warning_banner = ( f"Fell back to {FALLBACK_MODEL} due to transient error calling '{self.model}': {e.__class__.__name__}." ) return { "text": text, "tool_calls": tool_calls or None, "raw": to_jsonable(resp), "used_model": self.last_used_model, } else: is_o1_model = FALLBACK_MODEL.startswith("o1") kwargs = { "model": FALLBACK_MODEL, "messages": messages, } if FALLBACK_MODEL.startswith("gpt-5"): kwargs["max_completion_tokens"] = self.max_out else: kwargs["max_tokens"] = self.max_out if not is_o1_model: if tools: kwargs["tools"] = tools kwargs["tool_choice"] = tool_choice or "auto" if temperature is not None: kwargs["temperature"] = temperature resp = self.client.chat.completions.create(**kwargs) msg = resp.choices[0].message if resp.choices else None text = msg.content if msg else "" tool_calls = [] if msg and getattr(msg, "tool_calls", None): for tc in msg.tool_calls: tool_calls.append({ "name": getattr(tc.function, "name", "") if hasattr(tc, "function") else "", "arguments": getattr(tc.function, "arguments", "") if hasattr(tc, "function") else "", }) from src.utils.serialization import to_jsonable self.last_used_model = getattr(resp, "model", FALLBACK_MODEL) self.last_warning_banner = ( f"Fell back to {FALLBACK_MODEL} due to transient error calling '{self.model}': {e.__class__.__name__}." 
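
    # Shape of the dict returned by complete() on success (the keys are exactly
    # those built above; the values shown are illustrative):
    #   {
    #       "text": "...answer...",
    #       "tool_calls": None,          # or a list of {"name", "arguments"} dicts
    #       "raw": {...},                # JSON-safe dump of the SDK response
    #       "used_model": "gpt-5-mini",
    #   }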

    # Backward compatibility with the old interface
    def generate(self, system: str, user: str,
                 tools: Optional[List[Dict]] = None,
                 tool_choice: Optional[Dict] = None) -> Dict:
        """Legacy interface for backward compatibility."""
        messages = [
            {"role": "system", "content": system.strip()},
            {"role": "user", "content": user.strip()},
        ]
        result = self.complete(messages, tools, tool_choice)
        # Map to the old format
        return {
            "text": result["text"],
            "tool_calls": result["tool_calls"],
            "usage": result["raw"].get("usage", {}),
        }

    def generate_response(self, prompt: str, messages: Optional[List[Dict]] = None) -> str:
        """Simple text generation for orchestrator use."""
        # Copy to avoid mutating the caller's message list
        msgs = list(messages) if messages else []
        msgs.append({"role": "user", "content": prompt})
        result = self.complete(msgs)
        # Track the used model for UI consumption
        self.last_used_model = result.get("used_model", self.model)
        return result["text"] or "Unable to generate response."


# Alias for backward compatibility
GPT5MedicalGenerator = GPT5Medical

__all__ = [
    "GPT5Medical",
    "GPT5MedicalGenerator",  # Backward compatibility alias
]
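

# A minimal usage sketch (illustrative, not part of the shipped module): assumes
# OPENAI_API_KEY is set in the environment or .env, and that src.utils.serialization
# is importable as it is for the rest of this module. The question is a made-up example.
if __name__ == "__main__":
    gen = GPT5Medical()
    result = gen.complete([
        {"role": "system", "content": "Answer concisely."},
        {"role": "user", "content": "List contraindications for rigid bronchoscopy."},
    ])
    print(f"Model used: {result['used_model']}")
    print(result["text"] or "<no text>")
    if gen.last_warning_banner:
        print(f"Warning: {gen.last_warning_banner}")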