|
|
|
|
|
from __future__ import annotations
|
|
|
from typing import Any, Dict, List, Optional
|
|
|
import os, json, requests
|
|
|
from tenacity import retry, stop_after_attempt, wait_exponential
|
|
|
|
|
|
# gradio_client is treated as an optional dependency: when importing it fails
# for any reason, GradioClient is bound to None so the rest of the module can
# test for that sentinel instead of failing at import time.
try:
    from gradio_client import Client as GradioClient
except Exception:  # deliberately broad: any import-time failure disables Gradio
    GradioClient = None
|
|
|
|
|
|
class BaseRemoteClient:
    """Shared plumbing for clients that talk to a remote model endpoint.

    An instance either drives the endpoint through a ``GradioClient`` object
    (when requested and ``GradioClient`` is not ``None``) or leaves
    ``_client`` unset so subclasses fall back to JSON-over-HTTP requests.
    """

    def __init__(self, base_url: str, use_gradio: bool = True, hf_token: Optional[str] = None, timeout: int = 180):
        """Record connection settings and build the Gradio client if enabled.

        Args:
            base_url: Root URL of the remote service; trailing ``/`` characters
                are stripped.
            use_gradio: Request the Gradio transport. It is only honoured when
                ``GradioClient`` is not ``None``.
            hf_token: Bearer token. When falsy, the ``HF_TOKEN`` environment
                variable is used instead.
            timeout: Timeout value handed to ``requests.post``.
        """
        self.base_url = base_url.rstrip("/")
        self.use_gradio = use_gradio and GradioClient is not None
        self.hf_token = hf_token or os.getenv("HF_TOKEN")
        self.timeout = timeout
        self._client = None

        if not self.use_gradio:
            return

        # The Gradio client receives ``None`` (not an empty dict) when there
        # is no token to send.
        self._client = GradioClient(
            self.base_url,
            hf_token=self.hf_token,
            headers=self._auth_headers() or None,
        )

    def _auth_headers(self) -> Dict[str, str]:
        """Return the bearer ``Authorization`` header, or ``{}`` without a token."""
        if self.hf_token:
            return {"Authorization": f"Bearer {self.hf_token}"}
        return {}

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=8))
    def _post_json(self, route: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` as JSON to ``base_url + route`` and decode the reply.

        The call is wrapped by tenacity's ``retry`` with a three-attempt stop
        condition and an exponential wait (multiplier=1, min=1, max=8).

        Args:
            route: Path appended verbatim to ``base_url``.
            payload: Object sent as the JSON request body.

        Returns:
            The response body decoded with ``response.json()``.

        NOTE(review): ``raise_for_status`` raises on HTTP error statuses within
        each attempt; what surfaces after the final attempt is decided by
        tenacity (no ``reraise`` option is set here) - confirm against callers.
        """
        response = requests.post(
            f"{self.base_url}{route}",
            json=payload,
            headers=self._auth_headers(),
            timeout=self.timeout,
        )
        response.raise_for_status()
        return response.json()
|
|
|
|
|
|
class InstructClient(BaseRemoteClient):
    """Client for a remote text-generation endpoint."""

    def generate(self, prompt: str, system: Optional[str] = None, **kwargs) -> str:
        """Generate text for ``prompt``.

        Args:
            prompt: The user prompt.
            system: Optional system prompt. Only sent on the HTTP path.
            **kwargs: Extra fields merged into the HTTP JSON payload. Not
                forwarded on the Gradio path.

        Returns:
            On the Gradio path, ``str()`` of whatever ``predict`` returned.
            On the HTTP path, the ``"text"`` field of the ``/generate``
            response, or ``""`` when that field is absent.
        """
        if not (self.use_gradio and self._client):
            body = {"prompt": prompt, "system": system, **kwargs}
            reply = self._post_json("/generate", body)
            return reply.get("text", "")

        # Gradio transport: only the prompt is passed to the remote function.
        return str(self._client.predict(prompt, api_name="/predict"))
|
|
|
|
|
|
class VisionClient(BaseRemoteClient):
    """Client for a remote image-description endpoint."""

    def describe(self, image_paths: List[str], context: Optional[Dict[str, Any]] = None, **kwargs) -> List[str]:
        """Describe the given images.

        Args:
            image_paths: Image paths handed to the remote endpoint as-is.
            context: Optional context mapping; ``{}`` is sent when omitted. On
                the Gradio path it is serialised with ``json.dumps``.
            **kwargs: Extra fields merged into the HTTP JSON payload. Not
                forwarded on the Gradio path.

        Returns:
            A list of descriptions. On the Gradio path a string reply is parsed
            as JSON; if it is not valid JSON, or does not decode to a list, the
            raw string is returned as a single-element list. A non-string reply
            is converted with ``list()``. On the HTTP path the
            ``"descriptions"`` field of the ``/describe`` response is returned,
            or ``[]`` when that field is absent.
        """
        if self.use_gradio and self._client:
            out = self._client.predict(image_paths, json.dumps(context or {}), api_name="/predict")
            if isinstance(out, str):
                try:
                    parsed = json.loads(out)
                except Exception:
                    # Best effort: a plain-text reply is one description.
                    return [out]
                # A reply such as "42", "null" or a JSON object parses fine
                # but is not a list; returning it unchecked would break the
                # declared List[str] contract, so use the raw text instead.
                if isinstance(parsed, list):
                    return parsed
                return [out]
            return list(out)

        data = {"images": image_paths, "context": context or {}, **kwargs}
        res = self._post_json("/describe", data)
        return res.get("descriptions", [])
|
|
|
|
|
|
class ToolsClient(BaseRemoteClient):
    """Client for a remote chat endpoint that accepts tool definitions."""

    def chat(self, messages: List[Dict[str, str]], tools: Optional[List[Dict[str, Any]]] = None, **kwargs) -> Dict[str, Any]:
        """Send a chat request with optional tool definitions.

        Args:
            messages: Chat messages; serialised with ``json.dumps`` on the
                Gradio path and sent as-is in the HTTP payload.
            tools: Optional tool definitions; ``[]`` is sent when omitted.
            **kwargs: Extra fields merged into the HTTP JSON payload. Not
                forwarded on the Gradio path.

        Returns:
            On the Gradio path: a non-string reply is returned unchanged; a
            string reply is decoded with ``json.loads``, and if decoding fails
            it is wrapped as ``{"text": reply}``. On the HTTP path: the decoded
            JSON response of ``/chat``.
        """
        if not (self.use_gradio and self._client):
            request_body = {"messages": messages, "tools": tools or [], **kwargs}
            return self._post_json("/chat", request_body)

        reply = self._client.predict(
            json.dumps(messages),
            json.dumps(tools or []),
            api_name="/predict",
        )
        if not isinstance(reply, str):
            return reply

        try:
            return json.loads(reply)
        except Exception:
            # Best effort: a reply that is not JSON is treated as plain text.
            return {"text": reply}
|
|
|
|
|
|
class ASRClient(BaseRemoteClient):
    """Client for a remote speech-transcription endpoint."""

    def transcribe(self, audio_path: str, **kwargs) -> Dict[str, Any]:
        """Transcribe the audio file at ``audio_path``.

        Args:
            audio_path: Path of the audio file. It is passed to ``predict`` on
                the Gradio path and opened in binary mode for upload on the
                HTTP path.
            **kwargs: Sent as form fields (``data=``) with the HTTP upload. Not
                forwarded on the Gradio path.

        Returns:
            On the Gradio path: ``{"text": reply}`` for a string reply,
            otherwise the reply unchanged. On the HTTP path: the decoded JSON
            response of ``/transcribe``.

        Raises:
            OSError: If ``audio_path`` cannot be opened (HTTP path).
            requests.HTTPError: If the HTTP response has an error status.
        """
        if self.use_gradio and self._client:
            out = self._client.predict(audio_path, api_name="/predict")
            if isinstance(out, str):
                return {"text": out}
            return out

        headers = {"Authorization": f"Bearer {self.hf_token}"} if self.hf_token else {}
        # The context manager closes the file handle even when the request
        # raises; previously the handle was opened and never closed.
        with open(audio_path, "rb") as audio_file:
            r = requests.post(
                f"{self.base_url}/transcribe",
                files={"file": audio_file},
                data=kwargs,
                headers=headers,
                timeout=self.timeout,
            )
        r.raise_for_status()
        return r.json()
|
|
|
|