Spaces:
Sleeping
Sleeping
| # models/tts_router.py | |
| from __future__ import annotations | |
| import os, re, uuid, wave, glob, subprocess, sys, shutil | |
| from shutil import which | |
| from typing import Optional | |
| RUNTIME_AUDIO_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "runtime", "audio")) | |
| os.makedirs(RUNTIME_AUDIO_DIR, exist_ok=True) | |
def _have(cmd: str) -> bool:
    """Return True when ``shutil.which`` can resolve *cmd* to an executable."""
    resolved = which(cmd)
    return resolved is not None
def _is_valid_wav(path: str, min_duration_s: float = 0.25) -> bool:
    """Check that *path* is a readable WAV file of at least *min_duration_s* seconds.

    Returns False when the file has no frames, reports a non-positive frame
    rate, is shorter than the requested duration, or when anything at all
    goes wrong while opening or inspecting it.
    """
    try:
        with wave.open(path, "rb") as reader:
            n_frames = reader.getnframes()
            sample_rate = reader.getframerate()
        # Guard first so the division below never sees a zero rate.
        if n_frames <= 0 or sample_rate <= 0:
            return False
        return n_frames / float(sample_rate) >= min_duration_s
    except Exception:
        return False
def ensure_runtime_audio_dir() -> str:
    """Create the runtime audio directory if it is missing and return its path."""
    audio_dir = RUNTIME_AUDIO_DIR
    os.makedirs(audio_dir, exist_ok=True)
    return audio_dir
def cleanup_old_audio(keep_latest: Optional[str] = None):
    """Delete every ``.wav`` / ``.aiff`` file in the runtime audio directory.

    Args:
        keep_latest: Optional path of a file to spare. Paths are compared
            after ``os.path.abspath`` normalisation.

    Deletion is best-effort: a file that cannot be removed is reported on
    stdout and the sweep continues.
    """
    spared = os.path.abspath(keep_latest) if keep_latest else None
    for candidate in glob.glob(os.path.join(RUNTIME_AUDIO_DIR, "*")):
        if not candidate.endswith((".wav", ".aiff")):
            continue
        if spared is not None and os.path.abspath(candidate) == spared:
            continue
        try:
            os.remove(candidate)
        except Exception as err:
            print(f"[CLEANUP] Could not delete {candidate}: {err}")
| # --------- helpers: optional post-process (speed + trim) ---------- | |
def _ffmpeg_speed_and_trim(src_wav: str, speed: float) -> Optional[str]:
    """Re-encode *src_wav* with ffmpeg, trimming edge silence and changing tempo.

    Args:
        src_wav: Path of the audio file handed to ffmpeg as input.
        speed: Tempo factor; a falsy value is treated as 1.0 and the result
            is clamped to the range 0.5–2.0 before being passed to ``atempo``.

    Returns:
        Path of a new ``tts_<hex>.wav`` file in the runtime audio directory,
        or None when ffmpeg is not installed, the command fails, or the
        output is not a valid WAV of at least 0.15 s. On any failure the
        partially written output file is removed so it does not linger.
    """
    if not _have("ffmpeg"):
        return None
    # ffmpeg atempo supports 0.5–2.0; clamp just in case
    speed = max(0.5, min(2.0, float(speed or 1.0)))
    dst = os.path.join(RUNTIME_AUDIO_DIR, f"tts_{uuid.uuid4().hex}.wav")
    # Light silence trim + tempo. silenceremove only strips from the start,
    # so the stream is reversed, trimmed again, and reversed back to drop
    # trailing silence as well.
    trim = "silenceremove=start_periods=1:start_silence=0.15:start_threshold=-45dB"
    af = ",".join((trim, "areverse", trim, "areverse", f"atempo={speed}"))
    try:
        subprocess.run(
            ["ffmpeg", "-y", "-i", src_wav, "-af", af, dst],
            check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )
        if _is_valid_wav(dst, 0.15):
            return dst
    except Exception as e:
        print("[TTS][ffmpeg] post-process failed:", e)
    # Failure path: ffmpeg may have left a truncated or empty file behind.
    try:
        os.remove(dst)
    except OSError:
        pass  # nothing was written, or it is already gone
    return None
