File size: 4,579 Bytes
74bb5fe
 
2a42fac
74bb5fe
 
 
2a42fac
a15c284
90a6e2c
2a42fac
 
 
 
 
 
 
 
 
 
 
 
 
a15c284
 
 
51d45ce
90a6e2c
2a42fac
90a6e2c
 
ac1f51b
2a42fac
 
74bb5fe
2a42fac
 
74bb5fe
2a42fac
a15c284
2a42fac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
74bb5fe
 
 
51d45ce
2a42fac
 
 
 
 
 
 
 
 
 
 
a15c284
2a42fac
 
74bb5fe
2a42fac
 
 
 
74bb5fe
2a42fac
 
 
 
 
 
 
 
 
 
 
 
 
 
74bb5fe
2a42fac
74bb5fe
 
2a42fac
74bb5fe
2a42fac
 
 
74bb5fe
2a42fac
74bb5fe
2a42fac
74bb5fe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# models/tts_router.py
from __future__ import annotations
import os, re, uuid, wave, glob, subprocess, sys
from shutil import which
from typing import Optional

# Absolute path of "<directory of this file>/../runtime/audio"; every
# synthesized clip is written here. The directory is created at import time.
RUNTIME_AUDIO_DIR = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, "runtime", "audio")
)
os.makedirs(RUNTIME_AUDIO_DIR, exist_ok=True)

def _have(cmd: str) -> bool:
    """Return True when ``shutil.which`` can resolve *cmd* to an executable."""
    resolved = which(cmd)
    return resolved is not None

def _is_valid_wav(path: str, min_duration_s: float = 0.25) -> bool:
    """Check that *path* is a readable WAV file of at least *min_duration_s*.

    Returns True only when the file opens with the ``wave`` module, reports a
    positive frame count and sample rate, and its duration (frames / rate) is
    at least *min_duration_s*. Any error while opening or inspecting the file
    yields False instead of raising.
    """
    try:
        with wave.open(path, "rb") as wav_file:
            n_frames = wav_file.getnframes()
            sample_rate = wav_file.getframerate()
        if n_frames <= 0 or sample_rate <= 0:
            return False
        return n_frames / float(sample_rate) >= min_duration_s
    except Exception:
        return False

def ensure_runtime_audio_dir() -> str:
    """Create ``RUNTIME_AUDIO_DIR`` if it is missing and return its path."""
    target = RUNTIME_AUDIO_DIR
    os.makedirs(target, exist_ok=True)
    return target

def cleanup_old_audio(keep_latest: Optional[str] = None):
    """Delete ``.wav`` / ``.aiff`` files in ``RUNTIME_AUDIO_DIR``.

    Args:
        keep_latest: Optional path of a file to spare. Paths are compared
            after ``os.path.abspath`` normalization. A falsy value means
            nothing is spared.

    Deletion is best-effort: a file that cannot be removed is reported on
    stdout and skipped. Files with other extensions are left alone.
    """
    keep_abs = os.path.abspath(keep_latest) if keep_latest else None
    for candidate in glob.glob(os.path.join(RUNTIME_AUDIO_DIR, "*")):
        if keep_abs is not None and os.path.abspath(candidate) == keep_abs:
            continue
        if not candidate.endswith((".wav", ".aiff")):
            continue
        try:
            os.remove(candidate)
        except Exception as e:
            print(f"[CLEANUP] Could not delete {candidate}: {e}")

# --- TTS engines ---
def _tts_with_pyttsx3(text: str) -> Optional[str]:
    """Synthesize *text* to a WAV file with pyttsx3.

    Returns the path of the generated file, or None when pyttsx3 cannot be
    imported, synthesis raises, or the output fails ``_is_valid_wav``.

    The voice may be chosen through the ``ESPEAK_VOICE`` environment variable
    (checked first) or ``TTS_VOICE``; the first engine voice whose name
    contains that value, case-insensitively, is used.
    """
    try:
        import pyttsx3
    except Exception:
        return None

    wav_path = os.path.join(ensure_runtime_audio_dir(), f"tts_{uuid.uuid4().hex}.wav")
    try:
        engine = pyttsx3.init()
        requested = os.getenv("ESPEAK_VOICE") or os.getenv("TTS_VOICE")
        if requested:
            needle = requested.lower()
            chosen = next(
                (v for v in engine.getProperty("voices") if needle in (v.name or "").lower()),
                None,
            )
            if chosen is not None:
                engine.setProperty("voice", chosen.id)
        # Collapse runs of ASCII control characters into a single space.
        spoken = re.sub(r"[\x00-\x1F]+", " ", text).strip()
        engine.save_to_file(spoken, wav_path)
        engine.runAndWait()
        if _is_valid_wav(wav_path):
            return wav_path
        return None
    except Exception as e:
        print("[TTS] pyttsx3 failed:", e)
        return None

