Spaces:
Paused
Paused
File size: 5,275 Bytes
fb56537 8b02702 fb56537 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
# app_api.py
#
# Copyright (C) August 4, 2025 Carlos Rodrigues dos Santos
#
# Versão 3.0.0 (API Head for Aduc Framework)
#
# Este arquivo implementa um servidor de API usando FastAPI para expor as
# funcionalidades do Aduc Framework. Ele permite o controle programático
# do processo de geração de vídeo.
import yaml
import logging
import uuid
from typing import Dict
from fastapi import FastAPI, BackgroundTasks, HTTPException
# --- 1. IMPORTAÇÃO DO FRAMEWORK E SEUS TIPOS ---
import aduc_framework
from aduc_framework.types import GenerationState, PreProductionParams, ProductionParams
# --- CONFIGURAÇÃO INICIAL ---
logger = logging.getLogger(__name__)
# Cria a aplicação FastAPI
app = FastAPI(
title="ADUC-SDR Framework API",
description="API para orquestração de geração de vídeo coerente com IA.",
version="3.0.0"
)
config[]
with open("config.yaml", 'r') as f:
config = yaml.safe_load(f)
WORKSPACE_DIR = config['application']['workspace_dir']
# Carrega a configuração e inicializa uma instância SINGLETON do framework.
# O framework é pesado e deve ser carregado apenas uma vez na inicialização da API.
try:
with open("config.yaml", 'r') as f: config = yaml.safe_load(f)
WORKSPACE_DIR = config['application']['workspace_dir']
aduc = aduc_framework.create_aduc_instance(workspace_dir=WORKSPACE_DIR)
logger.info("API FastAPI inicializada e conectada ao Aduc Framework.")
except Exception as e:
logger.critical(f"ERRO CRÍTICO durante a inicialização da API: {e}", exc_info=True)
# A API não pode funcionar sem o framework, então saímos se falhar.
exit()
# --- ARMAZENAMENTO DE TAREFAS EM MEMÓRIA ---
# Em um ambiente de produção real, isso seria substituído por um banco de dados
# ou um cache como Redis para persistir o estado das tarefas.
tasks_state: Dict[str, GenerationState] = {}
# --- FUNÇÕES DE BACKGROUND ---
def run_production_in_background(task_id: str, params: ProductionParams):
"""
Função que executa a tarefa de produção demorada em segundo plano.
Ela opera na instância global 'aduc' para modificar seu estado interno.
"""
logger.info(f"Background task {task_id}: Iniciando produção de vídeo...")
try:
# A tarefa do framework modifica o estado interno da instância 'aduc'
_, _, final_state = aduc.task_produce_original_movie(params=params)
# Armazena o estado final e completo no nosso "banco de dados" de tarefas
tasks_state[task_id] = final_state
logger.info(f"Background task {task_id}: Produção de vídeo concluída com sucesso.")
except Exception as e:
logger.error(f"Background task {task_id}: Falha na produção. Erro: {e}", exc_info=True)
# Opcional: Atualizar o estado da tarefa com uma mensagem de erro.
# --- ENDPOINTS DA API ---
@app.post("/v1/pre-production", response_model=GenerationState, tags=["Workflow"])
async def start_pre_production(params: PreProductionParams):
"""
Inicia e executa a etapa de pré-produção (storyboard e keyframes).
Esta é uma chamada síncrona, pois a pré-produção é relativamente rápida.
Ela retorna o estado de geração completo após a conclusão.
"""
logger.info(f"API: Recebida solicitação de pré-produção com prompt: '{params.prompt[:30]}...'")
try:
_, _, updated_state = aduc.task_pre_production(params=params)
return updated_state
except Exception as e:
logger.error(f"API: Erro na pré-produção: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Erro interno durante a pré-produção: {e}")
@app.post("/v1/production", status_code=202, tags=["Workflow"])
async def start_production(params: ProductionParams, background_tasks: BackgroundTasks):
"""
Inicia a tarefa de produção de vídeo principal em segundo plano.
Esta chamada retorna imediatamente com um `task_id`. Use o endpoint
`/v1/status/{task_id}` para verificar o progresso e obter o resultado final.
"""
task_id = str(uuid.uuid4())
logger.info(f"API: Recebida solicitação de produção. Criando tarefa de background com ID: {task_id}")
# Armazena o estado atual (pré-produção) antes de iniciar a nova tarefa
tasks_state[task_id] = aduc.get_current_state()
# Adiciona a função demorada para ser executada em segundo plano
background_tasks.add_task(run_production_in_background, task_id, params)
return {"message": "Produção de vídeo iniciada em segundo plano.", "task_id": task_id}
@app.get("/v1/status/{task_id}", response_model=GenerationState, tags=["Workflow"])
async def get_task_status(task_id: str):
"""
Verifica o estado de uma tarefa de geração em andamento ou concluída.
"""
logger.info(f"API: Verificando status da tarefa {task_id}")
state = tasks_state.get(task_id)
if not state:
raise HTTPException(status_code=404, detail="ID de tarefa não encontrado.")
# Retorna o estado mais recente que temos para essa tarefa
return state
@app.get("/health", tags=["Infra"])
async def health_check():
"""
Endpoint simples para verificar se a API está online.
"""
return {"status": "ok"} |