# app/gradio_app.py
from __future__ import annotations
from typing import List, Dict, Any, Tuple
import os, time, shutil, uuid
import gradio as gr
try:
from models.asr_whisper import get_asr
except Exception:
get_asr = None
try:
from models.llm_chat import respond_chat as llm_respond_chat
except Exception:
llm_respond_chat = None
from models.tts_router import (
tts_synthesize,
ensure_runtime_audio_dir,
cleanup_old_audio,
AUDIO_DIR,
)
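
# Contract assumed of models.tts_router, inferred from how it is used below:
#   tts_synthesize(text)            -> path to a WAV file, or None on failure
#   ensure_runtime_audio_dir()      -> the runtime audio dir (created if missing)
#   cleanup_old_audio(keep_latest=) -> deletes older generated audio files
#   AUDIO_DIR                       -> env-driven directory for session audio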
# =============================================================================
# Helpers (pure, modular)
# =============================================================================
def _safe_llm_reply(history: List[Dict[str, str]], user_text: str) -> str:
"""
Try local LLM. If it's missing or errors, log loudly and return a safe fallback.
"""
if llm_respond_chat is None:
print("[LLM] respond_chat not imported; using fallback.")
return "Hello! How can I assist you today? Would you like to place an order or inquire about the menu?"
try:
bot_text, _guard, _diag = llm_respond_chat(history or [], user_text, {})
if isinstance(bot_text, str) and bot_text.strip():
print("[LLM] returned:", bot_text[:120].replace("\n"," "))
return bot_text.strip()
else:
print("[LLM] empty/invalid response; using fallback.")
except Exception as e:
import traceback
print("[LLM] error -> fallback:", repr(e))
traceback.print_exc()
return "Hello! How can I assist you today? Would you like to place an order or inquire about the menu?"
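
# Shape check, for illustration (the message dicts match the Chatbot "messages" type):
#   _safe_llm_reply([{"role": "user", "content": "hi"}], "hi")
#   -> the LLM reply text, or the canned greeting above if the LLM is absent or errors.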
def _persist_copy(src_path: str) -> str | None:
    """Copy a mic recording into runtime/audio under a stable filename; return the new path, or None."""
if not (src_path and os.path.exists(src_path)):
return None
os.makedirs(AUDIO_DIR, exist_ok=True)
dst = os.path.join(AUDIO_DIR, f"user_{uuid.uuid4().hex}.wav")
shutil.copyfile(src_path, dst)
return dst
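
# Illustration (the source path is hypothetical):
#   _persist_copy("/tmp/gradio/clip.wav") -> "<AUDIO_DIR>/user_<32-hex>.wav"
#   _persist_copy(None)                   -> None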
def _asr_transcribe(aud_path: str) -> str:
"""
Transcribe audio to text. If ASR is unavailable, return a safe message.
"""
if not aud_path:
return "(no audio)"
if get_asr is None:
return "(ASR unavailable)"
try:
asr = get_asr()
out = asr.transcribe(aud_path)
return (out.get("text") or "").strip() or "(no speech detected)"
except Exception as e:
print("[ASR] error:", e)
return "(transcription failed)"
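
# Callers can treat the parenthesised returns above as sentinels, e.g. (hypothetical path):
#   text = _asr_transcribe("runtime/audio/user_demo.wav")
#   ok = not (text.startswith("(") and text.endswith(")"))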
def _tts_from_text(text: str) -> str | None:
"""
Synthesize assistant text to a WAV in runtime/audio.
Returns a file path or None.
"""
if not (text and text.strip()):
return None
path = tts_synthesize(text.strip())
if path and os.path.exists(path):
return path
    # Attempt one final minimal fallback to avoid handing back an empty path.
return tts_synthesize("How can I help with FutureCafe?")
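
# Note: tts_synthesize() itself may still return None (e.g. no TTS backend), in
# which case the voice lane simply has no audio to play; callers tolerate None.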
def _append_chat(history: List[Dict[str, str]] | None,
role: str, content: str) -> List[Dict[str, str]]:
hist = list(history or [])
hist.append({"role": role, "content": content})
return hist
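
# Pure function, so it is easy to verify by hand:
#   _append_chat(None, "user", "hi") == [{"role": "user", "content": "hi"}]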
def _startup_clean_runtime_audio():
"""
On app start, clean previous session audio artifacts.
"""
audio_dir = ensure_runtime_audio_dir()
try:
for name in os.listdir(audio_dir):
p = os.path.join(audio_dir, name)
if os.path.isfile(p):
os.remove(p)
except Exception as e:
print("[RUNTIME] Cannot clean runtime/audio:", e)
# =============================================================================
# Voice handlers (modular)
# =============================================================================
def handle_voice_turn(
user_audio_path: str,
voice_history: List[Dict[str, str]] | None
) -> Tuple[List[Dict[str, str]], str | None, Dict[str, Any]]:
"""
Single voice turn:
1) Transcribe user audio
2) Ask LLM for a reply (text)
3) TTS the reply to a WAV
4) Append both transcript and assistant text to the voice chat history
Returns: (new_voice_history, assistant_audio_path, diag_json)
"""
t0 = time.time()
transcript = _asr_transcribe(user_audio_path)
hist1 = _append_chat(voice_history, "user", transcript)
bot_text = _safe_llm_reply(hist1, transcript)
hist2 = _append_chat(hist1, "assistant", bot_text)
tts_path = _tts_from_text(bot_text)
diag = {
"intent": None,
"slots": {},
"tool_selected": None,
"tool_result": {
"transcript": transcript,
"llm_response": bot_text
},
"latency_ms": int((time.time() - t0) * 1000),
}
return hist2, tts_path, diag
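
# Sketch of a single turn (hypothetical WAV path; each stage degrades to a safe value):
#   hist, wav, diag = handle_voice_turn("runtime/audio/user_demo.wav", [])
#   hist[-2]["content"] == diag["tool_result"]["transcript"]  # user transcript
#   wav is a playable file path, or None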
# =============================================================================
# Text handlers (modular)
# =============================================================================
def handle_text_turn(
user_text: str,
chat_history: List[Dict[str, str]] | None
) -> Tuple[List[Dict[str, str]], Dict[str, Any], str]:
"""
Single text turn:
1) Append user text
2) Ask LLM for a reply
3) Append assistant text
4) Prepare diagnostics
Returns: (new_chat_history, diag_json, clear_text_value)
"""
t0 = time.time()
user_text = (user_text or "").strip()
if not user_text:
return (chat_history or []), {"intent": None, "slots": {}, "tool_selected": None, "tool_result": None, "latency_ms": 0}, ""
hist1 = _append_chat(chat_history, "user", user_text)
bot_text = _safe_llm_reply(hist1, user_text)
hist2 = _append_chat(hist1, "assistant", bot_text)
diag = {
"intent": None,
"slots": {},
"tool_selected": None,
"tool_result": {"user": user_text, "llm_response": bot_text},
"latency_ms": int((time.time() - t0) * 1000),
}
return hist2, diag, ""
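
# Sketch of a text turn (reply text depends on the LLM or its fallback):
#   hist, diag, cleared = handle_text_turn("Any vegan pizzas?", [])
#   hist[0] == {"role": "user", "content": "Any vegan pizzas?"} and cleared == ""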
# =============================================================================
# Fixed UI (as requested) + wiring
# =============================================================================
def build_demo():
"""
Fixed UI layout:
LEFT (Voice Call):
- voice_in (mic recorder)
- assistant_audio (autoplay)
- voice_chat (transcript chat)
- call_diag (JSON)
RIGHT (SMS/Chat):
- chat_box
- text_in (enter to send)
- chat_diag (JSON)
"""
_startup_clean_runtime_audio()
with gr.Blocks(title="FutureCafe Call/SMS Agent (MVP)") as demo:
gr.Markdown("### ☎️ FutureCafe AI Agent (MVP)\n**Call (voice)** on the left · **SMS/Chat** on the right")
# States
voice_state = gr.State([]) # list of {"role","content"} for voice transcript chat
chat_state = gr.State([]) # list of {"role","content"} for SMS chat
with gr.Row():
# ---------------- LEFT: VOICE ----------------
with gr.Column(scale=1, min_width=430):
gr.Markdown("#### 📞 Voice Call")
voice_in = gr.Audio(
label="Press Record → Speak → Stop (auto-sends)",
sources=["microphone"],
type="filepath",
format="wav",
interactive=True,
editable=False,
waveform_options={"show_recording_waveform": True},
)
assistant_audio = gr.Audio(
label="Assistant Response (auto-play)",
autoplay=True,
type="filepath",
interactive=False
)
voice_chat = gr.Chatbot(value=[], type="messages", height=220, label="Voice Chat (transcripts)")
call_diag = gr.JSON(
value={"intent": None, "slots": {}, "tool_selected": None, "tool_result": None, "latency_ms": 0},
label="Voice Diagnostics"
)
# ---------------- RIGHT: SMS / CHAT ----------------
with gr.Column(scale=1, min_width=430):
gr.Markdown("#### 💬 SMS / Chat")
chat_box = gr.Chatbot(value=[], type="messages", height=360, label=None)
text_in = gr.Textbox(
placeholder="Type here… e.g., “Any vegan pizzas?”, “Book a table for 2 at 7.” (Enter to send)",
label=None, lines=1
)
chat_diag = gr.JSON(
value={"intent": None, "slots": {}, "tool_selected": None, "tool_result": None, "latency_ms": 0},
label="Chat Diagnostics"
)
# ---------- Handlers (thin wrappers that call modular functions) ----------
def _clear_recorder():
# Only clears the recorder input; leaves assistant audio + transcripts intact
return gr.update(value=None, interactive=True)
def on_voice_change(aud_path: str | None, voice_hist: list[dict] | None):
"""
- Copy mic recording into runtime/audio
- ASR -> transcript
- LLM -> bot text
- TTS -> assistant wav in runtime/audio
- Cleanup:
* delete the user clip immediately after ASR
                * delete all older TTS files, keeping only the latest
- Append transcript pairs to voice chat state
Returns exactly: (voice_chat_messages, assistant_audio_path, diag_json)
"""
empty_diag = {
"intent": None,
"slots": {},
"tool_selected": None,
"tool_result": None,
"latency_ms": 0,
}
if not aud_path:
return (voice_hist or []), None, empty_diag
t0 = time.time()
# 1) Stabilize mic path into runtime/audio (ENV-driven via AUDIO_DIR)
stable_user = _persist_copy(aud_path)
# 2) Transcribe
            transcript = "(transcription failed)"
            try:
                if get_asr is None:
                    transcript = "(ASR unavailable)"
                elif stable_user is None:
                    transcript = "(no audio)"
                else:
                    asr = get_asr()
                    asr_out = asr.transcribe(stable_user)
                    transcript = (asr_out.get("text") or "").strip() or "(no speech detected)"
except Exception as e:
print("[ASR] error:", e)
finally:
# Remove the user clip ASAP to keep the folder small
if stable_user and os.path.exists(stable_user):
try:
os.remove(stable_user)
except Exception as e:
print("[CLEANUP] Could not delete user clip:", e)
# 3) Get bot reply (LLM response) – env/model-path handled inside models/llm_chat
try:
from models.llm_chat import respond_chat_voice
except Exception:
from models.llm_chat import respond_chat as respond_chat_voice
            try:
                bot_text, _policy, policy_diag = respond_chat_voice(voice_hist or [], transcript, {})
            except Exception as e:
                print("[LLM] voice fallback due to error:", e)
                bot_text, _policy, policy_diag = (
                    "Hello! How can I help with FutureCafe today?",
                    {},
                    {},
                )
# 4) TTS the bot reply into runtime/audio (ENV-driven path via tts_router)
            new_tts = tts_synthesize(bot_text)  # WAV written under AUDIO_DIR (env-driven via tts_router)
cleanup_old_audio(keep_latest=new_tts)
# 5) Append to voice chat state (text transcripts)
new_hist = (voice_hist or []) + [
{"role": "user", "content": transcript},
{"role": "assistant", "content": bot_text},
]
diag = {
"intent": policy_diag.get("policy") if isinstance(policy_diag, dict) else None,
"slots": {},
"tool_selected": None,
"tool_result": {
"transcript": transcript,
"llm_response": bot_text,
"policy": policy_diag,
},
"latency_ms": int((time.time() - t0) * 1000),
}
# Return EXACTLY the 3 outputs you wired: (voice_chat, assistant_audio, call_diag)
return new_hist, new_tts, diag
def on_text_send(txt: str, hist: List[Dict[str, str]]):
new_hist, diag, clear_text = handle_text_turn(txt, hist or [])
return new_hist, diag, clear_text
# ---------- Wiring ----------
        # Voice lane: update (voice_chat, assistant_audio, call_diag); the recorder is
        # cleared afterwards in a .then() step so playback isn't interrupted.
        # Try to fire on explicit Stop; fall back to generic change if not supported.
rec_event = getattr(voice_in, "stop_recording", None)
if callable(rec_event):
rec_event(
on_voice_change,
inputs=[voice_in, voice_state],
outputs=[voice_chat, assistant_audio, call_diag],
).then(
_clear_recorder, # runs AFTER outputs are set → autoplay isn’t interrupted
inputs=None,
outputs=[voice_in],
)
else:
voice_in.change(
on_voice_change,
inputs=[voice_in, voice_state],
outputs=[voice_chat, assistant_audio, call_diag],
api_name="/voice",
).then(
_clear_recorder,
inputs=None,
outputs=[voice_in],
)
# Keep voice_state in sync with what's shown in voice_chat (unchanged)
voice_chat.change(lambda x: x, inputs=[voice_chat], outputs=[voice_state])
# Text lane: Enter to send
text_in.submit(
on_text_send,
inputs=[text_in, chat_state],
outputs=[chat_box, chat_diag, text_in],
api_name="/text",
)
# Keep chat_state in sync with what's shown in chat_box
chat_box.change(lambda x: x, inputs=[chat_box], outputs=[chat_state])
return demo
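

# Minimal local entrypoint, included as a sketch: on a hosted Space the launcher
# module typically imports build_demo() itself, so the guarded launch below only
# runs when this file is executed directly.
if __name__ == "__main__":
    build_demo().launch()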