|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
|
|
|
import os |
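# The CUDA device below is pinned to GPU 0; this assignment must run before any
# CUDA-aware library (torch, easyocr, DeepFace, ...) initializes the driver.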
|
|
os.environ["CUDA_VISIBLE_DEVICES"] = "0" |
|
|
|
|
|
from dataclasses import dataclass |
|
|
from pathlib import Path |
|
|
from typing import Any, Dict, List, Optional, Tuple |
from collections import Counter, defaultdict
|
|
|
|
|
import json |
|
|
import logging |
|
|
import math |
|
|
|
|
import shlex |
|
|
import subprocess |
|
|
|
|
|
import numpy as np |
|
|
import torch |
|
|
import torchaudio |
|
|
import torchaudio.transforms as T |
|
|
from transformers import WhisperProcessor, WhisperForConditionalGeneration |
|
|
from pyannote.audio import Pipeline as PyannotePipeline |
|
|
from speechbrain.inference.speaker import SpeakerRecognition |
|
|
from pydub import AudioSegment |
|
|
from sklearn.cluster import KMeans |
|
|
from sklearn.metrics import silhouette_score |
|
|
from scenedetect import VideoManager, SceneManager |
|
|
from scenedetect.detectors import ContentDetector |
|
|
|
|
|
import base64, requests, contextlib, time  # os and subprocess are already imported above
|
|
|
|
|
from transformers import AutoProcessor, LlavaForConditionalGeneration |
|
|
from PIL import Image |
|
|
|
|
|
from audio_tools import process_audio_for_video |
|
|
from llm_router import load_yaml, LLMRouter |
|
|
|
|
|
import cv2 |
|
|
|
|
|
try: |
|
|
import face_recognition |
|
|
except Exception: |
|
|
face_recognition = None |
|
|
|
|
|
|
|
|
# Optional DeepFace-based face embedder. The FaceOfImageEmbedding docstring
# points at libs.face_utils; try it and leave DFRecognizer as None if missing.
try:
    from libs.face_utils import DFRecognizer
except Exception:
    DFRecognizer = None
|
|
|
|
|
try: |
|
|
from deepface import DeepFace |
|
|
except ImportError: |
|
|
DeepFace = None |
|
|
|
|
|
import easyocr |
|
|
|
|
|
|
|
|
log = logging.getLogger("audio_tools") |
|
|
if not log.handlers: |
|
|
h = logging.StreamHandler() |
|
|
h.setFormatter(logging.Formatter("[%(levelname)s] %(message)s")) |
|
|
log.addHandler(h) |
|
|
log.setLevel(logging.INFO) |
|
|
|
|
|
|
|
|
def load_config(path: str = "configs/config_veureu.yaml") -> Dict[str, Any]: |
|
|
p = Path(path) |
|
|
if not p.exists(): |
|
|
log.warning("Config file not found: %s (using defaults)", path) |
|
|
return {} |
|
|
try: |
|
|
import yaml |
|
|
cfg = yaml.safe_load(p.read_text(encoding="utf-8")) or {} |
|
|
cfg["__path__"] = str(p) |
|
|
return cfg |
|
|
except Exception as e: |
|
|
log.error("Failed to read YAML config: %s", e) |
|
|
return {} |
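# Example (sketch): load the project config and read a nested key with a default.
# The "models" section is the one used by describe_montage_sequence() below;
# other keys shown are illustrative, not guaranteed by this module.
#
#   cfg = load_config("configs/config_veureu.yaml")
#   vision_model = cfg.get("models", {}).get("vision", "salamandra-vision")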
|
|
|
|
|
|
|
|
class FaceOfImageEmbedding: |
|
|
"""Preferred backend: `face_recognition`; fallback: DeepFace via libs.face_utils.""" |
|
|
def __init__(self, deepface_model: str = 'Facenet512'): |
|
|
self.use_fr = face_recognition is not None |
|
|
self.df = None |
|
|
if not self.use_fr and DFRecognizer is not None: |
|
|
try: |
|
|
self.df = DFRecognizer(model_name=deepface_model) |
|
|
log.info("Using DeepFace (%s) as face embedding backend.", deepface_model) |
|
|
except Exception as e: |
|
|
log.warning("Failed to initialize DeepFace: %s", e) |
|
|
elif self.use_fr: |
|
|
log.info("Using face_recognition as face embedding backend.") |
|
|
else: |
|
|
log.error("No face embedding backend available.") |
|
|
|
|
|
    def encode_image(self, image_path: Path) -> Optional[list]:
        # face_recognition returns one L2-normalized embedding per detected face
        # (a list of vectors); the DeepFace fallback returns a single vector.
        # Callers must handle both shapes.
|
|
try: |
|
|
if self.use_fr: |
|
|
img = face_recognition.load_image_file(str(image_path)) |
|
|
encs = face_recognition.face_encodings(img) |
|
|
if encs: |
|
|
|
|
|
embeddings = [(e / np.linalg.norm(e)).astype(float).tolist() for e in encs] |
|
|
return embeddings |
|
|
return None |
|
|
|
|
|
if self.df is not None: |
|
|
emb = self.df.get_face_embedding_from_path(str(image_path)) |
|
|
if emb is None: |
|
|
return None |
|
|
|
|
|
emb = np.array(emb, dtype=float) |
|
|
emb = emb / np.linalg.norm(emb) |
|
|
return emb.tolist() |
|
|
|
|
|
except Exception as e: |
|
|
log.debug("Fallo embedding cara %s: %s", image_path, e) |
|
|
|
|
|
return None |
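# Example (sketch): embed all faces in one keyframe. With the face_recognition
# backend this yields one vector per detected face; the DeepFace fallback yields
# a single vector. The file name is illustrative.
#
#   embedder = FaceOfImageEmbedding()
#   vecs = embedder.encode_image(Path("scene_001.jpg"))
#   n_faces = 0 if not vecs else (len(vecs) if isinstance(vecs[0], list) else 1)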
|
|
|
|
|
class FaceAnalyzer: |
|
|
"""Wrapper sencillo para DeepFace que obtiene edad y género de una imagen.""" |
|
|
def __init__(self, actions=None): |
|
|
if actions is None: |
|
|
actions = ["age", "gender"] |
|
|
self.actions = actions |
|
|
if DeepFace is None: |
|
|
log.warning("DeepFace not available - FaceAnalyzer will return None") |
|
|
|
|
|
def analyze_image(self, img_path: str) -> Optional[Dict[str, Any]]: |
|
|
if DeepFace is None: |
|
|
return None |
|
|
try: |
|
|
result = DeepFace.analyze(img_path=img_path, actions=self.actions) |
|
|
|
|
|
|
|
|
if isinstance(result, list) and len(result) > 0: |
|
|
result = result[0] |
|
|
|
|
|
|
|
|
return { |
|
|
"age": result.get("age", "unknown"), |
|
|
"gender": result.get("dominant_gender", "unknown") |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
log.warning("No se pudo analizar la imagen %s: %s", img_path, e) |
|
|
return None |
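# Example (sketch): DeepFace.analyze may return a dict or a list of dicts (one
# per detected face); analyze_image() keeps the first one. Values shown are
# illustrative.
#
#   analyzer = FaceAnalyzer()
#   info = analyzer.analyze_image("scene_001.jpg")   # e.g. {"age": 34, "gender": "Man"}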
|
|
|
|
|
|
|
|
def map_identities_per_second(frames_per_second, intervals):
    """Count, for each interval, the face identities seen in the per-second frames it spans."""
|
|
for seg in intervals: |
|
|
seg_start = seg["start"] |
|
|
seg_end = seg["end"] |
|
|
|
|
|
|
|
|
identities = [] |
|
|
for f in frames_per_second: |
|
|
if seg_start <= f["start"] <= seg_end: |
|
|
for face in f.get("faces", []): |
|
|
identities.append(face) |
|
|
|
|
|
|
|
|
seg["counts"] = dict(Counter(identities)) |
|
|
|
|
|
return intervals |
|
|
|
|
|
def _split_montage(img: np.ndarray, n: int, cfg: Dict[str, Any]) -> List[np.ndarray]: |
|
|
vd = cfg.get('vision_describer', {}) |
|
|
montage_cfg = vd.get('montage', {}) |
|
|
mode = montage_cfg.get('split_mode', 'horizontal') |
|
|
|
|
|
h, w = img.shape[:2] |
|
|
tiles: List[np.ndarray] = [] |
|
|
|
|
|
if mode == 'vertical': |
|
|
tile_h = h // n |
|
|
for i in range(n): |
|
|
y0 = i * tile_h; y1 = h if i == n-1 else (i+1) * tile_h |
|
|
tiles.append(img[y0:y1, 0:w]) |
|
|
return tiles |
|
|
|
|
|
if mode == 'grid': |
|
|
rows = int(montage_cfg.get('rows', 1) or 1) |
|
|
cols = int(montage_cfg.get('cols', n) or n) |
|
|
assert rows * cols >= n, "grid rows*cols must be >= n" |
|
|
tile_h = h // rows; tile_w = w // cols |
|
|
k = 0 |
|
|
for r in range(rows): |
|
|
for c in range(cols): |
|
|
if k >= n: break |
|
|
y0, y1 = r*tile_h, h if (r==rows-1) else (r+1)*tile_h |
|
|
x0, x1 = c*tile_w, w if (c==cols-1) else (c+1)*tile_w |
|
|
tiles.append(img[y0:y1, x0:x1]); k += 1 |
|
|
return tiles |
|
|
|
|
|
tile_w = w // n |
|
|
for i in range(n): |
|
|
x0 = i * tile_w; x1 = w if i == n-1 else (i+1) * tile_w |
|
|
tiles.append(img[0:h, x0:x1]) |
|
|
return tiles |
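# The montage layout is read from the YAML config. A minimal sketch of the
# section _split_montage() expects (key names taken from the code above;
# values are illustrative):
#
#   vision_describer:
#     montage:
#       split_mode: grid   # horizontal | vertical | grid
#       rows: 2
#       cols: 3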
|
|
|
|
|
def generar_montage(frame_paths: List[str], output_dir: str) -> str:
|
|
output_path = Path(output_dir) |
|
|
output_path.mkdir(parents=True, exist_ok=True) |
|
|
montage_path = "" |
|
|
|
|
|
if frame_paths: |
|
|
imgs = [cv2.imread(kf) for kf in frame_paths if os.path.exists(kf)] |
|
|
imgs = [img for img in imgs if img is not None] |
|
|
print(f"Se encontraron {len(imgs)} imágenes para el montaje.") |
|
|
|
|
|
if imgs: |
|
|
h = max(img.shape[0] for img in imgs) |
|
|
imgs_resized = [cv2.resize(img, (int(img.shape[1]*h/img.shape[0]), h)) for img in imgs] |
|
|
montage = cv2.hconcat(imgs_resized) |
|
|
montage_path = os.path.join(output_dir, "keyframes_montage.jpg") |
|
|
print(f"Guardando montaje en: {montage_path}") |
|
|
cv2.imwrite(montage_path, montage) |
|
|
print("Montaje guardado.") |
|
|
else: |
|
|
print("No se encontraron imágenes válidas para el montaje.") |
|
|
|
|
|
return montage_path |
|
|
|
|
|
def describe_montage_sequence( |
|
|
montage_path: str, |
|
|
n: int, |
|
|
informacion, |
|
|
face_identities, |
|
|
*, |
|
|
config_path: str = 'config.yaml' |
|
|
) -> List[Any]:
|
|
"""Describe each sub-image of a montage using remote Space (svision) via LLMRouter. |
|
|
|
|
|
Returns a list of descriptions, one per tile. |
|
|
""" |
|
|
|
|
|
img = cv2.imread(montage_path, cv2.IMREAD_COLOR) |
|
|
if img is None: |
|
|
raise RuntimeError(f"No se puede leer la imagen: {montage_path}") |
|
|
|
|
|
|
|
|
cfg = load_yaml(config_path) |
|
|
tiles = _split_montage(img, n, cfg) |
|
|
if len(tiles) < n: |
|
|
raise RuntimeError(f"Se produjeron {len(tiles)} tiles, se esperaban {n}") |
|
|
|
|
|
|
|
|
out_dir = Path(montage_path).parent |
|
|
frame_paths: List[str] = [] |
|
|
for i, t in enumerate(tiles): |
|
|
p = out_dir / f"tile_{i:03d}.jpg" |
|
|
cv2.imwrite(str(p), t) |
|
|
frame_paths.append(str(p)) |
|
|
|
|
|
|
|
|
context = { |
|
|
"informacion": informacion, |
|
|
"face_identities": sorted(list(face_identities or set())), |
|
|
} |
|
|
model_name = (cfg.get("models", {}).get("vision") or "salamandra-vision") |
|
|
router = LLMRouter(cfg) |
|
|
descs = router.vision_describe(frame_paths, context=context, model=model_name) |
|
|
return descs |
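# Usage sketch: the vision model name comes from cfg["models"]["vision"]
# ("salamandra-vision" by default) and requests are routed through LLMRouter.
# The path, tile count, and context values below are illustrative.
#
#   descs = describe_montage_sequence(
#       "results/clip/keyframes_montage.jpg", 6,
#       informacion="news broadcast", face_identities={"anchor_1"},
#   )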
|
|
|
|
|
|
|
|
def keyframe_conditional_extraction_ana( |
|
|
video_path, |
|
|
output_dir, |
|
|
threshold=30.0, |
|
|
offset_frames=10 |
|
|
): |
|
|
""" |
|
|
Detecta cambios de escena en un vídeo, guarda un fotograma por cada cambio, |
|
|
devuelve intervalos con start y end basados en los tiempos de los keyframes |
|
|
y genera un montaje con todos los keyframes. |
|
|
""" |
|
|
if not os.path.exists(output_dir): |
|
|
os.makedirs(output_dir) |
|
|
|
|
|
video_manager = VideoManager([video_path]) |
|
|
scene_manager = SceneManager() |
|
|
scene_manager.add_detector(ContentDetector(threshold=threshold)) |
|
|
|
|
|
video_manager.start() |
|
|
scene_manager.detect_scenes(video_manager) |
|
|
|
|
|
scene_list = scene_manager.get_scene_list() |
|
|
|
|
|
cap = cv2.VideoCapture(video_path) |
|
|
    fps = cap.get(cv2.CAP_PROP_FPS) or 25.0  # guard against a 0.0 FPS report
|
|
total_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT) |
|
|
video_duration = total_frames / fps |
|
|
|
|
|
keyframes = [] |
|
|
for i, (start_time, end_time) in enumerate(scene_list): |
|
|
frame_number = int(start_time.get_frames()) + offset_frames |
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) |
|
|
ret, frame = cap.read() |
|
|
if ret: |
|
|
ts = frame_number / fps |
|
|
frame_path = os.path.join(output_dir, f"scene_{i+1:03d}.jpg") |
|
|
cv2.imwrite(frame_path, frame) |
|
|
keyframes.append({ |
|
|
"index": i+1, |
|
|
"time": round(ts, 2), |
|
|
"path": frame_path |
|
|
}) |
|
|
|
|
|
cap.release() |
|
|
video_manager.release() |
|
|
|
|
|
|
|
|
intervals = [] |
|
|
for i, kf in enumerate(keyframes): |
|
|
start = kf["time"] |
|
|
if i < len(keyframes) - 1: |
|
|
end = keyframes[i+1]["time"] |
|
|
else: |
|
|
end = video_duration |
|
|
intervals.append({ |
|
|
"index": kf["index"], |
|
|
"start": start, |
|
|
"end": round(end, 2), |
|
|
"path": kf["path"] |
|
|
}) |
|
|
|
|
|
return intervals |
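# Example (sketch): scene-change keyframes for a clip, using the default
# threshold and frame offset. Paths and values are illustrative.
#
#   intervals = keyframe_conditional_extraction_ana("clip.mp4", "results/clip/keyframes")
#   # -> [{"index": 1, "start": 0.4, "end": 5.2, "path": ".../scene_001.jpg"}, ...]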
|
|
|
|
|
def keyframe_every_second( |
|
|
video_path: str, |
|
|
output_dir: str = ".", |
|
|
max_frames: Optional[int] = 10000, |
|
|
) -> List[dict]: |
|
|
""" |
|
|
Extrae un fotograma por cada segundo del video. |
|
|
|
|
|
Returns: |
|
|
List[dict]: Cada elemento es {"index", "start", "end", "path"} |
|
|
""" |
|
|
out_dir = Path(output_dir) |
|
|
out_dir.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
cap = cv2.VideoCapture(str(video_path)) |
|
|
fps = cap.get(cv2.CAP_PROP_FPS) or 25.0 |
|
|
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
|
|
duration = total_frames / fps |
|
|
|
|
|
frames: List[dict] = [] |
|
|
idx = 0 |
|
|
sec = 0.0 |
|
|
|
|
|
while sec <= duration: |
|
|
frame_number = int(sec * fps) |
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number) |
|
|
ret, frame = cap.read() |
|
|
if not ret: |
|
|
break |
|
|
|
|
|
timestamp = frame_number / fps |
|
|
frame_path = out_dir / f"frame_per_second{idx:03d}.jpg" |
|
|
cv2.imwrite(str(frame_path), frame) |
|
|
|
|
|
frames.append({ |
|
|
"index": idx + 1, |
|
|
"start": round(timestamp, 2), |
|
|
"end": None, |
|
|
"path": str(frame_path), |
|
|
}) |
|
|
|
|
|
idx += 1 |
|
|
sec += 1.0 |
|
|
|
|
|
if max_frames and idx >= max_frames: |
|
|
break |
|
|
|
|
|
cap.release() |
|
|
|
|
|
|
|
|
for i in range(len(frames)): |
|
|
if i < len(frames) - 1: |
|
|
frames[i]["end"] = frames[i+1]["start"] |
|
|
else: |
|
|
frames[i]["end"] = round(duration, 2) |
|
|
|
|
|
return frames |
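# Example (sketch): one frame per second; each entry's "end" is the next
# entry's "start", and the last one ends at the video duration. Paths are
# illustrative.
#
#   frames = keyframe_every_second("clip.mp4", "results/clip/frames", max_frames=600)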
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_frames( |
|
|
frames: List[dict], |
|
|
config: dict, |
|
|
face_col=None, |
|
|
embedding_model=None, |
|
|
) -> List[dict]:
|
|
""" |
|
|
Procesa keyframes: |
|
|
- Detecta caras |
|
|
- Genera embeddings con FaceEmbedding |
|
|
- Opcionalmente compara con face_col (KNN top-3) |
|
|
- Opcionalmente ejecuta OCR |
|
|
""" |
|
|
|
|
|
frame_results = [] |
|
|
|
|
|
|
|
|
if embedding_model is None: |
|
|
embedding_model = FaceOfImageEmbedding() |
|
|
|
|
|
for idx, frame in enumerate(frames): |
|
|
frame_path = frame["path"] |
|
|
|
|
|
try: |
|
|
raw_faces = embedding_model.encode_image(Path(frame_path)) |
|
|
except Exception as e: |
|
|
print(f"Error procesando {frame_path}: {e}") |
|
|
raw_faces = None |
|
|
|
|
|
faces = [] |
|
|
        if raw_faces:  # an (unexpected) empty list is treated like None
|
|
if isinstance(raw_faces[0], list): |
|
|
for e in raw_faces: |
|
|
faces.append({"embedding": e}) |
|
|
else: |
|
|
faces.append({"embedding": raw_faces}) |
|
|
|
|
|
faces_detected = [] |
|
|
for f in faces: |
|
|
embedding = f.get("embedding") |
|
|
identity = "Unknown" |
|
|
knn = [] |
|
|
|
|
|
if face_col is not None and embedding is not None: |
|
|
try: |
|
|
num_embeddings = face_col.count() |
|
|
if num_embeddings < 1: |
|
|
knn = [] |
|
|
identity = "Unknown" |
|
|
|
|
|
else: |
|
|
n_results = min(3, num_embeddings) |
|
|
q = face_col.query( |
|
|
query_embeddings=[embedding], |
|
|
n_results=n_results, |
|
|
include=["metadatas", "distances"] |
|
|
) |
|
|
|
|
|
knn = [] |
|
|
metas = q.get("metadatas", [[]])[0] |
|
|
dists = q.get("distances", [[]])[0] |
|
|
for meta, dist in zip(metas, dists): |
|
|
person_id = meta.get("identity", "Unknown") if isinstance(meta, dict) else "Unknown" |
|
|
knn.append({"identity": person_id, "distance": float(dist)}) |
|
|
|
|
|
if knn and knn[0]["distance"] < 0.6: |
|
|
identity = knn[0]["identity"] |
|
|
else: |
|
|
identity = "Unknown" |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Face KNN failed: {e}") |
|
|
knn = [] |
|
|
identity = "Unknown" |
|
|
|
|
|
faces_detected.append(identity) |
|
|
|
|
|
        # OCR with EasyOCR; keep an empty string when recognition fails so the
        # result dict below is always well-formed. (Building the Reader per
        # frame is simple but slow; it could be hoisted out of the loop.)
        ocr_text_easyocr = ""
        use_easyocr = True
        if use_easyocr:
            try:
                reader = easyocr.Reader(['en', 'es'], gpu=True)
                results = reader.readtext(frame_path)
                ocr_text_easyocr = " ".join([text for _, text, _ in results]).strip()
            except Exception as e:
                print(f"OCR error: {e}")
|
|
|
|
|
frame_results.append({ |
|
|
"id": frame["index"], |
|
|
"start": frame["start"], |
|
|
"end": frame["end"], |
|
|
"image_path": frame_path, |
|
|
"faces": faces_detected, |
|
|
"ocr": ocr_text_easyocr, |
|
|
}) |
|
|
|
|
|
return frame_results |
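# Example (sketch): face matching + OCR over per-second frames, then per-scene
# identity counts. face_col is optional and is expected to expose count()/query()
# like a Chroma collection; without it every face is labelled "Unknown".
# Paths are illustrative.
#
#   cfg = load_config()
#   frames = keyframe_every_second("clip.mp4", "results/clip/frames")
#   results = process_frames(frames, cfg, face_col=None)
#   scenes = keyframe_conditional_extraction_ana("clip.mp4", "results/clip/keyframes")
#   scenes = map_identities_per_second(results, scenes)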
|
|
|
|
|
if __name__ == "__main__": |
|
|
import argparse |
|
|
ap = argparse.ArgumentParser(description="Veureu — Audio tools (self-contained)") |
|
|
ap.add_argument("--video", required=True) |
|
|
ap.add_argument("--out", default="results") |
|
|
ap.add_argument("--config", default="configs/config_veureu.yaml") |
|
|
args = ap.parse_args() |
|
|
|
|
|
|
|
|
import yaml |
|
|
cfg = {} |
|
|
p = Path(args.config) |
|
|
if p.exists(): |
|
|
cfg = yaml.safe_load(p.read_text(encoding="utf-8")) or {} |
|
|
|
|
|
out_dir = Path(args.out) / Path(args.video).stem |
|
|
out_dir.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
segs, srt = process_audio_for_video(args.video, out_dir, cfg, voice_collection=None) |
|
|
print(json.dumps({ |
|
|
"segments": len(segs), |
|
|
"srt": srt |
|
|
}, indent=2, ensure_ascii=False)) |