"""
OmniVoice FastAPI server for Linux + NVIDIA (e.g. RunPod).
Exposes the full OmniVoice surface (voice clone, voice design, auto voice) + all
generation parameters from `omnivoice.OmniVoiceGenerationConfig`.
Endpoints
---------
- GET /health Liveness/readiness + startup error.
- GET /v1/models List served model.
- GET /v1/languages All language display names supported by the model.
- GET /v1/voice-design/attributes Attribute groups for the Voice Design composer.
- POST /v1/audio/speech Unified TTS endpoint (multipart):
text=<str> (required)
mode=clone|design|auto (default: clone if ref_audio else design if instruct else auto)
ref_audio=<file> (clone)
ref_text=<str> (clone, optional — Whisper auto-transcribes if omitted)
instruct=<str> (design or clone overlay)
language=<str> (display name, e.g. "English", "Hindi"; "Auto" for auto-detect)
speed=<float> (default 1.0)
duration=<float> (seconds; if set overrides speed)
num_step=<int> (default 32)
guidance_scale=<float> (default 2.0)
denoise=<bool> (default true)
preprocess_prompt=<bool> (default true)
postprocess_output=<bool> (default true)
- POST /v1/audio/speech/multi Multi-character story generation (JSON body).
Splits text on [Sn]…[/Sn] markers, runs OmniVoice once per chunk with
each character's mode/instruct/ref_audio (clone or design), and stitches
the resulting WAVs (with a configurable inter-segment silence) into a
single WAV response. See `MultiSpeechRequest` schema below.
- POST /v1/audio/speech/clone Backward-compat shim (forwards to /v1/audio/speech with mode=clone).
Returns: 200 audio/wav (16-bit PCM mono @ 24 kHz).
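Example (illustrative client call for the clone mode; the base URL, the use of
`requests`, and `reference.wav` are assumptions, not part of this module):

    import requests

    with open("reference.wav", "rb") as f:
        resp = requests.post(
            "http://localhost:8000/v1/audio/speech",
            data={"text": "Hello from OmniVoice", "mode": "clone"},
            files={"ref_audio": ("reference.wav", f, "audio/wav")},
            timeout=600,
        )
    resp.raise_for_status()
    with open("cloned.wav", "wb") as out:
        out.write(resp.content)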
"""
import asyncio
import base64
import binascii
import io
import logging
import os
import platform
import re
import tempfile
import wave
from typing import Any
import numpy as np
import torch
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field
from omnivoice import OmniVoice, OmniVoiceGenerationConfig
from omnivoice.utils.lang_map import LANG_NAMES, lang_display_name
logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
MODEL_ID = os.getenv("OMNIVOICE_MODEL_ID", "pravinuxd/OmniVoice")
SAMPLE_RATE = 24000
def _resolve_device_and_dtype() -> tuple[str, torch.dtype]:
if torch.cuda.is_available():
return "cuda:0", torch.float16
return "cpu", torch.float32
DEVICE, DTYPE = _resolve_device_and_dtype()
def _enforce_cuda_if_runpod() -> None:
on_runpod = bool(os.getenv("RUNPOD_POD_ID"))
require = os.getenv("OMNIVOICE_REQUIRE_CUDA", "").strip().lower()
if require in ("1", "true", "yes"):
must_cuda = True
elif require in ("0", "false", "no"):
must_cuda = False
else:
must_cuda = on_runpod
if must_cuda and not torch.cuda.is_available():
raise RuntimeError(
"CUDA is not available. This image is built for NVIDIA GPUs on Linux (RunPod). "
"Attach a GPU to the pod or set OMNIVOICE_REQUIRE_CUDA=0 only for debugging."
)
_enforce_cuda_if_runpod()
if torch.cuda.is_available():
logging.info(
"Using GPU: %s | CUDA %s",
torch.cuda.get_device_name(0),
torch.version.cuda,
)
else:
logging.warning("Running on CPU — not recommended for production OmniVoice inference.")
logging.info(
"Platform: %s | MODEL_ID=%s | device=%s | dtype=%s",
platform.system(),
MODEL_ID,
DEVICE,
DTYPE,
)
# Pre-compute language list (sorted display names) once.
_LANGUAGES_SORTED = sorted({lang_display_name(n) for n in LANG_NAMES})
# Voice Design attributes (mirrors omnivoice.cli.demo categories so the UI stays in sync).
_VOICE_DESIGN_ATTRIBUTES = {
"gender": {
"label": "Gender",
"info": "Speaker gender.",
"options": [
{"value": "male", "label": "Male / 男"},
{"value": "female", "label": "Female / 女"},
],
},
"age": {
"label": "Age",
"info": "Approximate speaker age.",
"options": [
{"value": "child", "label": "Child / 儿童"},
{"value": "teenager", "label": "Teenager / 少年"},
{"value": "young adult", "label": "Young Adult / 青年"},
{"value": "middle-aged", "label": "Middle-aged / 中年"},
{"value": "elderly", "label": "Elderly / 老年"},
],
},
"pitch": {
"label": "Pitch",
"info": "Voice pitch register.",
"options": [
{"value": "very low pitch", "label": "Very Low / 极低音调"},
{"value": "low pitch", "label": "Low / 低音调"},
{"value": "moderate pitch", "label": "Moderate / 中音调"},
{"value": "high pitch", "label": "High / 高音调"},
{"value": "very high pitch", "label": "Very High / 极高音调"},
],
},
"style": {
"label": "Style",
"info": "Speaking style.",
"options": [
{"value": "whisper", "label": "Whisper / 耳语"},
],
},
"english_accent": {
"label": "English Accent",
"info": "Only effective when generating English speech.",
"options": [
{"value": "american accent", "label": "American"},
{"value": "australian accent", "label": "Australian"},
{"value": "british accent", "label": "British"},
{"value": "canadian accent", "label": "Canadian"},
{"value": "chinese accent", "label": "Chinese"},
{"value": "indian accent", "label": "Indian"},
{"value": "japanese accent", "label": "Japanese"},
{"value": "korean accent", "label": "Korean"},
{"value": "portuguese accent", "label": "Portuguese"},
{"value": "russian accent", "label": "Russian"},
],
},
"chinese_dialect": {
"label": "Chinese Dialect",
"info": "Only effective when generating Chinese speech.",
"options": [
{"value": "河南话", "label": "Henan / 河南话"},
{"value": "陕西话", "label": "Shaanxi / 陕西话"},
{"value": "四川话", "label": "Sichuan / 四川话"},
{"value": "贵州话", "label": "Guizhou / 贵州话"},
{"value": "云南话", "label": "Yunnan / 云南话"},
{"value": "桂林话", "label": "Guilin / 桂林话"},
{"value": "济南话", "label": "Jinan / 济南话"},
{"value": "石家庄话", "label": "Shijiazhuang / 石家庄话"},
{"value": "甘肃话", "label": "Gansu / 甘肃话"},
{"value": "宁夏话", "label": "Ningxia / 宁夏话"},
{"value": "青岛话", "label": "Qingdao / 青岛话"},
{"value": "东北话", "label": "Northeast / 东北话"},
],
},
}
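# A minimal sketch of how a client might fold a selection from the groups above
# into a Voice Design `instruct` string. The sentence template is an assumption;
# this server only forwards `instruct` to the model verbatim.
#
#     def compose_instruct(selection: dict[str, str]) -> str:
#         # e.g. {"gender": "female", "age": "young adult", "pitch": "high pitch"}
#         return "A " + ", ".join(selection.values()) + " voice."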
app = FastAPI(title="OmniVoice Pod API", version="2")
model: OmniVoice | None = None
startup_error: str | None = None
# Dubbing Studio Lite — sibling module that registers /v1/dub/* routes.
# Imported here so the GPU_LOCK lives in a single shared instance and the
# OmniVoice model handle can be shared in-process (no HTTP round-trip per
# segment).
try:
from . import dub as _dub # type: ignore[import-not-found]
except ImportError:
import dub as _dub # type: ignore[no-redef]
app.include_router(_dub.router)
def _load_model() -> OmniVoice:
return OmniVoice.from_pretrained(
MODEL_ID,
device_map=DEVICE,
dtype=DTYPE,
load_asr=True,
)
@app.on_event("startup")
def startup_event() -> None:
global model, startup_error
try:
model = _load_model()
startup_error = None
# Hand the live model + generation-config class to the dub module so
# `/v1/dub/jobs` can synthesize segments without going over HTTP.
_dub.configure(model, OmniVoiceGenerationConfig)
except Exception as exc: # pragma: no cover
startup_error = f"{type(exc).__name__}: {exc}"
raise
def _wav_bytes(audio: np.ndarray | torch.Tensor) -> bytes:
    # `model.generate` may hand back torch tensors (see the note in _generate_chunk);
    # normalise to a flat float32 NumPy array before packing 16-bit PCM.
    if isinstance(audio, torch.Tensor):
        audio = audio.detach().cpu().float().numpy()
    audio = np.asarray(audio, dtype=np.float32).reshape(-1)
    clipped = np.clip(audio, -1.0, 1.0)
pcm16 = (clipped * 32767.0).astype(np.int16)
buf = io.BytesIO()
with wave.open(buf, "wb") as wav:
wav.setnchannels(1)
wav.setsampwidth(2)
wav.setframerate(SAMPLE_RATE)
wav.writeframes(pcm16.tobytes())
return buf.getvalue()
def _parse_bool(val: str | None, default: bool) -> bool:
if val is None:
return default
return val.strip().lower() in ("1", "true", "yes", "on")
def _normalize_language(lang: str | None) -> str | None:
if not lang:
return None
cleaned = lang.strip()
if not cleaned or cleaned.lower() == "auto":
return None
return cleaned
def _resolve_mode(mode: str | None, has_ref_audio: bool, has_instruct: bool) -> str:
if mode:
m = mode.strip().lower()
if m in ("clone", "design", "auto"):
return m
if has_ref_audio:
return "clone"
if has_instruct:
return "design"
return "auto"
@app.get("/health")
def health() -> JSONResponse:
ready = model is not None and startup_error is None
return JSONResponse(
{
"status": "healthy" if ready else "starting",
"ready": ready,
"model_loaded": ready,
"model_id": MODEL_ID,
"device": DEVICE,
"startup_error": startup_error,
}
)
@app.get("/v1/models")
def list_models() -> JSONResponse:
return JSONResponse(
{
"object": "list",
"data": [
{
"id": "omnivoice",
"object": "model",
"owned_by": "pravinuxd",
"root": MODEL_ID,
}
],
}
)
@app.get("/v1/languages")
def list_languages() -> JSONResponse:
return JSONResponse({"languages": _LANGUAGES_SORTED, "count": len(_LANGUAGES_SORTED)})
@app.get("/v1/voice-design/attributes")
def list_voice_design_attributes() -> JSONResponse:
return JSONResponse({"attributes": _VOICE_DESIGN_ATTRIBUTES})
def _generate_audio(
*,
text: str,
mode: str,
ref_audio_path: str | None,
ref_text: str | None,
instruct: str | None,
language: str | None,
speed: float,
duration: float | None,
num_step: int,
guidance_scale: float,
denoise: bool,
preprocess_prompt: bool,
postprocess_output: bool,
) -> bytes:
if model is None:
raise HTTPException(status_code=503, detail=startup_error or "Model not ready")
gen_config = OmniVoiceGenerationConfig(
num_step=num_step,
guidance_scale=guidance_scale,
denoise=denoise,
preprocess_prompt=preprocess_prompt,
postprocess_output=postprocess_output,
)
kw: dict[str, Any] = {
"text": text,
"language": language,
"generation_config": gen_config,
}
if speed != 1.0:
kw["speed"] = speed
if duration is not None and duration > 0:
kw["duration"] = duration
if mode == "clone":
if not ref_audio_path:
raise HTTPException(status_code=400, detail="mode=clone requires ref_audio")
kw["voice_clone_prompt"] = model.create_voice_clone_prompt(
ref_audio=ref_audio_path,
ref_text=ref_text or None,
)
if instruct and instruct.strip():
kw["instruct"] = instruct.strip()
try:
generated = model.generate(**kw)
except HTTPException:
raise
except Exception as exc:
logging.exception("OmniVoice generation failed")
raise HTTPException(
status_code=500, detail=f"{type(exc).__name__}: {exc}"
) from exc
return _wav_bytes(generated[0])
@app.post("/v1/audio/speech")
async def synth_speech(
text: str = Form(...),
mode: str | None = Form(None),
ref_audio: UploadFile | None = File(None),
ref_text: str | None = Form(None),
instruct: str | None = Form(None),
language: str | None = Form(None),
speed: float = Form(1.0),
duration: float | None = Form(None),
num_step: int = Form(32),
guidance_scale: float = Form(2.0),
denoise: str | None = Form(None),
preprocess_prompt: str | None = Form(None),
postprocess_output: str | None = Form(None),
) -> StreamingResponse:
text = (text or "").strip()
if not text:
raise HTTPException(status_code=400, detail="text is required")
    has_ref = ref_audio is not None and bool(ref_audio.filename)
    has_instruct = bool(instruct and instruct.strip())
    resolved_mode = _resolve_mode(mode, has_ref, has_instruct)
tmp_path: str | None = None
try:
if resolved_mode == "clone":
if not has_ref:
raise HTTPException(
status_code=400, detail="mode=clone requires ref_audio"
)
audio_bytes = await ref_audio.read()
suffix = (
os.path.splitext(ref_audio.filename or "reference.wav")[1] or ".wav"
)
with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
tmp.write(audio_bytes)
tmp_path = tmp.name
# Share the dub module's GPU lock so a long-running dub job and a
# live TTS request never fight for the single A40 GPU.
async with _dub.GPU_LOCK:
wav_bytes = await asyncio.to_thread(
_generate_audio,
text=text,
mode=resolved_mode,
ref_audio_path=tmp_path,
ref_text=(ref_text or None),
instruct=(instruct or None),
language=_normalize_language(language),
speed=float(speed),
duration=(float(duration) if duration is not None else None),
num_step=int(num_step or 32),
guidance_scale=float(guidance_scale),
denoise=_parse_bool(denoise, True),
preprocess_prompt=_parse_bool(preprocess_prompt, True),
postprocess_output=_parse_bool(postprocess_output, True),
)
headers = {"X-OmniVoice-Mode": resolved_mode}
return StreamingResponse(
io.BytesIO(wav_bytes), media_type="audio/wav", headers=headers
)
finally:
if tmp_path and os.path.exists(tmp_path):
os.unlink(tmp_path)
# ---------------------------------------------------------------------------
# Multi-character story endpoint
# ---------------------------------------------------------------------------
# Matches [S1] … [/S1] (or [S2], …, up to [S8]). The slot number is captured
# in group 1, optional attributes (e.g. `duration=2.5s`) in group 2, the inner
# text in group 3. We use a non-greedy match so two consecutive blocks like
# "[S1]hi[/S1] [S2]bye[/S2]" parse correctly.
_SPEAKER_TAG_RE = re.compile(
r"\[\s*S([1-8])([^\]]*)\](.*?)\[\s*/\s*S\1\s*\]",
re.DOTALL,
)
# Pure-silence directive embedded in the dialogue text. When a segment's entire
# text is a pause directive (whether it came from inside a [Sn]…[/Sn] block or
# from narration between blocks) we emit that much silence without invoking the
# model. Supports `[pause=2s]`, `[pause=500ms]` and `[pause=2]` (bare numbers
# default to seconds); matching is case-insensitive.
_PAUSE_TAG_RE = re.compile(
r"^\s*\[\s*pause\s*=\s*([0-9]*\.?[0-9]+)\s*(s|ms)?\s*\]\s*$",
re.IGNORECASE,
)
# Per-block `duration=` attribute parsed out of the speaker tag's attribute
# string (group 2 above).
_DURATION_ATTR_RE = re.compile(
r"duration\s*=\s*([0-9]*\.?[0-9]+)\s*(s|ms)?",
re.IGNORECASE,
)
def _parse_pause_seconds(text: str) -> float | None:
"""If `text` is a single `[pause=…]` directive return its length in seconds."""
m = _PAUSE_TAG_RE.match(text)
if not m:
return None
value = float(m.group(1))
unit = (m.group(2) or "s").lower()
return value / 1000.0 if unit == "ms" else value
def _parse_duration_attr(attrs: str) -> float | None:
"""Parse `duration=Xs` / `duration=Xms` from the `[Sn …]` attribute string."""
m = _DURATION_ATTR_RE.search(attrs or "")
if not m:
return None
value = float(m.group(1))
unit = (m.group(2) or "s").lower()
return value / 1000.0 if unit == "ms" else value
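# Worked examples (traced from the regexes above):
#     _parse_pause_seconds("[pause=500ms]") -> 0.5
#     _parse_pause_seconds("[PAUSE = 2]")   -> 2.0
#     _parse_duration_attr(" duration=2s")  -> 2.0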
class CharacterConfig(BaseModel):
"""One character slot (S1..S8) used by /v1/audio/speech/multi."""
slot: int = Field(..., ge=1, le=8)
name: str | None = None
mode: str = Field(..., pattern="^(design|clone)$")
instruct: str | None = None
language: str | None = None
speed: float | None = None
# base64-encoded WAV/MP3/etc. — only used when mode == "clone".
ref_audio_b64: str | None = None
ref_text: str | None = None
class MultiSpeechRequest(BaseModel):
text: str
characters: list[CharacterConfig] = Field(default_factory=list)
# Default character used for any narrative text outside [Sn]…[/Sn] blocks.
# If omitted, narration is generated with mode=auto (random voice for the
# detected language).
narrator: CharacterConfig | None = None
# Common controls applied to every chunk unless the character overrides.
speed: float = 1.0
num_step: int = 32
guidance_scale: float = 2.0
denoise: bool = True
preprocess_prompt: bool = True
postprocess_output: bool = True
# Silence inserted between chunks for natural pacing.
inter_segment_silence_ms: int = 250
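# Illustrative JSON body for POST /v1/audio/speech/multi (values are examples;
# "<base64 WAV>" stands in for real base64-encoded reference audio):
#
#     {
#       "text": "[S1]Hello there.[/S1] [pause=500ms] [S2 duration=2s]Good morning.[/S2]",
#       "characters": [
#         {"slot": 1, "mode": "design", "instruct": "An elderly male voice, low pitch"},
#         {"slot": 2, "mode": "clone", "ref_audio_b64": "<base64 WAV>"}
#       ],
#       "inter_segment_silence_ms": 250
#     }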
def _split_into_segments(
text: str,
) -> list[tuple[int | None, str, dict[str, float]]]:
"""Split `text` into (slot|None, chunk_text, attrs) segments.
Slot is None for narration (text outside any [Sn]…[/Sn] block).
Empty / whitespace-only chunks are dropped. `attrs` carries optional
metadata parsed from the speaker tag (currently `duration`).
"""
segments: list[tuple[int | None, str, dict[str, float]]] = []
cursor = 0
for match in _SPEAKER_TAG_RE.finditer(text):
before = text[cursor : match.start()]
if before.strip():
segments.append((None, before.strip(), {}))
slot = int(match.group(1))
attrs_str = match.group(2) or ""
inner = match.group(3).strip()
attrs: dict[str, float] = {}
dur = _parse_duration_attr(attrs_str)
if dur is not None and dur > 0:
attrs["duration"] = dur
if inner:
segments.append((slot, inner, attrs))
cursor = match.end()
tail = text[cursor:]
if tail.strip():
segments.append((None, tail.strip(), {}))
return segments
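# Worked example of the split:
#     _split_into_segments("Narration. [S1]Hello there.[/S1] [pause=500ms] "
#                          "[S2 duration=2s]Good morning.[/S2]")
#     -> [(None, "Narration.", {}), (1, "Hello there.", {}),
#         (None, "[pause=500ms]", {}), (2, "Good morning.", {"duration": 2.0})]
# The "[pause=…]" narration segment is later turned into silence by the
# /v1/audio/speech/multi loop instead of being synthesised.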
def _decode_ref_audio(b64: str) -> bytes:
try:
return base64.b64decode(b64, validate=True)
except (binascii.Error, ValueError) as exc:
raise HTTPException(
status_code=400, detail=f"Invalid ref_audio_b64: {exc}"
) from exc
def _silence_pcm(milliseconds: int) -> np.ndarray:
samples = max(0, int(SAMPLE_RATE * (milliseconds / 1000.0)))
return np.zeros(samples, dtype=np.float32)
def _wav_bytes_to_float_pcm(buf: bytes) -> np.ndarray:
"""Read a 16-bit PCM mono WAV (any sample rate) and return float32 [-1, 1].
If the sample rate doesn't match SAMPLE_RATE we keep it as-is and rely on
the model emitting at SAMPLE_RATE; this is a defensive helper used only
when we round-trip WAVs (we never receive WAVs from the model — we get
raw float arrays — so this branch is mostly here for testing).
"""
with wave.open(io.BytesIO(buf), "rb") as wav:
n = wav.getnframes()
raw = wav.readframes(n)
pcm16 = np.frombuffer(raw, dtype=np.int16)
return (pcm16.astype(np.float32) / 32767.0).copy()
def _generate_chunk(
*,
text: str,
character: CharacterConfig | None,
common_speed: float,
num_step: int,
guidance_scale: float,
denoise: bool,
preprocess_prompt: bool,
postprocess_output: bool,
block_duration: float | None = None,
) -> np.ndarray:
"""Generate a single chunk with the given character and return a float32 PCM array."""
if model is None:
raise HTTPException(status_code=503, detail=startup_error or "Model not ready")
mode = "auto"
instruct = None
language = None
speed = common_speed
ref_audio_path: str | None = None
ref_text: str | None = None
cleanup_paths: list[str] = []
try:
if character is not None:
mode = character.mode
instruct = (character.instruct or "").strip() or None
language = _normalize_language(character.language)
if character.speed is not None:
speed = character.speed
if mode == "clone":
if not character.ref_audio_b64:
raise HTTPException(
status_code=400,
detail=(
f"Character S{character.slot} (mode=clone) requires ref_audio_b64"
),
)
audio_bytes = _decode_ref_audio(character.ref_audio_b64)
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp:
tmp.write(audio_bytes)
ref_audio_path = tmp.name
cleanup_paths.append(ref_audio_path)
ref_text = (character.ref_text or "").strip() or None
gen_config = OmniVoiceGenerationConfig(
num_step=num_step,
guidance_scale=guidance_scale,
denoise=denoise,
preprocess_prompt=preprocess_prompt,
postprocess_output=postprocess_output,
)
kw: dict[str, Any] = {
"text": text,
"language": language,
"generation_config": gen_config,
}
# `duration` overrides `speed` per the OmniVoice contract — only set
# one or the other.
if block_duration is not None and block_duration > 0:
kw["duration"] = float(block_duration)
elif speed != 1.0:
kw["speed"] = speed
if mode == "clone" and ref_audio_path:
kw["voice_clone_prompt"] = model.create_voice_clone_prompt(
ref_audio=ref_audio_path,
ref_text=ref_text,
)
if instruct:
kw["instruct"] = instruct
try:
generated = model.generate(**kw)
except HTTPException:
raise
except Exception as exc:
logging.exception("OmniVoice multi-character chunk failed")
raise HTTPException(
status_code=500,
detail=f"{type(exc).__name__}: {exc}",
) from exc
# `generate` returns a tensor or numpy array shaped (T,) or (1, T).
chunk = generated[0]
if isinstance(chunk, torch.Tensor):
chunk = chunk.detach().cpu().float().numpy()
chunk = np.asarray(chunk, dtype=np.float32).reshape(-1)
return chunk
finally:
for path in cleanup_paths:
if path and os.path.exists(path):
try:
os.unlink(path)
except OSError:
pass
def _concat_pcm(arrays: list[np.ndarray], silence_samples: int) -> np.ndarray:
if not arrays:
return np.zeros(0, dtype=np.float32)
if len(arrays) == 1:
return arrays[0]
silence = np.zeros(silence_samples, dtype=np.float32)
out: list[np.ndarray] = []
for i, arr in enumerate(arrays):
if i > 0:
out.append(silence)
out.append(arr)
return np.concatenate(out, dtype=np.float32)
@app.post("/v1/audio/speech/multi")
async def synth_multi_speech(req: MultiSpeechRequest) -> StreamingResponse:
text = (req.text or "").strip()
if not text:
raise HTTPException(status_code=400, detail="text is required")
characters_by_slot: dict[int, CharacterConfig] = {c.slot: c for c in req.characters}
segments = _split_into_segments(text)
if not segments:
raise HTTPException(
status_code=400, detail="text has no generatable content after parsing"
)
referenced_slots = {slot for slot, _, _ in segments if slot is not None}
missing = sorted(referenced_slots - characters_by_slot.keys())
if missing:
raise HTTPException(
status_code=400,
detail=f"Missing character config for slot(s): {missing}",
)
chunks: list[np.ndarray] = []
# Hold the GPU for the entire multi-segment run so a competing dub job
# can't trample our KV cache mid-story. Pause segments don't touch the
# model so they execute instantly inside the lock.
async with _dub.GPU_LOCK:
for slot, chunk_text, attrs in segments:
pause_seconds = _parse_pause_seconds(chunk_text)
if pause_seconds is not None and pause_seconds > 0:
chunks.append(_silence_pcm(int(pause_seconds * 1000)))
continue
character = (
characters_by_slot.get(slot) if slot is not None else req.narrator
)
# Per-block target duration (Movie Dubbing): the speaker tag may carry
# `[Sn duration=2.5s]…[/Sn]`. We pass it down to OmniVoice so it can
# fit the chunk to the requested length.
block_duration = attrs.get("duration") if attrs else None
pcm = await asyncio.to_thread(
_generate_chunk,
text=chunk_text,
character=character,
common_speed=req.speed,
num_step=req.num_step,
guidance_scale=req.guidance_scale,
denoise=req.denoise,
preprocess_prompt=req.preprocess_prompt,
postprocess_output=req.postprocess_output,
block_duration=block_duration,
)
chunks.append(pcm)
silence_samples = max(0, int(SAMPLE_RATE * (req.inter_segment_silence_ms / 1000.0)))
combined = _concat_pcm(chunks, silence_samples=silence_samples)
wav_bytes = _wav_bytes(combined)
headers = {
"X-OmniVoice-Mode": "multi",
"X-OmniVoice-Segments": str(len(segments)),
}
return StreamingResponse(
io.BytesIO(wav_bytes), media_type="audio/wav", headers=headers
)
@app.post("/v1/audio/speech/clone")
async def clone_speech_compat(
text: str = Form(...),
ref_audio: UploadFile = File(...),
ref_text: str | None = Form(None),
instruct: str | None = Form(None),
language: str | None = Form(None),
speed: float = Form(1.0),
num_step: int = Form(32),
guidance_scale: float = Form(2.0),
) -> StreamingResponse:
"""Backward-compat shim — same as POST /v1/audio/speech with mode=clone."""
return await synth_speech( # type: ignore[return-value]
text=text,
mode="clone",
ref_audio=ref_audio,
ref_text=ref_text,
instruct=instruct,
language=language,
speed=speed,
duration=None,
num_step=num_step,
guidance_scale=guidance_scale,
denoise=None,
preprocess_prompt=None,
postprocess_output=None,
)
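# Local launch sketch. Assumptions: uvicorn is installed in the image, and the
# `PORT` environment variable / default 8000 are illustrative; nothing in this
# module mandates a particular port.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", "8000")))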