import base64
import io
import os
import pathlib
import shutil
import subprocess
import tempfile
import time

import cv2
import ffmpeg
import gradio as gr
import numpy as np
import openai
import pandas as pd
import requests
import PIL.Image as Image

from pathlib import Path
from typing import Any, Dict, List, TypedDict

from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from pytube import YouTube
from langchain.tools import tool
from langchain_community.utilities import WikipediaAPIWrapper
from langchain.agents import OpenAIFunctionsAgent, AgentExecutor
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import START, StateGraph, END
from langchain_community.tools.tavily_search import TavilySearchResults

openai.api_key = os.getenv("OPENAI_API_KEY")
# (Keep Constants as is)
# --- Constants ---
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
#llm = HuggingFaceInferenceAPI(model_name="meta-llama/Llama-3.2-3B-Instruct")

# --- Final Agent Definition ---
# ----- THIS IS WHERE YOU CAN BUILD WHAT YOU WANT ------
general_llm = ChatOpenAI(model="gpt-4o-mini")
audio_llm = "whisper-1"
class AgentState(TypedDict, total=False):
    file_path: str | None   # Path to the task's attachment, if any
    question: str            # The question text
    answer: str | None
    agent_type: str | None
    messages: list[AIMessage | HumanMessage | SystemMessage]
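# Illustrative state flowing through the graph (values are hypothetical):
#   {"file_path": "temp/1a2b3c.xlsx", "question": "What is the total?", "answer": None, "agent_type": None}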
@tool  # register as a LangChain tool so the agents below can call it
def addition_tool(numbers: List[float]) -> float:
    """
    Description:
        A simple addition tool that takes a list of numbers and returns their sum.
    Arguments:
        • numbers (List[float]): List of numbers to add.
    Return:
        float – The sum of the numbers in the list.
    """
    return sum(numbers)
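# Illustrative call (assumes the @tool wrapper above is applied):
#   addition_tool.invoke({"numbers": [1.5, 2.5]})  # -> 4.0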
@tool
def xlsx_handler(filepath: str) -> List[Dict[str, Any]]:
    """
    Description:
        Load the first sheet of an Excel workbook and convert it into
        a JSON-serialisable list of column dictionaries.
    Arguments:
        • filepath (str): Absolute or relative path to the .xlsx file.
    Return:
        List[Dict[str, Any]] – One dictionary per column: {"column": <name>, "values": [...]}.
    """
    # Load the Excel file
    df = pd.read_excel(filepath)
    columns = df.columns.tolist()
    result = []
    for col in columns:
        result.append({"column": col, "values": df[col].tolist()})
    # Alternative: convert to row records and serialise as JSON
    # data = df.to_dict(orient="records")
    # return json.dumps(data, indent=4)
    return result
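# Illustrative return value for a hypothetical two-column sheet:
#   [{"column": "Item", "values": ["pens", "paper"]}, {"column": "Sales", "values": [120.0, 185.0]}]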
@tool
def python_handler(filepath: str) -> str:
    """
    Description:
        Execute a stand-alone Python script in a sandboxed subprocess and
        capture anything the script prints to stdout. Stderr is returned
        instead if the script exits with a non-zero status.
    Arguments:
        • filepath (str): Path to the .py file to run.
    Return:
        str – The final output of the .py file.
    """
    try:
        result = subprocess.run(
            ["python", filepath],
            capture_output=True,
            text=True,
            timeout=30  # Safety
        )
        return result.stdout.strip() if result.returncode == 0 else result.stderr
    except Exception as e:
        return f"Execution failed: {str(e)}"
def video_decomposition(url: str, task: str) -> str:
    """
    Description:
        Download a YouTube video, extract ≤ 10 visually distinct key frames
        and a Whisper transcript, feed them plus the user's task to a
        vision-capable LLM, and return the model's answer.
    Arguments:
        • url (str)  : Full YouTube link.
        • task (str) : The question the model should answer about the clip.
    Return:
        str – The final response to the user question derived from both audio and visuals.
    """
    with tempfile.TemporaryDirectory() as tmp:
        tmp_dir = pathlib.Path(tmp)
        # 1) Fetch clip
        vid_path = download_youtube(url, tmp_dir)
        # 2) Key-frame extraction
        frames = key_frames_retrieval(vid_path)
        # 3) Audio extraction
        transcript = audio_retrieval(vid_path)
        system_msg = SystemMessage(
            content=("You are a Vision AI assistant that can process videos and answer the user's questions correctly. "
                     "You are provided with key video frames, an audio transcript and a task related to them. "
                     "Read the task **carefully**, examine all the video frames and the audio transcript; your final response **MUST** be only the final answer to the task's question. "
                     "The content and format of your final response are dictated by the task and only that.")
        )
        # 4) Build multimodal prompt
        parts = [
            {
                "type": "text",
                "text": f"Task:\n{task}"
            },
            {
                "type": "text",
                "text": f"Transcript:\n{transcript[:4000]}"
            }
        ]
        for im in frames:
            parts.append(
                {
                    "type": "image_url",
                    "image_url": {"url": img_to_data(im)},
                }
            )
        messages = [
            system_msg,
            HumanMessage(content=parts)
        ]
        response = general_llm.invoke(messages)
        return response.content
@tool
def reverse_string(text: str) -> str:
    """
    Description:
        Reverse the order of words *and* the letters inside each word.
        Converts a fully reversed sentence back to readable form.
    Arguments:
        • text (str): Original sentence to transform.
    Return:
        str – The readable reversed sentence.
    """
    # 1️⃣ split into words, 2️⃣ reverse word order,
    # 3️⃣ reverse letters in each word, 4️⃣ re-join
    reversed_words = [word[::-1] for word in reversed(text.split())]
    return " ".join(reversed_words)
@tool
def web_search(query: str) -> str:
    """
    Description:
        A web search tool. Scrapes the top results and returns each on its own line.
    Arguments:
        • query (str) : Question you want to web search.
    Return:
        str – A newline-separated summary, one '- <snippet> (<url>)' entry per result.
    """
    search = TavilySearchResults()
    results = search.run(query)
    return "\n".join([f"- {r['content']} ({r['url']})" for r in results])
@tool
def wikipedia_search(query: str) -> str:
    """
    Description:
        Query the English-language Wikipedia via the MediaWiki API and
        return a short plain-text extract.
    Arguments:
        • query (str) : Page title or free-text search string.
    Return:
        str – Extracted summary paragraph.
    """
    wiki = WikipediaAPIWrapper()
    return wiki.run(query)
def download_youtube(url: str, out_dir: pathlib.Path) -> pathlib.Path:
    yt = YouTube(url)
    stream = yt.streams.filter(progressive=True, file_extension="mp4")\
                       .order_by("resolution").desc().first()
    return pathlib.Path(stream.download(output_path=out_dir))
def key_frames_retrieval(video: pathlib.Path, max_frames: int = 6, thresh: float = 0.35, max_frame_mb: float = 0.25):
    """
    Scan *all* frames in `video`, keep every frame whose colour-histogram
    differs from the previous scene by more than `thresh`, then return the first
    `max_frames` most-distinct ones (highest histogram distance).
    Returns
    -------
    List[PIL.Image]  # ≤ `max_frames` images, sorted by descending “scene change” score
    """
    cap = cv2.VideoCapture(str(video))
    ok, frame = cap.read()
    if not ok:
        cap.release()
        return []

    def hsv_hist(img) -> np.ndarray:
        return cv2.calcHist(
            [cv2.cvtColor(img, cv2.COLOR_BGR2HSV)],
            [0, 1], None, [50, 60], [0, 180, 0, 256]
        )

    def bgr_to_pil(bgr) -> Image.Image:
        img = Image.fromarray(cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB))
        # shrink oversized frames so the base64 prompt stays small
        if (img.width * img.height * 3 / 1_048_576) > max_frame_mb:
            img.thumbnail((800, 800))
        return img

    prev_hist = hsv_hist(frame)
    candidates: list[tuple[float, Image.Image]] = [(1.0, bgr_to_pil(frame))]  # always keep the first frame
    while ok:
        ok, frame = cap.read()
        if not ok:
            break
        hist = hsv_hist(frame)
        diff = cv2.compareHist(prev_hist, hist, cv2.HISTCMP_BHATTACHARYYA)
        if diff > thresh:
            candidates.append((diff, bgr_to_pil(frame)))
        prev_hist = hist
    cap.release()
    candidates.sort(key=lambda t: t[0], reverse=True)
    top_frames = [img for _, img in candidates[:max_frames]]
    return top_frames
def audio_retrieval(video: pathlib.Path) -> str:
    """
    Extract the audio track from `video`, save it as a temporary MP3,
    and return the transcript produced by the Whisper transcription API.
    """
    with tempfile.NamedTemporaryFile(suffix=".mp3") as tmp_mp3:
        (
            ffmpeg
            .input(str(video))
            .output(
                tmp_mp3.name,
                ac=1, ar="16000",  # mono, 16 kHz (keeps Whisper happy)
                audio_bitrate="128k",
                format="mp3",
                loglevel="quiet"
            )
            .overwrite_output()
            .run()
        )
        tmp_mp3.seek(0)  # rewind before passing the handle
        transcript = openai.audio.transcriptions.create(model=audio_llm, file=tmp_mp3, response_format="text")
    return transcript
def img_to_data(img: Image.Image) -> str:
    buf = io.BytesIO()
    img.save(buf, format="PNG", optimize=True)
    b64 = base64.b64encode(buf.getvalue()).decode()
    return f"data:image/png;base64,{b64}"
def task_examiner(state: AgentState):
    file_path = state["file_path"]
    if file_path is not None:
        p = Path(file_path)
        suffix = p.suffix
        if suffix == ".png":
            state["agent_type"] = "vision"
        elif suffix == ".mp3":
            state["agent_type"] = "audio"
        elif suffix == ".py" or suffix == ".xlsx":
            state["agent_type"] = "code"
    else:
        #if "video" in state["question"]:
        #    state["agent_type"] = "vision"
        #else:
        state["agent_type"] = "general"
    return state

def task_router(state: AgentState) -> str:
    return state["agent_type"]
def general_agent(state: AgentState):
    question = state["question"]
    tools = [web_search, wikipedia_search, reverse_string]
    system_prompt = ChatPromptTemplate.from_messages([
        ("system",
         """
         SYSTEM GUIDELINES:
         - You are a general AI assistant tasked with answering the user's questions correctly.
         - You have several tools at your disposal for different kinds of tasks.
         - You **MUST** think step by step before using any tool and call the tools only when you are sure that you need them.
         **Tool-reuse rule:**
         - Keep an internal list of tool names you have already called in this answer.
         - If a name is on that list you MUST NOT call it again. (You may still call a different tool once.)
         TOOLS:
         - reverse_string: Reverses a sentence, so if a question is not readable try passing it to this tool.
         - web_search: Takes a question as input, searches the web for up-to-date information and returns an answer.
         - wikipedia_search: Searches exclusively the English Wikipedia for up-to-date information that may not be available in your training data.
         INPUT FORMAT:
         - A question (text) that you should answer correctly.
         OUTPUT FORMAT:
         Output **ONLY** the final answer dictated by the user's question and only that.
         **NEVER** wrap your final answer like this: <sentence> answer </sentence>.
         <**IMPORTANT**> If the question contains a YouTube link (https://www.youtube.com/watch?...) and **ONLY THEN**, output "Don't know".
         If the question asks 'How many ...' you **MUST** respond with **only** a single numeral and absolutely nothing else (no punctuation, no sentence, no units).
         If the question asks 'What number ...' you **MUST** respond with **only** a single numeral and absolutely nothing else (no punctuation, no sentence, no units).
         If the question asks 'Who did ...' you **MUST** respond with **only** the full name, unless the question directs you otherwise, and absolutely nothing else (no punctuation, no sentence, no units).
         If the question asks for a comma-separated list you **MUST** respond with **only** a comma-separated list '[...,...,...]'. **ABSOLUTELY NEVER** output a list like this: a,b,c,d,e.
         If the question asks to output a list -> Output: [item1,item2,item3]
         If the question asks 'What does person A say when ...' you **MUST** respond with **only** the phrase that person says and absolutely nothing else (no punctuation, no sentence, no units).
         """),
        ("user", "{input}"),
        MessagesPlaceholder("agent_scratchpad"),
    ])
    agent = OpenAIFunctionsAgent(
        llm=general_llm,
        tools=tools,
        prompt=system_prompt
    )
    agent_executor = AgentExecutor.from_agent_and_tools(
        agent=agent,
        tools=tools,
        verbose=True,
    )
    response = agent_executor.invoke({"input": question})
    state["answer"] = response["output"]
    return state
def audio_agent(state: AgentState):
    with open(state["file_path"], "rb") as f:
        transcript = openai.audio.transcriptions.create(model=audio_llm, file=f, response_format="text")
    question = state["question"]
    system_msg = SystemMessage(
        content=("You are an AI assistant that answers the user's question based solely on the provided transcript. "
                 "When the user asks for a “comma-delimited / comma-separated list”, you must: "
                 "- Filter the items exactly as requested. "
                 "- Output one single line that contains the items separated by commas and a space, enclosed in square brackets. "
                 "- Output nothing else - no extra words or explanations. "
                 "OUTPUT FORMAT EXAMPLES: "
                 "If asked to output a list -> Output: [item1,item2,item3] "
                 "If asked something else -> Output: text answering exactly that question and nothing more")
    )
    messages = [
        system_msg,
        HumanMessage(
            content=[
                {
                    "type": "text",
                    "text": f"Transcript:\n{transcript}\n\nQuestion:\n{question}"
                }
            ]
        )
    ]
    response = general_llm.invoke(messages)
    state["answer"] = response.content.strip()
    return state
def vision_agent(state: AgentState):
    file_path = state["file_path"]
    question = state["question"]
    with open(file_path, "rb") as image_file:
        image_bytes = image_file.read()
    image_base64 = base64.b64encode(image_bytes).decode("utf-8")
    system_msg = SystemMessage(
        content=("""
        You are a Vision AI assistant that can process images and answer the user's questions correctly.
        **OUTPUT** only the final answer and absolutely nothing else (no punctuation, no sentence, no units).
        """)
    )
    messages = [
        system_msg,
        HumanMessage(
            content=[
                {
                    "type": "text",
                    "text": f"{question}"
                },
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/png;base64,{image_base64}"
                    },
                }
            ]
        )
    ]
    response = general_llm.invoke(messages)
    state["answer"] = response.content.strip()
    return state
def code_agent(state: AgentState):
    file_path = state["file_path"]
    question = state["question"]
    tools = [xlsx_handler, python_handler, addition_tool]
    system_prompt = ChatPromptTemplate.from_messages([
        ("system",
         """SYSTEM GUIDELINES:
         - You are a data AI assistant and your job is to answer questions that depend on .xlsx or .py files.
         - You have at your disposal three tools that are mandatory for solving the tasks.
         - You **MUST** use the tools as instructed below and you **MUST** output only the final numeric result of the task.
         INPUT FORMAT:
         - A question (text) based on a file which will be either .py or .xlsx.
         - The path of the file related to the question.
         TOOLS:
         - Tool name: xlsx_handler, Purpose: Use this tool if the file at file_path is an .xlsx file; it returns the contents of the file as a list of dictionaries for you to process, reason over **INTERNALLY**, and output only the final numeric result.
         - Tool name: python_handler, Purpose: Use this tool if the file at file_path is a .py file; it executes the Python file and returns its final numeric result.
         - Tool name: addition_tool, Purpose: Use this tool if the question asks you to sum a list of numbers and return the final numeric result.
         EXAMPLE OUTPUTS:
         - Input: "What is the result of the code in the file?" Output: "5"
         - Input: "What is the total sales mentioned in the file? Your answer must have 2 decimal places." Output: "305.00"
         - YOU MUST OUTPUT ONLY THE FINAL NUMBER.
         The file relevant to the task is at: {file_path}."""),
        ("user", "{input}"),
        MessagesPlaceholder("agent_scratchpad"),
    ])
    agent = OpenAIFunctionsAgent(
        llm=general_llm,
        tools=tools,
        prompt=system_prompt
    )
    agent_executor = AgentExecutor.from_agent_and_tools(
        agent=agent,
        tools=tools,
        verbose=True,
    )
    #agent_executor = agent_executor.partial(file_path=file_path)
    response = agent_executor.invoke({"input": question, "file_path": file_path})
    state["answer"] = response["output"]
    return state
class Agent_Workflow:
    def __init__(self):
        print("Agent Workflow initialized.")

    def __call__(self, question: str, filepath: str) -> str:
        builder = StateGraph(AgentState)
        # Agent Nodes
        builder.add_node("task_examiner", task_examiner)
        builder.add_node("general_agent", general_agent)
        builder.add_node("audio_agent", audio_agent)
        builder.add_node("vision_agent", vision_agent)
        builder.add_node("code_agent", code_agent)
        # Edges that connect agent nodes
        builder.add_edge(START, "task_examiner")
        builder.add_conditional_edges("task_examiner", task_router,
            {
                "general": "general_agent",
                "audio": "audio_agent",
                "vision": "vision_agent",
                "code": "code_agent"
            }
        )
        builder.add_edge("general_agent", END)
        builder.add_edge("audio_agent", END)
        builder.add_edge("vision_agent", END)
        builder.add_edge("code_agent", END)
        workflow_graph = builder.compile()
        state = workflow_graph.invoke({"file_path": filepath, "question": question, "answer": ""})
        return state["answer"]
def fetch_task_file_static(task_id: str, file_name: str | None = None, session: requests.Session | None = None) -> str | None:
    """
    Download the attachment for `task_id` to temp/<task_id><suffix>
    """
    if file_name is None:
        return None
    # Decide the suffix
    suffix = Path(file_name).suffix if file_name else ""
    os.makedirs("temp", exist_ok=True)  # make sure the download directory exists
    dest = "temp/" + task_id + suffix
    url = f"{DEFAULT_API_URL}/files/{task_id}"
    s = session or requests
    with s.get(url, stream=True, timeout=30) as r:
        r.raise_for_status()
        with open(dest, "wb") as f:
            shutil.copyfileobj(r.raw, f)
    return dest
def run_and_submit_all(profile: gr.OAuthProfile | None):
    """
    Fetches all questions, runs the agent workflow on them, submits all answers,
    and displays the results.
    """
    if profile:
        username = f"{profile.username}"
        print(f"User logged in: {username}")
    else:
        print("User not logged in.")
        return "Please Login to Hugging Face with the button.", None

    api_url = DEFAULT_API_URL
    questions_url = f"{api_url}/questions"
    submit_url = f"{api_url}/submit"

    # 1. Instantiate Agent (modify this part to create your agent)
    try:
        agent = Agent_Workflow()
    except Exception as e:
        print(f"Error instantiating agent: {e}")
        return f"Error initializing agent: {e}", None
    # In the case of an app running as a Hugging Face Space, this link points toward your codebase (useful for others, so please keep it public)
    agent_code = f"https://huggingface.co/spaces/Psiska/Final_Assignment/tree/main"
    # 2. Fetch Questions
    print(f"Fetching questions from: {questions_url}")
    try:
        response = requests.get(questions_url, timeout=15)
        response.raise_for_status()
        questions_data = response.json()
        if not questions_data:
            print("Fetched questions list is empty.")
            return "Fetched questions list is empty or invalid format.", None
        print(f"Fetched {len(questions_data)} questions.")
    except requests.exceptions.RequestException as e:
        print(f"Error fetching questions: {e}")
        return f"Error fetching questions: {e}", None
    except requests.exceptions.JSONDecodeError as e:
        print(f"Error decoding JSON response from questions endpoint: {e}")
        print(f"Response text: {response.text[:500]}")
        return f"Error decoding server response for questions: {e}", None
    except Exception as e:
        print(f"An unexpected error occurred fetching questions: {e}")
        return f"An unexpected error occurred fetching questions: {e}", None
    # 3. Run your Agent
    results_log = []
    answers_payload = []
    print(f"Running agent on {len(questions_data)} questions...")
    session = requests.Session()
    j = 0
    for item in questions_data:
        task_id = item["task_id"]
        question = item["question"]
        file_name = item.get("file_name")
        file_path = None
        if file_name:
            try:
                file_path = fetch_task_file_static(task_id, file_name, session=session)
            except requests.HTTPError as e:
                print(f"⚠️ Couldn’t fetch file for {task_id}: {e}")
        #print(f"Question is : {question}\n")
        #[2,4,5,6,7,8,10,12,15,16,17]
        """
        if j in [2,4,5,6,7,8,10,12,15,16,17]:
            time.sleep(5)
            print(f"Question is : {question}")
            print(f"File path is : {file_path}")
            submitted_answer = agent(question=question, filepath=file_path)
            print(f"Answer is : {submitted_answer}")
            j = j + 1
        """
        print(f"Question {j+1} is : {question}")
        print(f"File path is : {file_path}")
        if not task_id or question is None:
            print(f"Skipping item with missing task_id or question: {item}")
            continue
        try:
            submitted_answer = agent(question=question, filepath=file_path)
            print(f"Answer for question {j+1} is: {submitted_answer}")
            answers_payload.append({"task_id": task_id, "submitted_answer": submitted_answer})
            results_log.append({"Task ID": task_id, "Question": question, "Submitted Answer": submitted_answer})
        except Exception as e:
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({"Task ID": task_id, "Question": question, "Submitted Answer": f"AGENT ERROR: {e}"})
        j = j + 1
    if not answers_payload:
        print("Agent did not produce any answers to submit.")
        return "Agent did not produce any answers to submit.", pd.DataFrame(results_log)

    # 4. Prepare Submission
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}
    status_update = f"Agent finished. Submitting {len(answers_payload)} answers for user '{username}'..."
    print(status_update)
    # 5. Submit
    print(f"Submitting {len(answers_payload)} answers to: {submit_url}")
    try:
        response = requests.post(submit_url, json=submission_data, timeout=60)
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        print("Submission successful.")
        results_df = pd.DataFrame(results_log)
        return final_status, results_df
    except requests.exceptions.HTTPError as e:
        error_detail = f"Server responded with status {e.response.status_code}."
        try:
            error_json = e.response.json()
            error_detail += f" Detail: {error_json.get('detail', e.response.text)}"
        except requests.exceptions.JSONDecodeError:
            error_detail += f" Response: {e.response.text[:500]}"
        status_message = f"Submission Failed: {error_detail}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.Timeout:
        status_message = "Submission Failed: The request timed out."
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except requests.exceptions.RequestException as e:
        status_message = f"Submission Failed: Network error - {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
    except Exception as e:
        status_message = f"An unexpected error occurred during submission: {e}"
        print(status_message)
        results_df = pd.DataFrame(results_log)
        return status_message, results_df
# --- Build Gradio Interface using Blocks ---
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    gr.Markdown(
        """
        **Instructions:**
        1. Please clone this space, then modify the code to define your agent's logic, the tools, the necessary packages, etc.
        2. Log in to your Hugging Face account using the button below. This uses your HF username for submission.
        3. Click 'Run Evaluation & Submit All Answers' to fetch questions, run your agent, submit answers, and see the score.
        ---
        **Disclaimers:**
        Once you click the submit button, it can take quite some time (this is the time for the agent to go through all the questions).
        This space provides a basic setup and is intentionally sub-optimal to encourage you to develop your own, more robust solution. For instance, to address the delay of the submit button, you could cache the answers and submit them in a separate action, or even answer the questions asynchronously.
        """
    )

    gr.LoginButton()

    run_button = gr.Button("Run Evaluation & Submit All Answers")

    status_output = gr.Textbox(label="Run Status / Submission Result", lines=5, interactive=False)
    # Removed max_rows=10 from DataFrame constructor
    results_table = gr.DataFrame(label="Questions and Agent Answers", wrap=True)

    run_button.click(
        fn=run_and_submit_all,
        outputs=[status_output, results_table]
    )
if __name__ == "__main__":
    print("\n" + "-"*30 + " App Starting " + "-"*30)
    # Check for SPACE_HOST and SPACE_ID at startup for information
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")  # Get SPACE_ID at startup
    if space_host_startup:
        print(f"✅ SPACE_HOST found: {space_host_startup}")
        print(f"   Runtime URL should be: https://{space_host_startup}.hf.space")
    else:
        print("ℹ️ SPACE_HOST environment variable not found (running locally?).")
    if space_id_startup:  # Print repo URLs if SPACE_ID is found
        print(f"✅ SPACE_ID found: {space_id_startup}")
        print(f"   Repo URL: https://huggingface.co/spaces/{space_id_startup}")
        print(f"   Repo Tree URL: https://huggingface.co/spaces/{space_id_startup}/tree/main")
    else:
        print("ℹ️ SPACE_ID environment variable not found (running locally?). Repo URL cannot be determined.")
    print("-"*(60 + len(" App Starting ")) + "\n")
    print("Launching Gradio Interface for Basic Agent Evaluation...")
    demo.launch(debug=True, share=False)