# test.py: Agentic logic using OpenAI + MCP tools (langchain_core for parsing)
import os
import json
from typing import Any, Dict, Optional, List, Literal, Type

from pydantic import BaseModel
from openai import OpenAI
from langchain_core.output_parsers import PydanticOutputParser  # requested parser

# -------------------- OpenAI setup --------------------
OAI_MODEL = os.getenv("OAI_MODEL", "gpt-4o-mini")
# Read the key from the environment; never hard-code API keys in source.
client_oai = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
def _format_history_for_context(
    conversation: List[Dict[str, str]],
    max_turns: int = 8,
) -> str:
    """
    Convert the last N messages from the session into a compact context string.
    Expected item format: {"role": "user"|"assistant", "content": "..."}.
    """
    if not conversation:
        return ""
    window = conversation[-max_turns:]
    lines = []
    for m in window:
        role = m.get("role", "user")
        content = m.get("content", "").strip()
        if not content:
            continue
        if role == "user":
            lines.append(f"User: {content}")
        else:
            lines.append(f"Assistant: {content}")
    return "\n".join(lines)
def llm_invoke(
    prompt: str,
    system: str = "You are a helpful assistant. Return JSON when requested.",
    temperature: float = 0.0,
) -> str:
    """
    Invoke OpenAI Chat Completions for planning/intent classification (low temperature).
    """
    resp = client_oai.chat.completions.create(
        model=OAI_MODEL,
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": prompt},
        ],
        temperature=temperature,
    )
    return resp.choices[0].message.content
# -------------------- Pydantic models --------------------
class IntentSpec(BaseModel):
    in_scope: bool
    intent: Literal["in_scope", "out_of_scope", "chit_chat"]
    reason: Optional[str] = None


class SubQuery(BaseModel):
    id: str
    query: str
    tool_name: Literal["ask_excel", "ask_pdf", "ask_link"]
    required_params: Dict[str, Any]
    depends_on: List[str] = []


class PlanResponse(BaseModel):
    subqueries: List[SubQuery]


class ContextEnhancer(BaseModel):
    answer_found: bool
    needs_enhancement: bool
    enhanced_query: Optional[str] = None
    cached_answer: Optional[str] = None
    reason: Optional[str] = None
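
# Illustrative example (hypothetical planner output) of the JSON shape PlanResponse expects:
# {
#   "subqueries": [
#     {"id": "sq1",
#      "query": "Closing rank for CSE at IIT Bombay",
#      "tool_name": "ask_excel",
#      "required_params": {"iit_name": "IIT Bombay", "branch": "CSE", "year": "2023"},
#      "depends_on": []}
#   ]
# }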
# -------------------- JSON parsing via langchain_core --------------------
def _safe_json(text: str) -> str:
    """
    Heuristic sanitizer: strip code fences and extract the main JSON block
    to help PydanticOutputParser if the model adds extra text.
    """
    t = text.strip()
    if t.startswith("```"):
        # Remove triple-backtick fences; allow an optional 'json' language hint
        t = t.strip("`").strip()
        if t.lower().startswith("json"):
            t = t[4:].strip()
    # Try direct JSON first
    try:
        json.loads(t)
        return t
    except Exception:
        pass
    # Fallback: slice from the first '{' to the last '}'
    start = t.find("{")
    end = t.rfind("}")
    if start != -1 and end != -1 and end > start:
        return t[start : end + 1]
    return text
def parse_response(text: str, model_spec: Type[BaseModel]) -> BaseModel:
    """
    Parse into a Pydantic model using langchain_core's PydanticOutputParser,
    with a robust fallback to standard json + pydantic if needed.
    """
    parser = PydanticOutputParser(pydantic_object=model_spec)
    # First try parser.parse() directly
    try:
        return parser.parse(text)
    except Exception:
        pass
    # Fallback: sanitize and try again
    try:
        return parser.parse(_safe_json(text))
    except Exception:
        # Last fallback: manual pydantic construction
        data = json.loads(_safe_json(text))
        return model_spec(**data)
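
# Minimal usage sketch (assumes the model returned fenced JSON for IntentSpec;
# the _safe_json fallback handles the fences if parser.parse() rejects them):
# raw = '```json\n{"in_scope": true, "intent": "in_scope", "reason": "asks about cutoffs"}\n```'
# spec = parse_response(raw, IntentSpec)
# spec.intent  # -> "in_scope"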
# -------------------- Prompts (intent + planning) --------------------
'''
def intent_prompt(query: str, available_iits: List = [], available_branches: List = [], years: List = []) -> str:
    parser = PydanticOutputParser(pydantic_object=IntentSpec)
    fmt = parser.get_format_instructions()  # <- tells the LLM the exact JSON keys/types
    return f"""You are an intent classifier for a JOSAA Counseling Assistant.
Supported IITs: {', '.join(available_iits)}
Supported Branches: {', '.join(available_branches)}
Available Data: opening/closing ranks ({', '.join(years)}), curriculum, NIRF, placements/faculty/research/facilities.
Classify the user's message into EXACTLY ONE of:
- "chit_chat"
- "in_scope"
- "out_of_scope"
Rules:
- "chit_chat" for greetings/small talk (hi/hello/how are you/what can you do).
- "in_scope" for queries about SUPPORTED IITs/branches, counseling, ranks/cutoffs, courses, curriculum, NIRF, placements, faculty, research, alumni/distinguished alumni, and campus facilities.
- "out_of_scope" otherwise.
Return ONLY a JSON object following these instructions:
{fmt}
User query: "{query}"
""".strip()
'''
def intent_prompt(
    query: str,
    available_iits: List = [],
    available_branches: List = [],
    years: List = [],
    conversation_context: str = "",  # NEW
) -> str:
    parser = PydanticOutputParser(pydantic_object=IntentSpec)
    fmt = parser.get_format_instructions()
    convo = f"\n\nRecent conversation:\n{conversation_context}\n\n" if conversation_context else "\n\n"
    return f"""You are an intent classifier for a JOSAA Counseling Assistant.
Supported IITs: {', '.join(available_iits)}
Supported Branches: {', '.join(available_branches)}
Available Data: opening/closing ranks ({', '.join(years)}), curriculum, NIRF, placements/faculty/research/facilities.{convo}
Classify the user's message into EXACTLY ONE of:
- "chit_chat"
- "in_scope"
- "out_of_scope"
Rules:
- "chit_chat" for greetings/small talk (hi/hello/how are you/what can you do).
- "in_scope" for queries about SUPPORTED IITs/branches, counseling, ranks/cutoffs, courses, curriculum, NIRF, placements, faculty, research, alumni/distinguished alumni, and campus facilities.
- "out_of_scope" otherwise.
Return ONLY a JSON object following these instructions:
{fmt}
User query: "{query}"
""".strip()
'''
def planning_prompt(query: str, available_iits: List = [], available_branches: List = [], years: List = []) -> str:
    parser = PydanticOutputParser(pydantic_object=PlanResponse)
    fmt = parser.get_format_instructions()
    return f"""You are a query planner for a JEE counseling assistant.
AVAILABLE TOOLS:
- ask_excel → ranks/cutoffs; params may include iit_name, branch, year
- ask_pdf → curriculum/NIRF; params may include iit_name, branch
- ask_link → placements/faculty/research/facilities; params may include iit_name, branch, or a URL
Break the user query into specific subqueries targeting ONE tool each.
Use ONLY supported IIT names and branch names when present.
Return ONLY a JSON object following these instructions:
{fmt}
User Query: "{query}"
""".strip()
'''
def planning_prompt(
    query: str,
    available_iits: List = [],
    available_branches: List = [],
    years: List = [],
    conversation_context: str = "",  # NEW
) -> str:
    parser = PydanticOutputParser(pydantic_object=PlanResponse)
    fmt = parser.get_format_instructions()
    convo = f"\n\nRecent conversation:\n{conversation_context}\n\n" if conversation_context else "\n\n"
    return f"""You are a query planner for a JEE counseling assistant.
AVAILABLE TOOLS:
- ask_excel → ranks/cutoffs
- ask_pdf → curriculum/NIRF
- ask_link → placements/faculty/research/facilities{convo}
Break the user query into specific subqueries targeting ONE tool each.
Return ONLY a JSON object following these instructions:
{fmt}
User Query: "{query}"
""".strip()
# -------------------- Intent detection & planning --------------------
'''
def intent_detect(user_q: str, available_iits: List, available_branches: List, years: List) -> IntentSpec:
    response = llm_invoke(intent_prompt(user_q, available_iits, available_branches, years), temperature=0.0)
    print("intent is", f"{response}")
    try:
        return parse_response(response, IntentSpec)
    except Exception as e:
        # Default to out_of_scope if parsing fails
        return IntentSpec(in_scope=False, intent="out_of_scope", reason=f"Parse error: {e}")
'''
def intent_detect(
    user_q: str,
    available_iits: List,
    available_branches: List,
    years: List,
    conversation_context: str,  # NEW
) -> IntentSpec:
    response = llm_invoke(
        intent_prompt(user_q, available_iits, available_branches, years, conversation_context),
        temperature=0.0,
    )
    return parse_response(response, IntentSpec)
'''
def make_query_plan(user_q: str, available_iits: List, available_branches: List, years: List) -> PlanResponse:
    response = llm_invoke(planning_prompt(user_q, available_iits, available_branches, years), temperature=0.0)
    return parse_response(response, PlanResponse)
'''
def make_query_plan(
    user_q: str,
    available_iits: List,
    available_branches: List,
    years: List,
    conversation_context: str,  # NEW
) -> PlanResponse:
    response = llm_invoke(
        planning_prompt(user_q, available_iits, available_branches, years, conversation_context),
        temperature=0.0,
    )
    return parse_response(response, PlanResponse)
# -------------------- MCP tool registry (real calls) --------------------
def _build_query_text(query: str, params: Dict[str, Any]) -> str:
    """Compose a single question string from the planner's description and params."""
    if not params:
        return query
    param_str = "; ".join(f"{k}: {v}" for k, v in params.items())
    return f"{query}\nParameters: {param_str}"
'''
def make_tool_registry(mcp_client) -> Dict[str, Any]:
    """
    Return callables that invoke actual MCP tools via your client.
    """
    def call_ask_excel(query: str, required_params: Dict[str, Any], temperature: float = 0.1, top_k: int = 5) -> str:
        q_text = _build_query_text(query, required_params)
        return mcp_client.ask_excel(
            question=q_text,
            top_k=top_k,
            sheet=required_params.get("sheet", 0),
            temperature=temperature,
        )

    def call_ask_pdf(query: str, required_params: Dict[str, Any], temperature: float = 0.1, top_k: int = 5) -> str:
        q_text = _build_query_text(query, required_params)
        return mcp_client.ask_pdf(
            question=q_text,
            top_k=top_k,
            temperature=temperature,
        )

    def call_ask_link(query: str, required_params: Dict[str, Any], temperature: float = 0.1, top_k: int = 5) -> str:
        q_text = _build_query_text(query, required_params)
        return mcp_client.ask_link(
            question=q_text,
            temperature=temperature,
            subquery_context=required_params.get("subquery_context"),
            top_k=top_k,
        )

    return {
        "ask_excel": call_ask_excel,
        "ask_pdf": call_ask_pdf,
        "ask_link": call_ask_link,
    }
'''
# AFTER (CHANGE):
def make_tool_registry(mcp_client, conversation_context: str) -> Dict[str, Any]:
    def _build_query_text(query: str, params: Dict[str, Any], conversation_context: str) -> str:
        parts = [query.strip()]
        if params:
            parts.append("Parameters: " + "; ".join(f"{k}: {v}" for k, v in params.items()))
        if conversation_context:
            parts.append("Conversation context:\n" + conversation_context)
        return "\n".join(parts)

    def call_ask_excel(query, required_params, temperature=0.1, top_k=5):
        q_text = _build_query_text(query, required_params, conversation_context)
        return mcp_client.ask_excel(question=q_text, top_k=top_k, sheet=required_params.get("sheet", 0), temperature=temperature)

    def call_ask_pdf(query, required_params, temperature=0.1, top_k=5):
        q_text = _build_query_text(query, required_params, conversation_context)
        return mcp_client.ask_pdf(question=q_text, top_k=top_k, temperature=temperature)

    def call_ask_link(query, required_params, temperature=0.1, top_k=5):
        q_text = _build_query_text(query, required_params, "")  # put the conversation in subquery_context instead
        subctx = conversation_context if conversation_context else required_params.get("subquery_context")
        # IMPORTANT: align the parameter name with your server (query vs question)
        return mcp_client.ask_link(
            query=q_text,  # if the server expects 'query'; use question=q_text otherwise
            temperature=temperature,
            subquery_context=subctx,
            top_k=top_k,
        )

    return {"ask_excel": call_ask_excel, "ask_pdf": call_ask_pdf, "ask_link": call_ask_link}
# -------------------- Execute subqueries & synthesize final --------------------
def build_execution_order(subqueries: List[SubQuery]) -> List[List[str]]:
    """
    Create batches of IDs whose dependencies are satisfied (simple topological batching).
    """
    if not subqueries:
        return []
    remaining = {sq.id: sq for sq in subqueries}
    completed = set()
    order: List[List[str]] = []
    while remaining:
        ready = [sq_id for sq_id, sq in remaining.items() if all(dep in completed for dep in sq.depends_on)]
        if not ready:
            raise ValueError(f"Circular or unsatisfiable dependencies: {list(remaining.keys())}")
        order.append(ready)
        for sq_id in ready:
            completed.add(sq_id)
            del remaining[sq_id]
    return order
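
# Illustrative example (hypothetical subqueries):
# plan = PlanResponse(subqueries=[
#     SubQuery(id="sq1", query="CSE cutoffs", tool_name="ask_excel", required_params={}),
#     SubQuery(id="sq2", query="compare curricula", tool_name="ask_pdf", required_params={}, depends_on=["sq1"]),
# ])
# build_execution_order(plan.subqueries)  # -> [["sq1"], ["sq2"]]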
# def execute_plan(
#     user_q: str,
#     plan: PlanResponse,
#     mcp_client,
#     temperature: float = 0.1,
#     top_k: int = 5
# ) -> Dict[str, Any]:
#     """
#     Execute subqueries in batches; returns a dict of {sq_id: {tool, answer}}.
#     """
#     registry = make_tool_registry(mcp_client)
def execute_plan(user_q, plan, mcp_client, conversation_context: str, temperature=0.1, top_k=5):
    registry = make_tool_registry(mcp_client, conversation_context)
    subqs = plan.subqueries
    exec_order = build_execution_order(subqs)
    results: Dict[str, Any] = {}
    for batch in exec_order:
        for sq_id in batch:
            sq = next(s for s in subqs if s.id == sq_id)
            tool_fn = registry.get(sq.tool_name)
            if not tool_fn:
                results[sq_id] = {"tool": sq.tool_name, "answer": f"Unknown tool '{sq.tool_name}'"}
                continue
            try:
                ans = tool_fn(sq.query, sq.required_params, temperature=temperature, top_k=top_k)
                results[sq_id] = {"tool": sq.tool_name, "answer": ans}
            except Exception as e:
                results[sq_id] = {"tool": sq.tool_name, "answer": f"Error calling tool: {e}"}
    return {"execution_order": exec_order, "results": results}
'''
def synthesize_answer(user_q: str, exec_result: Dict[str, Any]) -> str:
    """
    Use OpenAI to write a concise final answer using all tool outputs.
    """
    tool_outputs = []
    for batch in exec_result.get("execution_order", []):
        for sq_id in batch:
            entry = exec_result["results"].get(sq_id, {})
            tool_outputs.append(f"[{sq_id} • {entry.get('tool')}] {entry.get('answer', '')}")
    context = "\n".join(tool_outputs) if tool_outputs else "(no tool outputs)"
    prompt = f"""You are a helpful assistant for JEE/JOSAA counseling.
User Question:
{user_q}
Tool Results:
{context}
Write a concise, accurate final answer grounded in the tool results.
If the tool results are insufficient, state that clearly.
Avoid bracketed tags and avoid repeating metadata like [sq1].
"""
    return llm_invoke(prompt, system="You are a helpful assistant. Use only provided context.", temperature=0.2)
'''
# AFTER (CHANGE):
def synthesize_answer(user_q, exec_result, conversation_context: str):
    # Flatten tool outputs exactly as in the original version above
    tool_outputs = []
    for batch in exec_result.get("execution_order", []):
        for sq_id in batch:
            entry = exec_result["results"].get(sq_id, {})
            tool_outputs.append(f"[{sq_id} • {entry.get('tool')}] {entry.get('answer', '')}")
    context = "\n".join(tool_outputs) if tool_outputs else "(no tool outputs)"
    prompt = f"""You are a helpful assistant for JEE/JOSAA counseling.
Recent conversation:
{conversation_context or "(none)"}
User Question:
{user_q}
Tool Results:
{context}
Write a concise, accurate final answer grounded in the tool results and the recent conversation.
If the available context is insufficient, state that clearly.
Avoid bracketed tags and metadata like [sq1].
"""
    return llm_invoke(prompt, system="You are a helpful assistant. Use only provided context.", temperature=0.2)
# -------------------- Public entry point used by chat_app --------------------
# AFTER (CHANGE):
def run_agent(
    user_q: str,
    mcp_client,
    available_iits: List[str],
    available_branches: List[str],
    years: List[str],
    conversation: List[Dict[str, str]],  # NEW
    top_k: int = 5,
    temperature: float = 0.1,
) -> str:
    conversation_context = _format_history_for_context(conversation, max_turns=8)
    intent = intent_detect(user_q, available_iits, available_branches, years, conversation_context)
    print("The intent response is", f"{intent}")
    if intent.intent == "chit_chat":
        return (
            "Hi! I'm your JOSAA Counseling Assistant.\n"
            "Ask about branches, opening/closing ranks, or options for your rank.\n"
            f"Supported IITs: {', '.join(available_iits)}; branches: {', '.join(available_branches)}."
        )
    if not intent.in_scope or intent.intent == "out_of_scope":
        return (
            "This assistant only supports JEE/JOSAA counseling.\n"
            f"Supported IITs: {', '.join(available_iits)}; branches: {', '.join(available_branches)}.\n"
            "Please refine your query accordingly."
        )
    # In-scope: plan → execute → synthesize
    plan = make_query_plan(user_q, available_iits, available_branches, years, conversation_context)
    print(plan)
    exec_result = execute_plan(user_q, plan, mcp_client, conversation_context, temperature=temperature, top_k=top_k)
    final = synthesize_answer(user_q, exec_result, conversation_context)
    return final.strip()
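
# Minimal usage sketch. The MCP client below is a hypothetical stub, not the real server;
# swap in your actual client (the object exposing ask_excel / ask_pdf / ask_link used above).
# Running this still calls OpenAI, so OPENAI_API_KEY must be set.
if __name__ == "__main__":
    class _FakeMCPClient:
        """Hypothetical stand-in that echoes the question; for wiring tests only."""
        def ask_excel(self, question, top_k=5, sheet=0, temperature=0.1):
            return f"[excel stub] {question}"
        def ask_pdf(self, question, top_k=5, temperature=0.1):
            return f"[pdf stub] {question}"
        def ask_link(self, query=None, question=None, top_k=5, temperature=0.1, subquery_context=None):
            return f"[link stub] {query or question}"

    answer = run_agent(
        user_q="What was the CSE closing rank at IIT Bombay?",
        mcp_client=_FakeMCPClient(),
        available_iits=["IIT Bombay", "IIT Delhi"],
        available_branches=["CSE", "EE"],
        years=["2022", "2023"],
        conversation=[{"role": "user", "content": "hi"}],
    )
    print(answer)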