import uuid
import json
import random
import asyncio
import logging
import time
import traceback
from html import escape

from langchain_core.messages.ai import AIMessageChunk
from langchain_core.messages.system import SystemMessage
from langchain_core.messages.tool import ToolMessage

from graph_helper import generate_graph

# Logging
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.INFO)

thinking_verbs = [
    "thinking",
    "processing",
    "crunching data",
    "please wait",
    "just a few more seconds",
    "closing in",
    "analyzing",
    "reasoning",
    "computing",
    "synthesizing insight",
    "searching through the cosmos",
    "decoding ancient knowledge",
    "scanning the scriptures",
    "accessing divine memory",
    "gathering wisdom",
    "consulting the rishis",
    "listening to the ātmā",
    "channeling sacred energy",
    "unfolding the divine word",
    "meditating on the meaning",
    "reciting from memory",
    "traversing the Vedas",
    "seeking the inner light",
    "invoking paramārtha",
    "putting it all together",
    "digging deeper",
    "making sense of it",
    "connecting the dots",
    "almost there",
    "getting closer",
    "wrapping it up",
    "piecing it together",
    "swirling through verses",
    "diving into the ocean of knowledge",
    "lighting the lamp of understanding",
    "walking the path of inquiry",
    "aligning stars of context",
]

graph = generate_graph()
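# Note: generate_graph comes from the project-local graph_helper module. The
# calls below (graph.invoke / graph.astream with a thread_id in `config`)
# assume it returns a compiled LangGraph graph with a checkpointer attached,
# so conversation state persists per thread_id.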


def add_node_to_tree(
    node_tree: list[str], node_label: str, tooltip: str = "no arguments to show"
) -> list[str]:
    if tooltip:
        # Escape the tooltip for safe embedding in a single-quoted HTML attribute.
        tooltip = escape(tooltip).replace("'", "&#39;")
        node_with_tooltip = (
            f"<span class='node-label' title='{tooltip}'>{node_label}</span>"
        )
    else:
        node_with_tooltip = node_label
    node_tree[-1] = node_with_tooltip
    node_tree.append("<span class='spinner'> </span>")
    return node_tree
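
# Illustrative example ("🧠 retriever" is a hypothetical label):
#   tree = ["🚩", "<span class='spinner'> </span>"]
#   add_node_to_tree(tree, "🧠 retriever", '{"query": "..."}')
# replaces the trailing spinner with the labelled node and appends a fresh
# spinner, so the tree always ends with a spinner while streaming:
#   ["🚩", "<span class='node-label' title='...'>🧠 retriever</span>",
#    "<span class='spinner'> </span>"]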


def end_node_tree(node_tree: list[str]) -> list[str]:
    node_tree[-1] = "🏁"
    return node_tree


def get_args_for_toolcall(tool_calls_buffer: dict, tool_call_id: str):
    return (
        tool_calls_buffer[tool_call_id]["args_str"]
        if tool_call_id in tool_calls_buffer
        and "args_str" in tool_calls_buffer[tool_call_id]
        else ""
    )
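
# Equivalent dict.get() one-liner, shown only for clarity (same behavior,
# assuming buffer values are dicts as built below):
#   tool_calls_buffer.get(tool_call_id, {}).get("args_str", "")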


async def chat_wrapper(
    message, history, thread_id, debug, preferred_language="English"
):
    if debug:
        async for chunk in chat_streaming(
            debug, message, history, thread_id, preferred_language=preferred_language
        ):
            yield chunk
    else:
        response = chat(
            debug, message, history, thread_id, preferred_language=preferred_language
        )
        yield response
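
# Note on chat_wrapper above: both branches are consumed as async generators.
# In debug mode the UI receives incremental chunks from chat_streaming;
# otherwise chat() blocks and a single complete response is yielded.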


def chat(debug_mode, message, history, thread_id, preferred_language="English"):
    config = {"configurable": {"thread_id": thread_id}, "recursion_limit": 30}
    response = graph.invoke(
        {
            "debug_mode": debug_mode,
            "messages": [{"role": "user", "content": message}],
            "language": preferred_language,
        },
        config=config,
    )
    return response["messages"][-1].content
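
# Minimal invocation sketch (illustrative prompt; `history` is unused here
# because conversation state is assumed to live in the graph's checkpointer,
# keyed by thread_id):
#   answer = chat(False, "what is dharma?", [], init_session())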


async def chat_streaming(
    debug_mode: bool, message, history, thread_id, preferred_language="English"
):
    state = {
        "debug_mode": debug_mode,
        "messages": (history or []) + [{"role": "user", "content": message}],
        "language": preferred_language,
    }
    config = {"configurable": {"thread_id": thread_id}, "recursion_limit": 30}
    start_time = time.time()
    streamed_response = ""
    final_response = ""
    # final_node = "validator"
    MAX_CONTENT = 500
    try:
        node_tree = ["🚩", "<span class='spinner'> </span>"]
        tool_calls_buffer = {}
        async for msg, metadata in graph.astream(
            state, config=config, stream_mode="messages"
        ):
            node = metadata.get("langgraph_node", "?")
            name = getattr(msg, "name", "-")
            node_icon = "⚙️" if isinstance(msg, ToolMessage) else "🧠"
            node_label = f"{node}"
            tool_label = f"{name or ''}"
            if tool_label:
                node_label = f"{node_label}:{tool_label}"
            label = f"{node_icon} {node_label}"
            tooltip = ""
            if isinstance(msg, ToolMessage):
                tooltip = get_args_for_toolcall(tool_calls_buffer, msg.tool_call_id)
                # logger.info("tooltip = %s", tooltip)
            # Check the last-but-one entry (-2): the last entry is always a spinner.
            if node_tree[-2] != label:
                add_node_to_tree(node_tree, label, tooltip)

            def generate_processing_message():
                return f"<div class='thinking-bubble'><em>🤔 {random.choice(thinking_verbs)} ...</em></div>"

            if not isinstance(msg, (ToolMessage, SystemMessage, AIMessageChunk)):
                logger.info("msg = %s", msg)
            if isinstance(msg, ToolMessage):
                logger.debug("tool message = %s", msg)
                html = f"<div class='thinking-bubble'><em>🤔 {msg.name} tool: {random.choice(thinking_verbs)} ...</em></div>"
                yield f"### {' → '.join(node_tree)}\n{html}"
            elif isinstance(msg, AIMessageChunk):

                def truncate_middle(text, front=50, back=50):
                    if not text:
                        return ""
                    if len(text) <= front + back:
                        return text
                    # Keep the head and tail, dropping newlines so the preview
                    # stays on a single line.
                    return f"{text[:front]}…{text[-back:]}".replace("\n", "")

                if not msg.content:
                    # logger.warning("*** No Message Chunk!")
                    yield f"### {' → '.join(node_tree)}\n{generate_processing_message()}\n<div class='intermediate-output'>{escape(truncate_middle(streamed_response))}</div>"
                else:
                    # Stream intermediate messages with a transparent style.
                    # if node != final_node:
                    streamed_response += msg.content
                    yield f"### {' → '.join(node_tree)}\n<div class='intermediate-output'>{escape(truncate_middle(streamed_response))}</div>"
                    # else:
                    # Buffer the final validated response instead of yielding it.
                    final_response += msg.content
                if msg.tool_call_chunks:
                    for tool_call_chunk in msg.tool_call_chunks:
                        logger.debug("*** tool_call_chunk = %s", tool_call_chunk)
                        if tool_call_chunk["id"] is not None:
                            tool_call_id = tool_call_chunk["id"]
                            if tool_call_id not in tool_calls_buffer:
                                tool_calls_buffer[tool_call_id] = {
                                    "name": "",
                                    "args_str": "",
                                    "id": tool_call_id,
                                    "type": "tool_call",
                                }
                        # Accumulate tool call name and arguments.
                        if tool_call_chunk["name"] is not None:
                            tool_calls_buffer[tool_call_id]["name"] += tool_call_chunk["name"]
                        if tool_call_chunk["args"] is not None:
                            tool_calls_buffer[tool_call_id]["args_str"] += tool_call_chunk["args"]
            else:
                logger.debug("message = %s %s", type(msg), msg.content[:100])
                full: str = escape(msg.content)
                truncated = (
                    (full[:MAX_CONTENT] + "…") if len(full) > MAX_CONTENT else full
                )
                html = (
                    f"<div class='thinking-bubble'><em>🤔 {random.choice(thinking_verbs)} ...</em></div>"
                    f"<div style='opacity: 0.1'>"
                    f"<strong>Telling myself:</strong> {truncated or '...'}"
                    f"</div>"
                )
                yield f"### {' → '.join(node_tree)}\n{html}"
                if getattr(msg, "tool_calls", []):
                    logger.info("ELSE::tool_calls = %s", msg.tool_calls)

        node_tree[-1] = "✅"
        end_time = time.time()
        duration = end_time - start_time
        final_response = (
            f"\n{final_response}" f"\n\n⏱️ Processed in {duration:.2f} seconds"
        )
        # Replay the buffered final response character by character for a
        # typewriter effect.
        buffer = f"### {' → '.join(node_tree)}\n"
        yield buffer
        for c in final_response:
            buffer += c
            yield buffer
            await asyncio.sleep(0.0005)
        logger.debug("************************************")
        # Now process the complete tool calls from the buffer.
        for tool_call_id, accumulated_tool_call in tool_calls_buffer.items():
            # Attempt to parse the arguments only if 'args_str' isn't empty.
            if accumulated_tool_call["args_str"]:
                try:
                    parsed_args = json.loads(accumulated_tool_call["args_str"])
                    logger.debug(f"Tool Name: {accumulated_tool_call['name']}")
                    logger.debug(f"Tool Arguments: {parsed_args}")
                except json.JSONDecodeError:
                    logger.debug(
                        f"Partial arguments for tool {accumulated_tool_call['name']}: {accumulated_tool_call['args_str']}"
                    )
    except asyncio.CancelledError:
        logger.warning("⚠️ Request cancelled by user")
        node_tree = end_node_tree(node_tree=node_tree)
        yield (
            f"### {' → '.join(node_tree)}"
            "\n⚠️⚠️⚠️ Request cancelled by user"
            "\nhere is what I got so far ...\n"
            f"\n{streamed_response}"
        )
        # Important: re-raise if upstream should also see the cancellation.
        # raise
        return
    except Exception as e:
        logger.error("❌❌❌ Error processing request: %s", e)
        traceback.print_exc()
        node_tree = end_node_tree(node_tree=node_tree)
        yield (
            f"### {' → '.join(node_tree)}"
            f"\n❌❌❌ Error processing request: {str(e)}"
            "\nhere is what I got so far ...\n"
            f"\n{streamed_response}"
        )
        return


def init_session():
    return str(uuid.uuid4())


MAX_MESSAGES_IN_CONVERSATION = 5


async def limited_chat_wrapper(
    message, history, thread_id, debug, preferred_language, count
):
    # Count this message toward the per-conversation limit.
    count += 1
    # Warn one turn before the conversation resets.
    if count == MAX_MESSAGES_IN_CONVERSATION - 1:
        yield [
            {
                "role": "system",
                "content": "⚠️ You are allowed to ask one more follow-up. The next question will be considered a new conversation. Please wait ... processing your request ...",
            }
        ], thread_id, count
        await asyncio.sleep(1)
    # Reset: start a fresh session once the limit is exceeded.
    if count > MAX_MESSAGES_IN_CONVERSATION:
        thread_id = init_session()
        history = []
        count = 1
        yield [
            {
                "role": "system",
                "content": "🔄 This is now considered a new question. Don't worry, your message will still be processed! If I am giving irrelevant responses, you know why :-)",
            }
        ], thread_id, count
        await asyncio.sleep(1)
    # Normal flow: stream from the original chat_wrapper.
    final_chunk = []
    async for chunk in chat_wrapper(
        message, history, thread_id, debug, preferred_language
    ):
        yield chunk, thread_id, count
        final_chunk = chunk
    # Simulated LLM response (kept for local testing):
    # for i in range(5):
    #     final_chunk += [{
    #         "role": "assistant",
    #         "content": f"Simulated LLM output {i+1}",
    #     }]
    #     yield final_chunk, thread_id, count
    #     await asyncio.sleep(0.25)
    yield final_chunk, thread_id, count
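
# A minimal wiring sketch (hypothetical; this module does not build the UI).
# It assumes Gradio 5's ChatInterface with additional_inputs/additional_outputs,
# which matches the (chunk, thread_id, count) tuples yielded above; component
# labels and the language choices are illustrative:
#
#   import gradio as gr
#
#   thread_id = gr.State(init_session())
#   count = gr.State(0)
#   debug = gr.Checkbox(label="Debug (streaming)", value=True)
#   language = gr.Dropdown(["English", "हिन्दी"], value="English", label="Language")
#
#   demo = gr.ChatInterface(
#       fn=limited_chat_wrapper,
#       type="messages",
#       additional_inputs=[thread_id, debug, language, count],
#       additional_outputs=[thread_id, count],
#   )
#   demo.launch()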