| # --- TTS engines --- | |
def _tts_with_pyttsx3(text: str) -> Optional[str]:
    """Synthesize *text* to a WAV file through the optional ``pyttsx3`` package.

    Environment variables read:
        PYTTSX3_RATE: positive integer applied as the engine ``rate``.
        ESPEAK_VOICE / TTS_VOICE: case-insensitive substring matched against
            the installed voice names; the first match is selected.

    Returns:
        Path of the generated WAV, or None when pyttsx3 cannot be imported,
        synthesis raises, or the output fails the WAV validity check.
    """
    try:
        import pyttsx3
    except Exception:
        return None

    wav_path = os.path.join(ensure_runtime_audio_dir(), f"tts_{uuid.uuid4().hex}.wav")
    try:
        engine = pyttsx3.init()

        # Speaking rate is optional; a malformed value is simply ignored.
        try:
            wanted_rate = int(os.getenv("PYTTSX3_RATE", "0") or "0")
            if wanted_rate > 0:
                engine.setProperty("rate", wanted_rate)
        except Exception:
            pass

        # Voice selection is optional too; pick the first name that matches.
        wanted_voice = os.getenv("ESPEAK_VOICE") or os.getenv("TTS_VOICE")
        if wanted_voice:
            needle = wanted_voice.lower()
            chosen = next(
                (v for v in engine.getProperty("voices") if needle in (v.name or "").lower()),
                None,
            )
            if chosen is not None:
                engine.setProperty("voice", chosen.id)

        # Collapse control characters to spaces before handing text to the engine.
        spoken = re.sub(r"[\x00-\x1F]+", " ", text).strip()
        engine.save_to_file(spoken, wav_path)
        engine.runAndWait()
        if _is_valid_wav(wav_path):
            return wav_path
        return None
    except Exception as err:
        print("[TTS] pyttsx3 failed:", err)
        return None
def _tts_with_piper(text: str) -> Optional[str]:
    """Synthesize *text* to a WAV file by piping it into the Piper CLI.

    Environment variables read:
        PIPER_MODEL: path to the model file; must exist or None is returned.
        PIPER_BIN: executable name or absolute path (default ``"piper"``).
        PIPER_LENGTH_SCALE: forwarded verbatim as ``--length_scale`` when set.
        ESPEAK_DATA_PATH: passed through to the child environment.

    Returns:
        Path of the generated WAV, or None when Piper is not configured or
        not found, exits non-zero, times out (45 s), or writes invalid audio.
    """
    model = os.getenv("PIPER_MODEL")
    piper_bin = os.getenv("PIPER_BIN", "piper")
    if not (model and os.path.exists(model)):
        return None
    if not (_have(piper_bin) or (os.path.isabs(piper_bin) and os.path.exists(piper_bin))):
        return None
    out_path = os.path.join(ensure_runtime_audio_dir(), f"tts_{uuid.uuid4().hex}.wav")
    safe_text = re.sub(r"[\x00-\x1F]+", " ", text).strip()
    # speed via native piper arg: smaller length_scale => faster speaking
    length_scale = os.getenv("PIPER_LENGTH_SCALE", "").strip()
    cmd = [piper_bin, "--model", model, "--output_file", out_path]
    if length_scale:
        cmd += ["--length_scale", length_scale]
    # ensure espeak-ng data is discoverable (HF Spaces often need this)
    # NOTE(review): env is a copy of os.environ, so this assignment rewrites
    # a value that is already present; kept to make the intent explicit.
    espeak_data = os.getenv("ESPEAK_DATA_PATH")
    env = os.environ.copy()
    if espeak_data and os.path.isdir(espeak_data):
        env["ESPEAK_DATA_PATH"] = espeak_data
    try:
        # subprocess.run kills and reaps the child when the timeout expires;
        # Popen.communicate(timeout=...) alone would leave a hung piper
        # process running after TimeoutExpired is raised.
        proc = subprocess.run(
            cmd,
            input=safe_text.encode("utf-8"),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            env=env,
            timeout=45,
        )
        return out_path if (proc.returncode == 0 and _is_valid_wav(out_path)) else None
    except Exception as e:
        print("[TTS] Piper error:", e)
        return None