def _tts_with_piper(text: str) -> Optional[str]:
    """Synthesize *text* to a WAV file by piping it to the Piper CLI.

    Configuration comes from the environment:
        PIPER_MODEL: path to the model file; must exist, otherwise None is
            returned without running anything.
        PIPER_BIN: executable name or path (default ``"piper"``); must be
            resolvable on PATH or be an existing absolute path.

    Returns the output path when Piper exits with status 0 and the file
    passes ``_is_valid_wav``; otherwise None. Errors, including a timeout
    after 45 seconds, are reported on stdout and yield None.
    """
    model = os.getenv("PIPER_MODEL")
    piper_bin = os.getenv("PIPER_BIN", "piper")
    if not (model and os.path.exists(model)):
        return None
    binary_found = _have(piper_bin) or (
        os.path.isabs(piper_bin) and os.path.exists(piper_bin)
    )
    if not binary_found:
        return None

    out_path = os.path.join(ensure_runtime_audio_dir(), f"tts_{uuid.uuid4().hex}.wav")
    # Collapse runs of ASCII control characters into a single space.
    safe_text = re.sub(r"[\x00-\x1F]+", " ", text).strip()
    try:
        # subprocess.run kills and reaps the child when the timeout expires.
        # Popen.communicate(timeout=...) alone would leave a hung process
        # running after TimeoutExpired is raised.
        proc = subprocess.run(
            [piper_bin, "--model", model, "--output_file", out_path],
            input=safe_text.encode("utf-8"),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=45,
        )
        return out_path if (proc.returncode == 0 and _is_valid_wav(out_path)) else None
    except Exception as e:
        print("[TTS] Piper error:", e)
        return None

def _tts_with_say(text: str) -> Optional[str]:
    """Synthesize *text* with the ``say`` command (darwin only).

    ``say`` writes an AIFF file, which is then converted to WAV with
    ``afconvert`` if available, else ``ffmpeg`` if available. The voice can
    be selected with the ``SAY_VOICE`` environment variable.

    Returns:
        The WAV path when conversion produced a file that passes
        ``_is_valid_wav`` (the intermediate AIFF is then removed); otherwise
        the AIFF path if that file exists; None when not on darwin, ``say``
        is missing, or any command fails.
    """
    if sys.platform != "darwin" or not _have("say"):
        return None
    out_dir = ensure_runtime_audio_dir()
    aiff = os.path.join(out_dir, f"tts_{uuid.uuid4().hex}.aiff")
    wav = os.path.join(out_dir, f"tts_{uuid.uuid4().hex}.wav")
    voice = os.getenv("SAY_VOICE")
    # Replace control characters, backticks and angle brackets with spaces;
    # fall back to a fixed phrase if nothing speakable remains.
    safe_text = re.sub(r"[\x00-\x1F`<>]+", " ", text).strip() or "Hello."
    try:
        cmd = ["say", "-o", aiff]
        if voice:
            cmd += ["-v", voice]
        cmd.append(safe_text)
        subprocess.run(cmd, check=True)
        if _have("afconvert"):
            subprocess.run(
                ["afconvert", "-f", "WAVE", "-d", "LEI16", "-c", "1", "-s", "1", aiff, wav],
                check=True,
            )
        elif _have("ffmpeg"):
            subprocess.run(
                ["ffmpeg", "-y", "-i", aiff, "-ar", "22050", "-ac", "1", wav],
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
        if _is_valid_wav(wav):
            try:
                os.remove(aiff)
            except OSError:
                # Best effort: a leftover AIFF must not fail the request.
                # Only OSError is swallowed so KeyboardInterrupt/SystemExit
                # still propagate.
                pass
            return wav
        return aiff if os.path.exists(aiff) else None
    except Exception as e:
        print("[TTS] say failed:", e)
        return None

def tts_synthesize(text: str) -> Optional[str]:
    """Synthesize *text* with the first engine that produces audio.

    Engines are tried in order: pyttsx3, Piper, then ``say``. On the first
    success, every other ``.wav`` / ``.aiff`` file in the runtime audio
    directory is removed and the new file's path is returned.

    Returns None when *text* is empty or whitespace-only, or when every
    engine fails.
    """
    if not text or not text.strip():
        return None
    ensure_runtime_audio_dir()
    # Priority order; the original notes pyttsx3 is preferred on Spaces/Linux.
    engines = (_tts_with_pyttsx3, _tts_with_piper, _tts_with_say)
    for synthesize in engines:
        produced = synthesize(text)
        if produced:
            cleanup_old_audio(keep_latest=produced)
            return produced
    return None