""" Data Loader: Load from HuggingFace, parse JSON files, and build tables. """ import json import pandas as pd from pathlib import Path from bisect import bisect_left from datasets import load_dataset # Global caches HF_DATASET_CACHE = {} LEADERBOARD_CACHE = {} # Compact search index: tuples (model_lower, model_name, leaderboard_lower) MODEL_SEARCH_INDEX = [] # Prefix map for fast narrowing by model prefix MODEL_PREFIX_MAP = {} # Lightweight incremental cache LAST_QUERY = "" LAST_RESULTS = [] DATA_DIR = Path("leaderboard_data") def load_hf_dataset_on_startup(): """Load all splits from HuggingFace dataset at startup.""" print("Loading dataset from HuggingFace...") try: dataset = load_dataset("evaleval/every_eval_ever") for split_name, split_data in dataset.items(): print(f"Loading split: {split_name} ({len(split_data)} rows)") df = split_data.to_pandas() parsed_items = [] for _, row in df.iterrows(): # New schema (v0.1.0) stores complex fields as JSON strings in parquet # and uses unified top-level keys that mirror eval.schema.json. # # We keep this flexible so it works with both the old dataset # (flattened columns) and the new one (JSON columns). def _safe_json_load(value): if isinstance(value, str): try: return json.loads(value) except Exception: return value return value # --- Core structured fields --- evaluation_results = ( _safe_json_load(row.get("evaluation_results", "[]")) or [] ) source_metadata = ( _safe_json_load(row.get("source_metadata", "{}")) or {} ) source_data = _safe_json_load(row.get("source_data", "[]")) or [] model_info = _safe_json_load(row.get("model_info", "{}")) or {} # Some older parquet versions had flattened columns instead of JSON blobs. # We transparently patch those into the new structure if present. if not source_metadata: # Old columns: source_organization_name, evaluator_relationship, # source_organization_url, source_organization_logo_url, evaluation_source_name, evaluation_source_type sm = {} if pd.notna(row.get("evaluation_source_name", None)): sm["source_name"] = row["evaluation_source_name"] if pd.notna(row.get("evaluation_source_type", None)): sm["source_type"] = row["evaluation_source_type"] if pd.notna(row.get("source_organization_name", None)): sm["source_organization_name"] = row["source_organization_name"] if pd.notna(row.get("source_organization_url", None)): sm["source_organization_url"] = row["source_organization_url"] if pd.notna(row.get("source_organization_logo_url", None)): sm["source_organization_logo_url"] = row[ "source_organization_logo_url" ] if pd.notna(row.get("evaluator_relationship", None)): sm["evaluator_relationship"] = row["evaluator_relationship"] source_metadata = sm if not source_data: # Old schema used `source_data` as list of URLs already; if we see a # plain string, wrap it into a list for consistency. 
raw_sd = row.get("source_data") if isinstance(raw_sd, str) and raw_sd: source_data = [raw_sd] if not model_info: # Old flattened model columns mi = {} if pd.notna(row.get("model_name", None)): mi["name"] = row["model_name"] if pd.notna(row.get("model_id", None)): mi["id"] = row["model_id"] if pd.notna(row.get("model_developer", None)): mi["developer"] = row["model_developer"] if pd.notna(row.get("model_inference_platform", None)): mi["inference_platform"] = row["model_inference_platform"] model_info = mi additional_details = {} # New schema: additional_details lives inside model_info if isinstance(model_info, dict): additional_details = model_info.get("additional_details") or {} # Old schema sometimes had an `additional_details` top-level column # with JSON, we still honour that as a source of params/precision/arch. if not additional_details and pd.notna( row.get("additional_details", None) ): additional_details = ( _safe_json_load(row["additional_details"]) or {} ) results = {} for eval_result in evaluation_results: eval_name = eval_result.get("evaluation_name") score = eval_result.get("score_details", {}).get("score") if eval_name and score is not None: results[eval_name] = score parsed_item = { "leaderboard": row.get("_leaderboard", "unknown_leaderboard"), # Provider is the organization owning the source/leaderboard "provider": source_metadata.get( "source_organization_name", "Unknown Provider" ), # Prefer the canonical model id from the new schema; fall back to old columns "model": model_info.get("id") or row.get("_model") or row.get("model_id", "Unknown Model"), "developer": model_info.get("developer") or row.get("_developer") or row.get("model_developer", "Unknown Developer"), "params": additional_details.get("params_billions"), "architecture": additional_details.get("architecture", "Unknown"), "precision": additional_details.get("precision", "Unknown"), "results": results, "raw_data": { "schema_version": row.get("schema_version"), "evaluation_id": row.get("evaluation_id"), "retrieved_timestamp": row.get("retrieved_timestamp"), "source_data": source_data, "source_metadata": source_metadata, "model_info": model_info, "evaluation_results": evaluation_results, "additional_details": additional_details, }, } parsed_items.append(parsed_item) HF_DATASET_CACHE[split_name] = parsed_items print(f"Loaded {len(HF_DATASET_CACHE)} leaderboard(s) from HuggingFace") _build_search_index() return True except Exception as e: print(f"Warning: Could not load HuggingFace dataset: {e}") print("Falling back to local file system...") return False def parse_eval_json(file_path): """Parses a single JSON file to extract model, provider, and results.""" try: with open(file_path, "r") as f: data = json.load(f) # New schema (v0.1.0) removes `evaluation_source` and moves most # metadata into `source_metadata` and `model_info.additional_details`. 
source_meta = data.get("source_metadata", {}) or {} model_info = data.get("model_info", {}) or {} # Leaderboard name: # - new schema: source_metadata.source_name # - old schema: evaluation_source.evaluation_source_name leaderboard_name = source_meta.get("source_name") if not leaderboard_name: leaderboard_name = data.get("evaluation_source", {}).get( "evaluation_source_name", "Unknown Leaderboard" ) provider_name = source_meta.get("source_organization_name", "Unknown Provider") model_id = model_info.get("id", "Unknown Model") developer_name = model_info.get("developer", "Unknown Developer") # Model-level details: additional_details = model_info.get("additional_details") or {} # Backwards compatibility with old layout if not additional_details: additional_details = data.get("additional_details", {}) or {} params = additional_details.get("params_billions") architecture = additional_details.get("architecture", "Unknown") precision = additional_details.get("precision", "Unknown") if precision == "Unknown": precision = model_info.get("precision", "Unknown") results = {} if "evaluation_results" in data: for res in data["evaluation_results"]: eval_name = res.get("evaluation_name", "Unknown Metric") score = res.get("score_details", {}).get("score", None) if score is not None: results[eval_name] = score return { "leaderboard": leaderboard_name, "provider": provider_name, "model": model_id, "developer": developer_name, "params": params, "architecture": architecture, "precision": precision, "results": results, "raw_data": data, } except Exception as e: print(f"Error parsing {file_path}: {e}") return None def get_available_leaderboards(): """Returns available leaderboards from HF cache or local directory.""" if HF_DATASET_CACHE: return list(HF_DATASET_CACHE.keys()) if not DATA_DIR.exists(): return [] return [d.name for d in DATA_DIR.iterdir() if d.is_dir()] def walk_eval_files(leaderboard_name): """Generator that walks through Leaderboard directory recursively.""" lb_path = DATA_DIR / leaderboard_name if not lb_path.exists(): return yield from lb_path.rglob("*.json") def get_eval_metadata(selected_leaderboard): """Extracts evaluation metadata from the leaderboard data.""" if not selected_leaderboard: return {} eval_metadata = {"evals": {}, "source_info": {}} if selected_leaderboard in HF_DATASET_CACHE: parsed_items = HF_DATASET_CACHE[selected_leaderboard] if parsed_items: parsed = parsed_items[0] source_meta = parsed["raw_data"].get("source_metadata", {}) or {} source_data_val = parsed["raw_data"].get("source_data", []) # source_data can be either: # - list[str] (URLs) OR # - object describing a HF dataset. For the latter, we skip the URL. 
url = "#" if isinstance(source_data_val, list) and source_data_val: url = source_data_val[0] eval_metadata["source_info"] = { "organization": source_meta.get("source_organization_name", "Unknown"), "relationship": source_meta.get("evaluator_relationship", "Unknown"), "url": url, } if "evaluation_results" in parsed["raw_data"]: for res in parsed["raw_data"]["evaluation_results"]: eval_name = res.get("evaluation_name", "Unknown Metric") if eval_name not in eval_metadata["evals"]: metric_config = res.get("metric_config", {}) eval_metadata["evals"][eval_name] = { "description": metric_config.get( "evaluation_description", "No description available" ), "score_type": metric_config.get("score_type", "unknown"), "lower_is_better": metric_config.get( "lower_is_better", False ), "min_score": metric_config.get("min_score"), "max_score": metric_config.get("max_score"), "level_names": metric_config.get("level_names", []), "level_metadata": metric_config.get("level_metadata", []), "has_unknown_level": metric_config.get( "has_unknown_level", False ), } return eval_metadata # Fall back to file system for json_file in walk_eval_files(selected_leaderboard): parsed = parse_eval_json(json_file) if parsed: if not eval_metadata["source_info"]: source_meta = parsed["raw_data"].get("source_metadata", {}) source_data_list = parsed["raw_data"].get("source_data", []) url = ( source_data_list[0] if isinstance(source_data_list, list) and source_data_list else "#" ) eval_metadata["source_info"] = { "organization": source_meta.get( "source_organization_name", "Unknown" ), "relationship": source_meta.get( "evaluator_relationship", "Unknown" ), "url": url, } if "evaluation_results" in parsed["raw_data"]: for res in parsed["raw_data"]["evaluation_results"]: eval_name = res.get("evaluation_name", "Unknown Metric") if eval_name not in eval_metadata["evals"]: metric_config = res.get("metric_config", {}) eval_metadata["evals"][eval_name] = { "description": metric_config.get( "evaluation_description", "No description available" ), "score_type": metric_config.get("score_type", "unknown"), "lower_is_better": metric_config.get( "lower_is_better", False ), "min_score": metric_config.get("min_score"), "max_score": metric_config.get("max_score"), "level_names": metric_config.get("level_names", []), "level_metadata": metric_config.get("level_metadata", []), "has_unknown_level": metric_config.get( "has_unknown_level", False ), } break return eval_metadata def build_leaderboard_table( selected_leaderboard, search_query="", progress_callback=None ): """Builds the leaderboard DataFrame from cache or files.""" if not selected_leaderboard: return pd.DataFrame() if selected_leaderboard in LEADERBOARD_CACHE: df, _ = LEADERBOARD_CACHE[selected_leaderboard] else: rows = [] if selected_leaderboard in HF_DATASET_CACHE: if progress_callback: progress_callback( 0, desc=f"Loading {selected_leaderboard} from cache..." 
            parsed_items = HF_DATASET_CACHE[selected_leaderboard]
            for i, parsed in enumerate(parsed_items):
                if i % 100 == 0 and progress_callback:
                    progress_callback(
                        (i / len(parsed_items)),
                        desc=f"Processing {selected_leaderboard}...",
                    )
                row = {
                    "Model": parsed["model"],
                    "Developer": parsed["developer"],
                    "Params (B)": parsed["params"],
                    "Arch": parsed["architecture"],
                    "Precision": parsed["precision"],
                }
                row.update(parsed["results"])
                rows.append(row)
        else:
            # Fall back to file system
            if progress_callback:
                progress_callback(0, desc=f"Scanning {selected_leaderboard}...")
            all_files = list(walk_eval_files(selected_leaderboard))
            total_files = len(all_files)
            for i, json_file in enumerate(all_files):
                if i % 100 == 0 and progress_callback:
                    progress_callback(
                        (i / total_files), desc=f"Loading {selected_leaderboard}..."
                    )
                parsed = parse_eval_json(json_file)
                if parsed:
                    row = {
                        "Model": parsed["model"],
                        "Developer": parsed["developer"],
                        "Params (B)": parsed["params"],
                        "Arch": parsed["architecture"],
                        "Precision": parsed["precision"],
                    }
                    row.update(parsed["results"])
                    rows.append(row)

        if not rows:
            df = pd.DataFrame(
                columns=["Model", "Developer", "Params (B)", "Arch", "Precision"]
            )
            LEADERBOARD_CACHE[selected_leaderboard] = (df, None)
            return df

        df = pd.DataFrame(rows)
        df = df.dropna(axis=1, how="all")

        if df.empty:
            LEADERBOARD_CACHE[selected_leaderboard] = (df, None)
            return df

        numeric_cols = df.select_dtypes(include=["float", "int"]).columns
        df[numeric_cols] = df[numeric_cols].round(2)

        # Add an Average score over the evaluation columns (Params excluded)
        eval_only_cols = [c for c in numeric_cols if c not in ["Params (B)"]]
        if len(eval_only_cols) > 0:
            df["Average"] = df[eval_only_cols].mean(axis=1).round(2)

        # Base columns: Model, Developer, Params, Average
        # Eval columns: all evaluation scores (sorted)
        # Model detail columns: Arch, Precision (moved to end)
        base_cols = ["Model", "Developer", "Params (B)", "Average"]
        model_detail_cols = ["Arch", "Precision"]
        eval_cols = [
            c for c in df.columns if c not in base_cols and c not in model_detail_cols
        ]
        base_cols = [c for c in base_cols if c in df.columns]
        model_detail_cols = [c for c in model_detail_cols if c in df.columns]
        final_cols = base_cols + sorted(eval_cols) + model_detail_cols
        df = df[final_cols]

        if "Average" in df.columns:
            df = df.sort_values("Average", ascending=False)

        LEADERBOARD_CACHE[selected_leaderboard] = (df, None)

    return df


def clear_cache():
    """Clears the cached leaderboard tables."""
    LEADERBOARD_CACHE.clear()


def _build_search_index():
    """Build a compact, sorted search index for fast prefix/substring matching."""
    global MODEL_SEARCH_INDEX, MODEL_PREFIX_MAP, LAST_QUERY, LAST_RESULTS

    entries = []
    for leaderboard_name, parsed_items in HF_DATASET_CACHE.items():
        lb_lower = leaderboard_name.lower()
        for item in parsed_items:
            model_name = item.get("model", "")
            entries.append((model_name.lower(), model_name, lb_lower))

    # Sort by model_lower for prefix binary search
    MODEL_SEARCH_INDEX = sorted(entries, key=lambda x: x[0])

    # Build small prefix map (first 2 chars of model) to narrow searches
    MODEL_PREFIX_MAP = {}
    for model_lower, model_name, lb_lower in MODEL_SEARCH_INDEX:
        key = model_lower[:2] if len(model_lower) >= 2 else model_lower
        MODEL_PREFIX_MAP.setdefault(key, []).append((model_lower, model_name, lb_lower))

    # Reset incremental cache
    LAST_QUERY = ""
    LAST_RESULTS = []

    print(f"Built search index with {len(MODEL_SEARCH_INDEX)} entries")


def get_model_suggestions_fast(query, limit=15):
    """Fast search with prefix narrowing and incremental reuse (substring only)."""
    global LAST_QUERY, LAST_RESULTS

    if not query or len(query) < 2 or not MODEL_SEARCH_INDEX:
        return []

    query_lower = query.lower()
    results = []

    # Incremental reuse: if user keeps typing the same prefix, reuse last pool
    base_pool = None
    if LAST_QUERY and query_lower.startswith(LAST_QUERY) and LAST_RESULTS:
        base_pool = LAST_RESULTS
    else:
        prefix_key = query_lower[:2]
        base_pool = MODEL_PREFIX_MAP.get(prefix_key, MODEL_SEARCH_INDEX)

    # 1) Prefix match on model names
    if base_pool is MODEL_SEARCH_INDEX:
        idx = bisect_left(MODEL_SEARCH_INDEX, (query_lower,))
        while idx < len(MODEL_SEARCH_INDEX) and len(results) < limit:
            name_lower, name_orig, lb_lower = MODEL_SEARCH_INDEX[idx]
            if name_lower.startswith(query_lower):
                results.append((0, len(name_lower), name_orig))
                idx += 1
            else:
                break
    else:
        for name_lower, name_orig, lb_lower in base_pool:
            if name_lower.startswith(query_lower):
                results.append((0, len(name_lower), name_orig))
                if len(results) >= limit:
                    break

    # 2) Substring fallback
    if len(results) < limit:
        seen = {r[2] for r in results}
        # Use full index for substring to catch leaderboard-name matches
        scan_pool = MODEL_SEARCH_INDEX
        for name_lower, name_orig, lb_lower in scan_pool:
            if name_orig in seen:
                continue
            pos_model = name_lower.find(query_lower)
            pos_lb = lb_lower.find(query_lower)
            if pos_model != -1 or pos_lb != -1:
                # Prefer model matches; leaderboard-only matches still allowed
                pos = pos_model if pos_model != -1 else (pos_lb + 1)
                results.append((pos, len(name_lower), name_orig))
                if len(results) >= limit * 2:
                    break

    results.sort(key=lambda x: (x[0], x[1]))

    # Update incremental cache
    LAST_QUERY = query_lower
    LAST_RESULTS = base_pool if base_pool is not None else MODEL_SEARCH_INDEX

    return [r[2] for r in results[:limit]]


def search_model_across_leaderboards(model_query):
    """Search for a model across all leaderboards and return aggregated results."""
    if not model_query or not HF_DATASET_CACHE:
        return {}, []

    # Use the fast prefix/substring search for suggestions
    matches = get_model_suggestions_fast(model_query, limit=20)

    # Get detailed results only for matched models
    results = {}
    for leaderboard_name, parsed_items in HF_DATASET_CACHE.items():
        for item in parsed_items:
            model_id = item.get("model", "")
            if model_id in matches:
                if model_id not in results:
                    results[model_id] = {}
                results[model_id][leaderboard_name] = {
                    "developer": item.get("developer"),
                    "params": item.get("params"),
                    "architecture": item.get("architecture"),
                    "precision": item.get("precision"),
                    "results": item.get("results", {}),
                }

    return results, matches


def get_all_model_names():
    """Get all unique model names across all leaderboards."""
    if not HF_DATASET_CACHE:
        return []

    models = set()
    for parsed_items in HF_DATASET_CACHE.values():
        for item in parsed_items:
            models.add(item.get("model", ""))
    return sorted(models)
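

# Minimal usage sketch (illustrative only, not part of the loader API): pull the
# HuggingFace dataset if it is reachable, build a table for the first available
# leaderboard, and run a model search. The "llama" query string below is just an
# example query, not a name guaranteed to exist in the dataset; any substring of at
# least two characters works, and an empty index simply yields no suggestions.
if __name__ == "__main__":
    loaded = load_hf_dataset_on_startup()
    print(f"HuggingFace load succeeded: {loaded}")

    boards = get_available_leaderboards()
    print(f"Available leaderboards: {boards}")

    if boards:
        table = build_leaderboard_table(boards[0])
        print(table.head())

    suggestions = get_model_suggestions_fast("llama", limit=5)
    print(f"Suggestions for 'llama': {suggestions}")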