def _tts_with_say(text: str) -> Optional[str]:
    """Synthesize *text* with the macOS ``say`` command.

    ``say`` writes an AIFF file, which is then converted to WAV with
    ``afconvert`` or, failing that, ``ffmpeg``. The voice can be chosen via
    the ``SAY_VOICE`` environment variable.

    Returns:
        Path of the WAV when conversion yields valid audio (the AIFF is then
        deleted); otherwise the AIFF path if that file exists; None when not
        running on macOS, ``say`` is missing, or any command fails.
    """
    if sys.platform != "darwin" or not _have("say"):
        return None
    out_dir = ensure_runtime_audio_dir()
    aiff = os.path.join(out_dir, f"tts_{uuid.uuid4().hex}.aiff")
    wav = os.path.join(out_dir, f"tts_{uuid.uuid4().hex}.wav")
    voice = os.getenv("SAY_VOICE")
    # Strip control characters plus backtick and angle brackets; fall back to
    # a fixed phrase so `say` is never invoked with an empty message.
    safe_text = re.sub(r"[\x00-\x1F`<>]+", " ", text).strip() or "Hello."
    try:
        cmd = ["say", "-o", aiff]
        if voice:
            cmd += ["-v", voice]
        # NOTE(review): text beginning with "-" may be parsed by `say` as an
        # option — confirm and consider feeding the text via stdin instead.
        cmd.append(safe_text)
        subprocess.run(cmd, check=True)
        # convert to wav for consistent playback
        if _have("afconvert"):
            subprocess.run(["afconvert", "-f", "WAVE", "-d", "LEI16", "-c", "1", "-s", "1", aiff, wav], check=True,
                           stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        elif _have("ffmpeg"):
            subprocess.run(["ffmpeg", "-y", "-i", aiff, "-ar", "22050", "-ac", "1", wav], check=True,
                           stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        if _is_valid_wav(wav):
            # Best-effort removal of the intermediate file. Only filesystem
            # errors are ignored, so KeyboardInterrupt/SystemExit propagate.
            try:
                os.remove(aiff)
            except OSError:
                pass
            return wav
        return aiff if os.path.exists(aiff) else None
    except Exception as e:
        print("[TTS] say failed:", e)
        return None
def tts_synthesize(text: str) -> Optional[str]:
    """Synthesize *text* to an audio file using the first engine that works.

    Engines are tried in order: Piper, pyttsx3, then macOS ``say``. When the
    ``TTS_SPEED`` environment variable parses to something other than 1.0,
    the result is post-processed with ffmpeg (tempo change + silence trim);
    if that step fails the unprocessed file is returned instead. Finally all
    other ``.wav`` / ``.aiff`` files in the runtime audio directory are pruned.

    Returns:
        Path of the generated audio file, or None when *text* is empty or
        whitespace-only, or when every engine fails.
    """
    if not (text and text.strip()):
        return None
    ensure_runtime_audio_dir()
    # Prefer Piper on Linux/Spaces if available
    out = _tts_with_piper(text) or _tts_with_pyttsx3(text) or _tts_with_say(text)
    if not out:
        print("[TTS] all engines failed")
        return None
    # Optional: global speed-up & trim with ffmpeg
    try:
        speed = float(os.getenv("TTS_SPEED", "1.0"))
    except ValueError:
        # os.getenv always yields a str here, so a bad value is a ValueError.
        speed = 1.0
    if abs(speed - 1.0) > 1e-3:
        sped = _ffmpeg_speed_and_trim(out, speed)
        if sped:
            # clean the original, keep sped-up one
            try:
                os.remove(out)
            except OSError:
                pass  # best-effort; the prune below retries and reports
            out = sped
    # prune older files to keep the folder tidy
    cleanup_old_audio(keep_latest=out)
    return out