# test.py - Agentic logic using OpenAI + MCP tools (langchain_core for parsing)
import os
import json
from typing import Any, Dict, Optional, List, Literal, Type
from pydantic import BaseModel, ValidationError
from openai import OpenAI
from langchain_core.output_parsers import PydanticOutputParser # ← requested parser
# -------------------- OpenAI setup --------------------
OAI_MODEL = os.getenv("OAI_MODEL", "gpt-4o-mini")
client_oai = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))  # read the key from the environment; never hardcode secrets in source
def _format_history_for_context(
conversation: List[Dict[str, str]],
max_turns: int = 8
) -> str:
"""
Convert the last N messages from the session into a compact context string.
Expected item format: {"role": "user"|"assistant", "content": "..."}.
"""
if not conversation:
return ""
window = conversation[-max_turns:]
lines = []
for m in window:
role = m.get("role", "user")
content = m.get("content", "").strip()
if not content:
continue
if role == "user":
lines.append(f"User: {content}")
else:
lines.append(f"Assistant: {content}")
return "\n".join(lines)
def llm_invoke(
prompt: str,
system: str = "You are a helpful assistant. Return JSON when requested.",
temperature: float = 0.0,
) -> str:
"""
Invoke OpenAI Chat Completions for planning/intent classification (low temperature).
"""
resp = client_oai.chat.completions.create(
model=OAI_MODEL,
messages=[
{"role": "system", "content": system},
{"role": "user", "content": prompt},
],
temperature=temperature,
)
return resp.choices[0].message.content
# -------------------- Pydantic models --------------------
class IntentSpec(BaseModel):
in_scope: bool
intent: Literal["in_scope", "out_of_scope", "chit_chat"]
reason: Optional[str] = None
class SubQuery(BaseModel):
id: str
query: str
tool_name: Literal["ask_excel", "ask_pdf", "ask_link"]
required_params: Dict[str, Any]
depends_on: List[str] = []
class PlanResponse(BaseModel):
subqueries: List[SubQuery]
class ContextEnhancer(BaseModel):
answer_found: bool
needs_enhancement: bool
enhanced_query: Optional[str] = None
cached_answer: Optional[str] = None
reason: Optional[str] = None
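# Illustrative example of the JSON the planner is asked to emit for PlanResponse
# (field names come from the models above; the concrete values are made up):
# {
#   "subqueries": [
#     {"id": "sq1", "query": "Closing rank for CSE at IIT Bombay in 2023",
#      "tool_name": "ask_excel",
#      "required_params": {"iit_name": "IIT Bombay", "branch": "CSE", "year": "2023"},
#      "depends_on": []},
#     {"id": "sq2", "query": "CSE curriculum highlights at IIT Bombay",
#      "tool_name": "ask_pdf",
#      "required_params": {"iit_name": "IIT Bombay", "branch": "CSE"},
#      "depends_on": []}
#   ]
# }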
# -------------------- JSON parsing via langchain_core --------------------
def _safe_json(text: str) -> str:
"""
Heuristic sanitizer: strip code fences and extract the main JSON block
to help PydanticOutputParser if the model adds extra text.
"""
t = text.strip()
if t.startswith("```"):
# Remove triple backtick fences; allow optional 'json' hint
t = t.strip("`").strip()
if t.lower().startswith("json"):
t = t[4:].strip()
# Try direct JSON
try:
json.loads(t)
return t
except Exception:
pass
# Fallback: find first '{' and last '}'
start = t.find("{")
end = t.rfind("}")
if start != -1 and end != -1 and end > start:
return t[start : end + 1]
return text
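# Example: a fenced reply such as
#   ```json
#   {"in_scope": true, "intent": "in_scope"}
#   ```
# is reduced to the bare object '{"in_scope": true, "intent": "in_scope"}' before parsing.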
def parse_response(text: str, model_spec: Type[BaseModel]) -> BaseModel:
"""
Parse into a Pydantic model using langchain_core's PydanticOutputParser,
with a robust fallback to standard json+pydantic if needed.
"""
parser = PydanticOutputParser(pydantic_object=model_spec)
# First try parser.parse() directly
try:
return parser.parse(text)
except Exception:
pass
# Fallback: sanitize and try again
try:
return parser.parse(_safe_json(text))
except Exception:
# Last fallback: manual pydantic construction
data = json.loads(_safe_json(text))
return model_spec(**data)
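# Usage sketch (the raw string stands in for a hypothetical model reply):
#   spec = parse_response('{"in_scope": true, "intent": "in_scope", "reason": "rank query"}', IntentSpec)
#   assert spec.in_scope and spec.intent == "in_scope"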
# -------------------- Prompts (intent + planning) --------------------
'''
def intent_prompt(query: str, available_iits: List = [], available_branches: List = [], years: List = []) -> str:
parser = PydanticOutputParser(pydantic_object=IntentSpec)
fmt = parser.get_format_instructions() # <- tells the LLM the exact JSON keys/types
return f"""You are an intent classifier for a JOSAA Counseling Assistant.
Supported IITs: {', '.join(available_iits)}
Supported Branches: {', '.join(available_branches)}
Available Data: opening/closing ranks ({', '.join(years)}), curriculum, NIRF, placements/faculty/research/facilities.
Classify the user's message into EXACTLY ONE of:
- "chit_chat"
- "in_scope"
- "out_of_scope"
Rules:
- "chit_chat" for greetings/small talk (hi/hello/how are you/what can you do).
- "in_scope" for queries about SUPPORTED IITs/branches, counseling, ranks/cutoffs, courses, curriculum, NIRF, placements, faculty, research, alumni/distinguished alumni and campus facilities.
- "out_of_scope" otherwise.
Return ONLY a JSON object following these instructions:
{fmt}
User query: "{query}"
""".strip()
'''
def intent_prompt(
query: str,
available_iits: List = [],
available_branches: List = [],
years: List = [],
conversation_context: str = "" # NEW
) -> str:
parser = PydanticOutputParser(pydantic_object=IntentSpec)
fmt = parser.get_format_instructions()
convo = f"\n\nRecent conversation:\n{conversation_context}\n\n" if conversation_context else "\n\n"
return f"""You are an intent classifier for a JOSAA Counseling Assistant.
Supported IITs: {', '.join(available_iits)}
Supported Branches: {', '.join(available_branches)}
Available Data: opening/closing ranks ({', '.join(years)}), curriculum, NIRF, placements/faculty/research/facilities.{convo}
Classify the user's message into EXACTLY ONE of:
- "chit_chat"
- "in_scope"
- "out_of_scope"
Rules:
- "chit_chat" for greetings/small talk (hi/hello/how are you/what can you do).
- "in_scope" for queries about SUPPORTED IITs/branches, counseling, ranks/cutoffs, courses, curriculum, NIRF, placements, faculty, research, alumni/distinguished alumni and campus facilities.
- "out_of_scope" otherwise.
Return ONLY a JSON object following these instructions:
{fmt}
User query: "{query}"
""".strip()
'''
def planning_prompt(query: str, available_iits: List = [], available_branches: List = [], years: List = []) -> str:
parser = PydanticOutputParser(pydantic_object=PlanResponse)
fmt = parser.get_format_instructions()
return f"""You are a query planner for a JEE counseling assistant.
AVAILABLE TOOLS:
- ask_excel: ranks/cutoffs; params may include iit_name, branch, year
- ask_pdf: curriculum/NIRF; params may include iit_name, branch
- ask_link: placements/faculty/research/facilities; params may include iit_name, branch, or a URL
Break the user query into specific subqueries targeting ONE tool each.
Use ONLY supported IIT names and branch names when present.
Return ONLY a JSON object following these instructions:
{fmt}
User Query: "{query}"
""".strip()
'''
def planning_prompt(
query: str,
available_iits: List = [],
available_branches: List = [],
years: List = [],
conversation_context: str = "" # NEW
) -> str:
parser = PydanticOutputParser(pydantic_object=PlanResponse)
fmt = parser.get_format_instructions()
convo = f"\n\nRecent conversation:\n{conversation_context}\n\n" if conversation_context else "\n\n"
return f"""You are a query planner for a JEE counseling assistant.
AVAILABLE TOOLS:
- ask_excel: ranks/cutoffs
- ask_pdf: curriculum/NIRF
- ask_link: placements/faculty/research/facilities{convo}
Break the user query into specific subqueries targeting ONE tool each.
Return ONLY a JSON object following these instructions:
{fmt}
User Query: "{query}"
""".strip()
# -------------------- Intent detection & planning --------------------
'''
def intent_detect(user_q: str, available_iits: List, available_branches: List, years: List) -> IntentSpec:
response = llm_invoke(intent_prompt(user_q, available_iits, available_branches, years), temperature=0.0)
print("intent is", f"{response}")
try:
return parse_response(response, IntentSpec)
except Exception as e:
# default to out_of_scope if parsing fails
return IntentSpec(in_scope=False, intent="out_of_scope", reason=f"Parse error: {e}")
'''
def intent_detect(
user_q: str,
available_iits: List,
available_branches: List,
years: List,
conversation_context: str # NEW
) -> IntentSpec:
response = llm_invoke(
intent_prompt(user_q, available_iits, available_branches, years, conversation_context),
temperature=0.0
)
return parse_response(response, IntentSpec)
'''
def make_query_plan(user_q: str, available_iits: List, available_branches: List, years: List) -> PlanResponse:
response = llm_invoke(planning_prompt(user_q, available_iits, available_branches, years), temperature=0.0)
return parse_response(response, PlanResponse)
'''
def make_query_plan(
user_q: str,
available_iits: List,
available_branches: List,
years: List,
conversation_context: str # NEW
) -> PlanResponse:
response = llm_invoke(
planning_prompt(user_q, available_iits, available_branches, years, conversation_context),
temperature=0.0
)
return parse_response(response, PlanResponse)
# -------------------- MCP tool registry (real calls) --------------------
def _build_query_text(query: str, params: Dict[str, Any]) -> str:
"""Compose a single question string using the planner's params and description."""
if not params:
return query
param_str = "; ".join(f"{k}: {v}" for k, v in params.items())
return f"{query}\nParameters: {param_str}"
'''
def make_tool_registry(mcp_client) -> Dict[str, Any]:
"""
Return callables that invoke actual MCP tools via your client.
"""
def call_ask_excel(query: str, required_params: Dict[str, Any], temperature: float = 0.1, top_k: int = 5) -> str:
q_text = _build_query_text(query, required_params)
return mcp_client.ask_excel(
question=q_text,
top_k=top_k,
sheet=required_params.get("sheet", 0),
temperature=temperature,
)
def call_ask_pdf(query: str, required_params: Dict[str, Any], temperature: float = 0.1, top_k: int = 5) -> str:
q_text = _build_query_text(query, required_params)
return mcp_client.ask_pdf(
question=q_text,
top_k=top_k,
temperature=temperature,
)
def call_ask_link(query: str, required_params: Dict[str, Any], temperature: float = 0.1, top_k: int = 5) -> str:
q_text = _build_query_text(query, required_params)
return mcp_client.ask_link(
question=q_text,
temperature=temperature,
subquery_context=required_params.get("subquery_context"),
top_k=top_k,
)
return {
"ask_excel": call_ask_excel,
"ask_pdf": call_ask_pdf,
"ask_link": call_ask_link,
}
'''
# AFTER (CHANGE):
def make_tool_registry(mcp_client, conversation_context: str) -> Dict[str, Any]:
    """Return callables that invoke the real MCP tools, injecting the recent conversation context."""
def _build_query_text(query: str, params: Dict[str, Any], conversation_context: str) -> str:
parts = [query.strip()]
if params:
parts.append("Parameters: " + "; ".join(f"{k}: {v}" for k, v in params.items()))
if conversation_context:
parts.append("Conversation context:\n" + conversation_context)
return "\n".join(parts)
def call_ask_excel(query, required_params, temperature=0.1, top_k=5):
q_text = _build_query_text(query, required_params, conversation_context)
return mcp_client.ask_excel(question=q_text, top_k=top_k, sheet=required_params.get("sheet", 0), temperature=temperature)
def call_ask_pdf(query, required_params, temperature=0.1, top_k=5):
q_text = _build_query_text(query, required_params, conversation_context)
return mcp_client.ask_pdf(question=q_text, top_k=top_k, temperature=temperature)
def call_ask_link(query, required_params, temperature=0.1, top_k=5):
q_text = _build_query_text(query, required_params, "") # put convo in subquery_context instead
subctx = conversation_context if conversation_context else required_params.get("subquery_context")
# IMPORTANT: align param name with your server (query vs question)
return mcp_client.ask_link(
query=q_text, # if server expects 'query'; use question=q_text otherwise
temperature=temperature,
subquery_context=subctx,
top_k=top_k,
)
return {"ask_excel": call_ask_excel, "ask_pdf": call_ask_pdf, "ask_link": call_ask_link}
# -------------------- Execute subqueries & synthesize final --------------------
def build_execution_order(subqueries: List[SubQuery]) -> List[List[str]]:
"""
Create batches of IDs whose dependencies are satisfied (simple topological batching).
"""
if not subqueries:
return []
remaining = {sq.id: sq for sq in subqueries}
completed = set()
order: List[List[str]] = []
while remaining:
ready = [sq_id for sq_id, sq in remaining.items() if all(dep in completed for dep in sq.depends_on)]
if not ready:
raise ValueError(f"Circular or unsatisfiable dependencies: {list(remaining.keys())}")
order.append(ready)
for sq_id in ready:
completed.add(sq_id)
del remaining[sq_id]
return order
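# Worked example (ids and dependencies are made up): for subqueries
#   sq1 (depends_on=[]), sq2 (depends_on=[]), sq3 (depends_on=["sq1", "sq2"])
# the function returns [["sq1", "sq2"], ["sq3"]]: sq1 and sq2 run in the first batch,
# sq3 only after both complete. A cycle such as sq1 -> sq2 -> sq1 raises ValueError.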
#def execute_plan(
# user_q: str,
# plan: PlanResponse,
# mcp_client,
# temperature: float = 0.1,
# top_k: int = 5
#) -> Dict[str, Any]:
# """
# Execute subqueries in batches; returns a dict of {sq_id: {tool, answer}}.
# """
# registry = make_tool_registry(mcp_client)
def execute_plan(user_q, plan, mcp_client, conversation_context: str, temperature=0.1, top_k=5):
    """
    Execute subqueries in dependency batches; returns {"execution_order": [...], "results": {sq_id: {tool, answer}}}.
    """
registry = make_tool_registry(mcp_client, conversation_context)
subqs = plan.subqueries
exec_order = build_execution_order(subqs)
results: Dict[str, Any] = {}
for batch in exec_order:
for sq_id in batch:
sq = next(s for s in subqs if s.id == sq_id)
tool_fn = registry.get(sq.tool_name)
if not tool_fn:
results[sq_id] = {"tool": sq.tool_name, "answer": f"❌ Unknown tool '{sq.tool_name}'"}
continue
try:
ans = tool_fn(sq.query, sq.required_params, temperature=temperature, top_k=top_k)
results[sq_id] = {"tool": sq.tool_name, "answer": ans}
except Exception as e:
results[sq_id] = {"tool": sq.tool_name, "answer": f"❌ Error calling tool: {e}"}
return {"execution_order": exec_order, "results": results}
'''
def synthesize_answer(user_q: str, exec_result: Dict[str, Any]) -> str:
"""
Use OpenAI to write a concise final answer using all tool outputs.
"""
tool_outputs = []
for batch in exec_result.get("execution_order", []):
for sq_id in batch:
entry = exec_result["results"].get(sq_id, {})
            tool_outputs.append(f"[{sq_id} • {entry.get('tool')}] {entry.get('answer', '')}")
context = "\n".join(tool_outputs) if tool_outputs else "(no tool outputs)"
prompt = f"""You are a helpful assistant for JEE/JOSAA counseling.
User Question:
{user_q}
Tool Results:
{context}
Write a concise, accurate final answer grounded in the tool results.
If the tool results are insufficient, state that clearly.
Avoid bracketed tags and avoid repeating metadata like [sq1].
"""
return llm_invoke(prompt, system="You are a helpful assistant. Use only provided context.", temperature=0.2)
'''
# AFTER (CHANGE):
def synthesize_answer(user_q: str, exec_result: Dict[str, Any], conversation_context: str) -> str:
    """
    Use OpenAI to write a concise final answer from all tool outputs, grounded in the recent conversation.
    """
    tool_outputs = []
    for batch in exec_result.get("execution_order", []):
        for sq_id in batch:
            entry = exec_result["results"].get(sq_id, {})
            tool_outputs.append(f"[{sq_id} • {entry.get('tool')}] {entry.get('answer', '')}")
    context = "\n".join(tool_outputs) if tool_outputs else "(no tool outputs)"
    prompt = f"""You are a helpful assistant for JEE/JOSAA counseling.
Recent conversation:
{conversation_context or "(none)"}
User Question:
{user_q}
Tool Results:
{context}
Write a concise, accurate final answer grounded in the tool results and the recent conversation.
If the available context is insufficient, state that clearly.
Avoid bracketed tags and metadata like [sq1].
"""
    return llm_invoke(prompt, system="You are a helpful assistant. Use only provided context.", temperature=0.2)
# -------------------- Public entry point used by chat_app --------------------
# AFTER (CHANGE):
def run_agent(
user_q: str,
mcp_client,
available_iits: List[str],
available_branches: List[str],
years: List[str],
conversation: List[Dict[str, str]], # NEW
top_k: int = 5,
temperature: float = 0.1,
) -> str:
conversation_context = _format_history_for_context(conversation, max_turns=8)
intent = intent_detect(user_q, available_iits, available_branches, years, conversation_context)
    print("Intent:", intent)
if intent.intent == "chit_chat":
return (
f"Hi! I’m your JOSAA Counseling Assistant.\n"
f"Ask about branches, opening/closing ranks, or options for your rank.\n"
f"Supported IITs: {', '.join(available_iits)}; branches: {', '.join(available_branches)}."
)
if not intent.in_scope or intent.intent == "out_of_scope":
return (
"This assistant only supports JEE/JOSAA counseling.\n"
f"Supported IITs: {', '.join(available_iits)}; branches: {', '.join(available_branches)}.\n"
"Please refine your query accordingly."
)
# In-scope β†’ plan β†’ execute β†’ synthesize
plan = make_query_plan(user_q, available_iits, available_branches, years, conversation_context)
print(plan)
exec_result = execute_plan(user_q, plan, mcp_client, conversation_context, temperature=temperature, top_k=top_k)
final = synthesize_answer(user_q, exec_result, conversation_context)
return final.strip()
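# Minimal usage sketch for manual testing. `MCPClient` is a stand-in for whatever client
# chat_app actually constructs, and the IIT/branch/year lists are illustrative, so the
# snippet is left commented out:
#
# if __name__ == "__main__":
#     mcp_client = MCPClient()  # hypothetical constructor
#     reply = run_agent(
#         user_q="What was the closing rank for CSE at IIT Bombay in 2023?",
#         mcp_client=mcp_client,
#         available_iits=["IIT Bombay", "IIT Delhi"],
#         available_branches=["CSE", "EE"],
#         years=["2022", "2023"],
#         conversation=[{"role": "user", "content": "hi"}],
#     )
#     print(reply)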