# models/tts_router.py from __future__ import annotations import os, re, uuid, wave, glob, subprocess, sys, shutil from shutil import which from typing import Optional RUNTIME_AUDIO_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "runtime", "audio")) os.makedirs(RUNTIME_AUDIO_DIR, exist_ok=True) def _have(cmd: str) -> bool: return which(cmd) is not None def _is_valid_wav(path: str, min_duration_s: float = 0.25) -> bool: try: with wave.open(path, "rb") as w: frames = w.getnframes() rate = w.getframerate() dur = (frames / float(rate)) if rate else 0.0 return frames > 0 and rate > 0 and dur >= min_duration_s except Exception: return False def ensure_runtime_audio_dir() -> str: os.makedirs(RUNTIME_AUDIO_DIR, exist_ok=True) return RUNTIME_AUDIO_DIR def cleanup_old_audio(keep_latest: Optional[str] = None): for f in glob.glob(os.path.join(RUNTIME_AUDIO_DIR, "*")): if keep_latest and os.path.abspath(f) == os.path.abspath(keep_latest): continue if f.endswith((".wav", ".aiff")): try: os.remove(f) except Exception as e: print(f"[CLEANUP] Could not delete {f}: {e}") # --------- helpers: optional post-process (speed + trim) ---------- def _ffmpeg_speed_and_trim(src_wav: str, speed: float) -> Optional[str]: """Use ffmpeg to slightly speed up and trim leading/trailing silences.""" if not _have("ffmpeg"): return None # ffmpeg atempo supports 0.5–2.0; clamp just in case speed = max(0.5, min(2.0, float(speed or 1.0))) dst = os.path.join(RUNTIME_AUDIO_DIR, f"tts_{uuid.uuid4().hex}.wav") # light silence trim + tempo # (two-pass trim via areverse avoids tail silence) af = ( f"silenceremove=start_periods=1:start_silence=0.15:start_threshold=-45dB," f"areverse," f"silenceremove=start_periods=1:start_silence=0.15:start_threshold=-45dB," f"areverse," f"atempo={speed}" ) try: subprocess.run( ["ffmpeg", "-y", "-i", src_wav, "-af", af, dst], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) if _is_valid_wav(dst, 0.15): return dst except Exception as e: print("[TTS][ffmpeg] post-process failed:", e) return None # --- TTS engines --- def _tts_with_pyttsx3(text: str) -> Optional[str]: try: import pyttsx3 except Exception: return None out_dir = ensure_runtime_audio_dir() out_path = os.path.join(out_dir, f"tts_{uuid.uuid4().hex}.wav") try: eng = pyttsx3.init() # rate control try: rate_env = int(os.getenv("PYTTSX3_RATE", "0") or "0") if rate_env > 0: eng.setProperty("rate", rate_env) except Exception: pass # voice selection (optional) voice_name = os.getenv("ESPEAK_VOICE") or os.getenv("TTS_VOICE") if voice_name: for v in eng.getProperty("voices"): if voice_name.lower() in (v.name or "").lower(): eng.setProperty("voice", v.id) break eng.save_to_file(re.sub(r"[\x00-\x1F]+", " ", text).strip(), out_path) eng.runAndWait() return out_path if _is_valid_wav(out_path) else None except Exception as e: print("[TTS] pyttsx3 failed:", e) return None def _tts_with_piper(text: str) -> Optional[str]: model = os.getenv("PIPER_MODEL") piper_bin = os.getenv("PIPER_BIN", "piper") if not (model and os.path.exists(model)): return None if not (_have(piper_bin) or (os.path.isabs(piper_bin) and os.path.exists(piper_bin))): return None out_path = os.path.join(ensure_runtime_audio_dir(), f"tts_{uuid.uuid4().hex}.wav") safe_text = re.sub(r"[\x00-\x1F]+", " ", text).strip() # speed via native piper arg: smaller length_scale => faster speaking length_scale = os.getenv("PIPER_LENGTH_SCALE", "").strip() cmd = [piper_bin, "--model", model, "--output_file", out_path] if length_scale: cmd += ["--length_scale", length_scale] # ensure espeak-ng data is discoverable (HF Spaces often need this) espeak_data = os.getenv("ESPEAK_DATA_PATH") env = os.environ.copy() if espeak_data and os.path.isdir(espeak_data): env["ESPEAK_DATA_PATH"] = espeak_data try: p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, env=env) p.communicate(input=safe_text.encode("utf-8"), timeout=45) return out_path if (p.returncode == 0 and _is_valid_wav(out_path)) else None except Exception as e: print("[TTS] Piper error:", e) return None def _tts_with_say(text: str) -> Optional[str]: if sys.platform != "darwin" or not _have("say"): return None out_dir = ensure_runtime_audio_dir() aiff = os.path.join(out_dir, f"tts_{uuid.uuid4().hex}.aiff") wav = os.path.join(out_dir, f"tts_{uuid.uuid4().hex}.wav") voice = os.getenv("SAY_VOICE") safe_text = re.sub(r"[\x00-\x1F`<>]+", " ", text).strip() or "Hello." try: cmd = ["say", "-o", aiff] if voice: cmd += ["-v", voice] cmd.append(safe_text) subprocess.run(cmd, check=True) # convert to wav for consistent playback if _have("afconvert"): subprocess.run(["afconvert","-f","WAVE","-d","LEI16","-c","1","-s","1", aiff, wav], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) elif _have("ffmpeg"): subprocess.run(["ffmpeg","-y","-i", aiff, "-ar","22050","-ac","1", wav], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) if _is_valid_wav(wav): try: os.remove(aiff) except: pass return wav return aiff if os.path.exists(aiff) else None except Exception as e: print("[TTS] say failed:", e) return None def tts_synthesize(text: str) -> Optional[str]: if not (text and text.strip()): return None ensure_runtime_audio_dir() # Prefer Piper on Linux/Spaces if available out = _tts_with_piper(text) if not out: out = _tts_with_pyttsx3(text) if not out: out = _tts_with_say(text) if not out: print("[TTS] all engines failed") return None # Optional: global speed-up & trim with ffmpeg try: speed = float(os.getenv("TTS_SPEED", "1.0")) except Exception: speed = 1.0 if abs(speed - 1.0) > 1e-3: sped = _ffmpeg_speed_and_trim(out, speed) if sped: # clean the original, keep sped-up one try: os.remove(out) except: pass out = sped # prune older files to keep the folder tidy cleanup_old_audio(keep_latest=out) return out