|
|
|
|
|
from __future__ import annotations |
|
|
from typing import Any, Dict, List, Optional |
|
|
from pathlib import Path |
|
|
import os |
|
|
import yaml |
|
|
|
|
|
from remote_clients import InstructClient, VisionClient, ToolsClient, ASRClient |
|
|
import time |
|
|
|
|
|
def load_yaml(path: str) -> Dict[str, Any]: |
|
|
p = Path(path) |
|
|
if not p.exists(): |
|
|
return {} |
|
|
return yaml.safe_load(p.read_text(encoding="utf-8")) or {} |
|
|
|
|
|
class LLMRouter: |
|
|
def __init__(self, cfg: Dict[str, Any]): |
|
|
self.cfg = cfg |
|
|
self.rem = cfg.get("models", {}).get("routing", {}).get("use_remote_for", []) |
|
|
base_user = cfg.get("remote_spaces", {}).get("user", "veureu") |
|
|
eps = cfg.get("remote_spaces", {}).get("endpoints", {}) |
|
|
token_enabled = cfg.get("security", {}).get("use_hf_token", False) |
|
|
hf_token = os.getenv(cfg.get("security", {}).get("hf_token_env", "HF_TOKEN")) if token_enabled else None |
|
|
|
|
|
def mk_factory(endpoint_key: str, cls): |
|
|
info = eps.get(endpoint_key, {}) |
|
|
base_url = info.get("base_url") or f"https://{base_user}-{info.get('space')}.hf.space" |
|
|
use_gradio = (info.get("client", "gradio") == "gradio") |
|
|
timeout = int(cfg.get("remote_spaces", {}).get("http", {}).get("timeout_seconds", 180)) |
|
|
def _factory(): |
|
|
return cls(base_url=base_url, use_gradio=use_gradio, hf_token=hf_token, timeout=timeout) |
|
|
return _factory |
|
|
|
|
|
self.client_factories = { |
|
|
"salamandra-instruct": mk_factory("salamandra-instruct", InstructClient), |
|
|
"salamandra-vision": mk_factory("salamandra-vision", VisionClient), |
|
|
"salamandra-tools": mk_factory("salamandra-tools", ToolsClient), |
|
|
"whisper-catalan": mk_factory("whisper-catalan", ASRClient), |
|
|
} |
|
|
|
|
|
self.service_names = { |
|
|
"salamandra-instruct": "schat", |
|
|
"salamandra-vision": "svision", |
|
|
"salamandra-tools": "stools", |
|
|
"whisper-catalan": "asr", |
|
|
} |
|
|
|
|
|
def _log_connect(self, model_key: str, phase: str, elapsed: float | None = None): |
|
|
svc = self.service_names.get(model_key, model_key) |
|
|
if phase == "connect": |
|
|
print(f"[LLMRouter] Connecting to {svc} space...") |
|
|
elif phase == "done": |
|
|
print(f"[LLMRouter] Response from {svc} space received in {elapsed:.2f} s") |
|
|
|
|
|
|
|
|
def instruct(self, prompt: str, system: Optional[str] = None, model: str = "salamandra-instruct", **kwargs) -> str: |
|
|
if model in self.rem: |
|
|
self._log_connect(model, "connect") |
|
|
t0 = time.time() |
|
|
client = self.client_factories[model]() |
|
|
out = client.generate(prompt, system=system, **kwargs) |
|
|
self._log_connect(model, "done", time.time() - t0) |
|
|
return out |
|
|
raise RuntimeError(f"Modelo local no implementado para: {model}") |
|
|
|
|
|
|
|
|
def vision_describe(self, image_paths: List[str], context: Optional[Dict[str, Any]] = None, model: str = "salamandra-vision", **kwargs) -> List[str]: |
|
|
if model in self.rem: |
|
|
self._log_connect(model, "connect") |
|
|
t0 = time.time() |
|
|
client = self.client_factories[model]() |
|
|
out = client.describe(image_paths, context=context, **kwargs) |
|
|
self._log_connect(model, "done", time.time() - t0) |
|
|
return out |
|
|
raise RuntimeError(f"Modelo local no implementado para: {model}") |
|
|
|
|
|
|
|
|
def chat_with_tools(self, messages: List[Dict[str, str]], tools: Optional[List[Dict[str, Any]]] = None, model: str = "salamandra-tools", **kwargs) -> Dict[str, Any]: |
|
|
if model in self.rem: |
|
|
self._log_connect(model, "connect") |
|
|
t0 = time.time() |
|
|
client = self.client_factories[model]() |
|
|
out = client.chat(messages, tools=tools, **kwargs) |
|
|
self._log_connect(model, "done", time.time() - t0) |
|
|
return out |
|
|
raise RuntimeError(f"Modelo local no implementado para: {model}") |
|
|
|
|
|
|
|
|
def asr_transcribe(self, audio_path: str, model: str = "whisper-catalan", **kwargs) -> Dict[str, Any]: |
|
|
if model in self.rem: |
|
|
self._log_connect(model, "connect") |
|
|
t0 = time.time() |
|
|
client = self.client_factories[model]() |
|
|
out = client.transcribe(audio_path, **kwargs) |
|
|
self._log_connect(model, "done", time.time() - t0) |
|
|
return out |
|
|
raise RuntimeError(f"Modelo local no implementado para: {model}") |
|
|
|