import base64
import html
import mimetypes
import os
from pathlib import Path
from typing import Any, Dict, List

import gradio as gr
from openai import OpenAI

DEFAULT_MODEL = os.getenv("DEFAULT_MODEL", "ERNIE-4.5-VL-28B-A3B-Thinking")
BASE_URL = os.getenv("BASE_URL", "")
api_key = os.getenv("ERNIE_API_KEY", "")

CUSTOM_CSS = """
body { background: radial-gradient(circle at top, #fdfbff 0%, #e7ecf7 45%, #dfe6f5 100%); font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Arial, sans-serif; color: #0f172a; }
.gradio-container { max-width: 1200px !important; margin: 0 auto; }
#ernie-hero { padding: 12px 0 4px; }
#ernie-hero h1 { font-size: 1.85rem; margin-bottom: 0; font-weight: 500; }
#model-link { margin-top: 6px; font-size: 0.95rem; }
#model-link a { color: #4c1d95; text-decoration: none; font-weight: 500; }
#model-link a:hover { text-decoration: underline; }
#examples-panel { margin-top: 20px; padding: 18px 22px; border-radius: 18px; border: 1px solid rgba(15, 23, 42, 0.12); background: rgba(255, 255, 255, 0.92); box-shadow: 0 15px 35px rgba(15, 23, 42, 0.08); gap: 18px; }
#examples-panel h4 { margin: 0 0 8px; font-size: 1.1rem; font-weight: 500; }
#examples-panel p { margin: 0; color: rgba(15, 23, 42, 0.7); font-size: 0.95rem; }
#examples-grid table { width: 100%; }
#examples-grid table tbody { display: grid; grid-template-columns: repeat(auto-fit, minmax(220px, 1fr)); gap: 12px; }
#examples-grid table tr { display: block; background: #f7f9ff; border-radius: 14px; border: 1px solid rgba(15, 23, 42, 0.08); padding: 14px; box-shadow: 0 10px 28px rgba(15, 23, 42, 0.08); }
#examples-grid table td { display: block; padding: 0; }
#chat-wrapper { margin-top: 32px; border-radius: 24px; padding: 18px; background: rgba(255, 255, 255, 0.95); border: 1px solid rgba(15, 23, 42, 0.1); box-shadow: 0 25px 60px rgba(15, 23, 42, 0.12); }
.ernie-section { border-radius: 18px; margin-bottom: 14px; padding: 16px 18px; border: 1px solid rgba(15, 23, 42, 0.1); background: rgba(255, 255, 255, 0.95); box-shadow: 0 10px 24px rgba(15, 23, 42, 0.08); }
.ernie-section-header { font-size: 0.85rem; text-transform: uppercase; letter-spacing: 0.08em; font-weight: 600; color: rgba(15, 23, 42, 0.65); display: flex; align-items: center; gap: 6px; }
.ernie-section-body { margin-top: 10px; font-size: 1rem; color: rgba(15, 23, 42, 0.92); white-space: pre-wrap; line-height: 1.55; }
.ernie-thinking { border-color: rgba(79, 70, 229, 0.35); background: rgba(129, 140, 248, 0.08); }
.ernie-answer { border-color: rgba(16, 185, 129, 0.35); background: rgba(110, 231, 183, 0.08); }
@media (prefers-color-scheme: dark) {
  body { background: radial-gradient(circle at top, #1f264b 0%, #0f172a 45%, #040713 100%); color: #ecf2ff; }
  #model-link a { color: #a5b4fc; }
  #examples-panel { border: 1px solid rgba(255, 255, 255, 0.05); background: rgba(8, 13, 30, 0.85); box-shadow: 0 15px 45px rgba(3, 7, 18, 0.55); }
  #examples-panel p { color: rgba(236, 242, 255, 0.75); }
  #examples-grid table tr { background: rgba(15, 23, 42, 0.7); border: 1px solid rgba(255, 255, 255, 0.04); box-shadow: 0 10px 30px rgba(4, 6, 15, 0.45); }
  #chat-wrapper { background: rgba(2, 6, 23, 0.78); border: 1px solid rgba(99, 102, 241, 0.25); box-shadow: 0 25px 70px rgba(2, 6, 23, 0.7); }
  .ernie-section { border: 1px solid rgba(255, 255, 255, 0.08); background: rgba(15, 23, 42, 0.85); box-shadow: 0 10px 30px rgba(2, 6, 23, 0.55); }
  .ernie-section-header { color: rgba(236, 242, 255, 0.75); }
  .ernie-section-body { color: rgba(248, 250, 255, 0.95); }
  .ernie-answer { border-color: rgba(45, 212, 191, 0.45); background: rgba(8, 47, 56, 0.65); }
  .ernie-thinking { border-color: rgba(165, 180, 252, 0.4); background: rgba(30, 27, 75, 0.65); }
}
"""
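
# BASE_URL and ERNIE_API_KEY are read from the environment above; any OpenAI-compatible
# endpoint works (e.g. a self-hosted server at http://localhost:8000/v1, which is only a
# placeholder example, not a required value).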

_client = OpenAI(
    base_url=BASE_URL,
    api_key=api_key,
)


def _data_url(path: str) -> str:
    """Read a local file and encode it as a base64 data URL."""
    mime, _ = mimetypes.guess_type(path)
    mime = mime or "application/octet-stream"
    data = base64.b64encode(Path(path).read_bytes()).decode("utf-8")
    return f"data:{mime};base64,{data}"
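
# Attachments are sent inline as base64 data URLs rather than uploaded to external storage,
# so no file hosting is needed; large videos will inflate the request payload accordingly.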


def _media_content(path: str) -> Dict[str, Any]:
    """Build an image or video content part for the chat API."""
    mime, _ = mimetypes.guess_type(path)
    if mime and mime.startswith("video"):
        # Video attachment
        return {"type": "video_url", "video_url": {"url": _data_url(path)}}
    # Image attachment (the default)
    return {"type": "image_url", "image_url": {"url": _data_url(path)}}


def _text_content(text: str) -> Dict[str, Any]:
    return {"type": "text", "text": text}


def _message(role: str, content: Any) -> Dict[str, Any]:
    return {"role": role, "content": content}
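
# The Chatbot below is created with render_markdown=True, so assistant turns can carry raw
# HTML; _format_sections produces the styled Thinking/Answer blocks that CUSTOM_CSS targets.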


def _format_sections(thinking: str, answer: str | None = None) -> str:
    """Render Thinking/Answer blocks with HTML so the chatbot can style them."""

    def _build_block(kind: str, label: str, text: str, icon: str) -> str:
        text = (text or "").strip()
        if not text:
            return ""
        escaped = html.escape(text)
        return (
            f'<div class="ernie-section ernie-{kind}">'
            f'<div class="ernie-section-header">{icon} {label}</div>'
            f'<div class="ernie-section-body">{escaped}</div>'
            "</div>"
        )

    # Thinking is always rendered; the Answer block is added only once the model has started
    # emitting final-response tokens.
    sections = [
        _build_block("thinking", "Thinking", thinking, "🧠"),
        _build_block("answer", "Answer", answer, "✨") if answer is not None else "",
    ]
    rendered = "".join(section for section in sections if section)
    return rendered


def _build_user_message(message: Dict[str, Any]) -> Dict[str, Any]:
    files = message.get("files") or []
    text = (message.get("text") or "").strip()
    content: List[Dict[str, Any]] = [_media_content(p) for p in files]
    if text:
        content.append(_text_content(text))
    return _message("user", content)


def _convert_history(history: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Convert the Gradio message history into OpenAI-style chat messages."""
    msgs: List[Dict[str, Any]] = []
    user_content: List[Dict[str, Any]] = []
    for turn in history or []:
        role, content = turn.get("role"), turn.get("content")
        if role == "user":
            if isinstance(content, str):
                user_content.append(_text_content(content))
            elif isinstance(content, tuple):
                user_content.extend(_media_content(path) for path in content if path)
        elif role == "assistant":
            if "Answer:\n" in content:
                # Split and keep only the Answer part.
                answer_only = content.split("Answer:\n", 1)[1].strip()
            else:
                # Fall back gracefully when there was no Thinking section.
                answer_only = content.strip()
            if user_content:
                msgs.append(_message("user", user_content.copy()))
                user_content.clear()
            msgs.append(_message("assistant", [{"type": "text", "text": answer_only}]))
    return msgs
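
# Reasoning tokens are expected on delta.reasoning_content (the field used by reasoning-capable
# OpenAI-compatible servers), while the final answer arrives on delta.content.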


def stream_response(message: Dict[str, Any], history: List[Dict[str, Any]], model_name: str = DEFAULT_MODEL):
    messages = _convert_history(history)
    messages.append(_build_user_message(message))
    try:
        stream = _client.chat.completions.create(
            model=model_name,  # honor DEFAULT_MODEL / the caller-supplied model name
            messages=messages,
            stream=True,
        )
        thinking_parts: List[str] = []
        answer_parts: List[str] = []
        answer_started = False
        for chunk in stream:
            delta = chunk.choices[0].delta
            if getattr(delta, "reasoning_content", None):
                thinking_parts.append(delta.reasoning_content)
            if getattr(delta, "content", None):
                answer_started = True
                answer_parts.append(delta.content)
            thinking_text = "".join(thinking_parts)
            answer_text = "".join(answer_parts) if answer_parts else None
            if answer_started:
                rendered = _format_sections(thinking_text, answer_text)
            else:
                rendered = _format_sections(thinking_text)
            if rendered:
                yield rendered
        if not answer_started and thinking_parts:
            # The stream ended without an Answer; make sure the full Thinking text is shown.
            rendered = _format_sections("".join(thinking_parts))
            if rendered:
                yield rendered
    except Exception as e:
        yield f"Failed to get response: {e}"


def run_example(message: Dict[str, Any], history: List[Dict[str, Any]] | None = None):
    """
    Run an Examples click straight through the model.

    - The input is the same message dict a ChatInterface submit produces: {"text": ..., "files": [...]}
    - `history` is the Chatbot's current message list (type="messages")
    - The output is the full message list the Chatbot expects: [{role, content}, ...]
    """
    history = history or []
    # Reuse the existing streaming function and wrap its HTML output in messages.
    for rendered in stream_response(message, history):
        # Only the user text is shown here; attached images are treated as already being part
        # of the context and are not rendered separately.
        user_text = (message.get("text") or "").strip() or "[Example]"
        display_history = history + [
            {"role": "user", "content": user_text},
            {"role": "assistant", "content": rendered},
        ]
        # Key point: for a Chatbot output, the yielded value must be the complete message list.
        yield display_history


def build_demo() -> gr.Blocks:
    theme = gr.themes.Soft(primary_hue="violet", secondary_hue="cyan", neutral_hue="slate")
    with gr.Blocks(
        title="ERNIE-4.5-VL-28B-A3B-Thinking",
        theme=theme,
        css=CUSTOM_CSS,
    ) as demo:
        with gr.Column(elem_id="ernie-hero"):
            gr.Markdown(
                "<h1>Chat with ERNIE-4.5-VL-28B-A3B-Thinking</h1>",
                elem_id="hero-text",
            )
            gr.Markdown(
                """
                """
            )

        textbox = gr.MultimodalTextbox(
            show_label=False,
            placeholder="Enter text, or upload one or more images...",
            file_types=["image", "video"],
            file_count="multiple",
        )
        chatbot = gr.Chatbot(
            type="messages",
            allow_tags=["think"],
            height=560,
            render_markdown=True,
            show_copy_button=True,
        )

        examples = [
            {
                "text": "这道题怎么解",  # "How do I solve this problem?"
                "files": ["examples/case1.png"],
            },
            {
                "text": "How many real people are actually in the picture?",
                "files": ["examples/case2.png"],
            },
        ]
        with gr.Column(elem_id="examples-panel"):
            gr.Examples(
                examples=examples,
                inputs=textbox,
                label=None,
                examples_per_page=4,
                elem_id="examples-grid",
                fn=run_example,  # Clicking an example sends it straight to the model.
                outputs=chatbot,
                run_on_click=True,
            )

        with gr.Column(elem_id="chat-wrapper"):
            chat_interface = gr.ChatInterface(
                fn=stream_response,
                type="messages",
                multimodal=True,
                chatbot=chatbot,
                textbox=textbox,
            )

    return demo.queue(default_concurrency_limit=8)


if __name__ == "__main__":
    build_demo().launch()