# identity_encoding.py (updated to use libs/*) # Veureu — Identity Encoder (faces, voices, scenarios) # ----------------------------------------------------------------------------- # This script replaces the original `identity_encoding.py` but **reuses** # as much as possible the functions already present in `libs/`. # It respects the project's path structure (identities/*, scenarios, chroma_db, # results) and maintains the classic pipeline: # 1) index_faces (ChromaDB) # 2) identity_features.csv # 3) index_voices (ChromaDB) # 4) scenarios_descriptions.csv # 5) index_scenarios (ChromaDB) # ----------------------------------------------------------------------------- from __future__ import annotations import argparse import csv import logging import sys import uuid from dataclasses import dataclass from pathlib import Path from typing import Any, Dict, Iterable, List, Optional, Tuple # ============================ LOGGING ======================================== logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') log = logging.getLogger("identity_encoding") # ============================ DEPENDENCIES =================================== # ChromaDB (persistente) try: import chromadb from chromadb.config import Settings # noqa: F401 except Exception as e: chromadb = None # type: ignore log.error("No se pudo importar chromadb: %s", e) from vision_tools import FaceAnalyzer from collections import Counter # Audio: reuse get_embedding from the existing pipeline from audio_tools import VoiceEmbedder from vision_tools import FaceOfImageEmbedding # Optional try: import numpy as np except Exception: np = None # type: ignore # ============================ UTILITIES ===================================== IMG_EXT = {".jpg", ".jpeg", ".png", ".bmp", ".webp"} AUD_EXT = {".wav", ".mp3", ".flac", ".m4a", ".ogg"} def list_files(root: Path, exts: Iterable[str]) -> List[Path]: root = Path(root) if not root.exists(): return [] return [p for p in root.rglob('*') if p.suffix.lower() in exts] def ensure_chroma(db_dir: Path): if chromadb is None: raise RuntimeError("chromadb no instalado. pip install chromadb") db_dir.mkdir(parents=True, exist_ok=True) # Nueva forma de crear un cliente persistente client = chromadb.Client(Settings( chroma_db_impl="duckdb+parquet", persist_directory=str(db_dir) )) return client # ============================ 1) INDEX FACES ================================= def build_faces_index(faces_dir: Path, client, collection_name: str = "index_faces", deepface_model: str = 'Facenet512', drop: bool = True) -> int: # idempotency if collection_name in [c.name for c in client.list_collections()] and drop: client.delete_collection(name=collection_name) col = client.get_or_create_collection(name=collection_name) be = FaceOfImageEmbedding(deepface_model=deepface_model) count = 0 registered_identities = set() # 👈 para no repetir nombres for ident_dir in sorted(Path(faces_dir).iterdir() if Path(faces_dir).exists() else []): if not ident_dir.is_dir(): continue ident = ident_dir.name for img_path in list_files(ident_dir, IMG_EXT): embeddings = be.encode_image(img_path) if embeddings is None: log.warning("No face embedding in %s", img_path) continue # Aplanar para que cada embedding sea una lista de floats for e in (embeddings if isinstance(embeddings[0], list) else [embeddings]): uid = str(uuid.uuid4()) col.add(ids=[uid], embeddings=[e], metadatas=[{"identity": ident, "path": str(img_path)}]) count += 1 registered_identities.add(ident) # 👈 guardamos el nombre # Mensajes finales print("Ha acabado de crear la base de datos.") print(f"Total de embeddings guardados: {count}") print("Identidades registradas:") for name in sorted(registered_identities): print(f" - {name}") log.info("index_faces => %d embeddings", count) return count # ===================== 2) IDENTITY FEATURES CSV ============================== def aggregate_face_attributes(faces_dir: Path, out_csv: Path) -> int: """ Procesa un directorio de caras por identidad y genera un CSV con edad y género. Usa FaceAnalyzer para extraer atributos. """ # Inicializa el analizador # FaceAnalyzer already imported at module level analyzer = FaceAnalyzer() rows: List[Dict[str, Any]] = [] faces_dir = Path(faces_dir) if not faces_dir.exists() or not faces_dir.is_dir(): log.error("El directorio de caras no existe: %s", faces_dir) return 0 def most_common(lst, default="unknown"): return Counter(lst).most_common(1)[0][0] if lst else default # Itera sobre cada identidad for ident_dir in sorted(faces_dir.iterdir()): if not ident_dir.is_dir(): continue ident = ident_dir.name attrs: List[Dict[str, Any]] = [] log.info("Procesando identidad: %s", ident) for img_path in sorted(list_files(ident_dir, IMG_EXT)): try: data = analyzer.analyze_image(str(img_path)) if data: attrs.append(data) except Exception as e: log.warning("Error procesando imagen %s: %s", img_path, e) genders = [a.get("gender", "unknown") for a in attrs] ages = [a.get("age", "unknown") for a in attrs] # Contexto opcional por identidad context_txt = (faces_dir.parent / "context" / f"{ident}.txt") identity_context = context_txt.read_text(encoding="utf-8").strip() if context_txt.exists() else "" rows.append({ "identity": ident, "samples": len(attrs), "gender": most_common(genders), "age_bucket": most_common(ages), "identity_context": identity_context, }) log.info("Procesados %d atributos para %s", len(attrs), ident) # Guardar CSV out_csv.parent.mkdir(parents=True, exist_ok=True) with out_csv.open("w", newline='', encoding="utf-8") as f: fieldnames = list(rows[0].keys()) if rows else ["identity", "identity_context"] writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() writer.writerows(rows) log.info("CSV generado correctamente: %s", out_csv) return len(rows) # ============================ 3) INDEX VOICES ================================= from pydub import AudioSegment # agregar al inicio de tu archivo junto a otros imports def build_voices_index(voices_dir: Path, client, collection_name: str = "index_voices", drop: bool = True) -> int: if collection_name in [c.name for c in client.list_collections()] and drop: client.delete_collection(name=collection_name) col = client.get_or_create_collection(name=collection_name) ve = VoiceEmbedder() count = 0 for ident_dir in sorted(Path(voices_dir).iterdir() if Path(voices_dir).exists() else []): if not ident_dir.is_dir(): continue ident = ident_dir.name for wav_path in list_files(ident_dir, AUD_EXT): # Intentar embed directamente try: emb = ve.embed(wav_path) except Exception as e: log.warning("Error leyendo audio %s: %s. Intentando reconvertir...", wav_path, e) # Reconversión automática a WAV PCM try: audio = AudioSegment.from_file(wav_path) fixed_path = wav_path.with_name(wav_path.stem + "_fixed.wav") audio.export(fixed_path, format="wav") log.info("Archivo convertido a WAV compatible: %s", fixed_path) emb = ve.embed(fixed_path) except Exception as e2: log.error("No se pudo generar embedding tras reconversión para %s: %s", wav_path, e2) continue # saltar este archivo if emb is None: log.warning("No voice embedding en %s", wav_path) continue uid = str(uuid.uuid4()) col.add(ids=[uid], embeddings=[emb], metadatas=[{"identity": ident, "path": str(wav_path)}]) count += 1 log.info("index_voices => %d embeddings", count) return count # ============================ 4) SCENARIOS ================================== @dataclass class VisionClient: provider: str = "none" # placeholder to plug in an LLM if desired def describe(self, image_path: str, prompt: str) -> str: return (f"Automatic description (placeholder) for {Path(image_path).name}. " f"{prompt}") class TextEmbedder: """Text embeddings with Sentence-Transformers if available; fallback to TF-IDF.""" def __init__(self, model_name: str = "all-MiniLM-L6-v2"): self.kind = "tfidf"; self.model = None; self.vectorizer = None try: from sentence_transformers import SentenceTransformer self.model = SentenceTransformer(model_name) self.kind = "sbert" except Exception: from sklearn.feature_extraction.text import TfidfVectorizer self.vectorizer = TfidfVectorizer(max_features=768) def fit(self, texts: List[str]): if self.vectorizer is not None: self.vectorizer.fit(texts) def encode(self, texts: List[str]) -> List[List[float]]: if self.model is not None: arr = self.model.encode(texts, convert_to_numpy=True) return arr.astype(float).tolist() X = self.vectorizer.transform(texts) if self.vectorizer is not None else None return (X.toarray().astype(float).tolist() if X is not None else [[0.0]*128 for _ in texts]) def build_scenarios_descriptions(scenarios_dir: Path, out_csv: Path, vision: VisionClient, sample_per_scenario: int = 12) -> Tuple[int, List[Dict[str, Any]]]: rows: List[Dict[str, Any]] = [] for scen_dir in sorted(Path(scenarios_dir).iterdir() if Path(scenarios_dir).exists() else []): if not scen_dir.is_dir(): continue scen = scen_dir.name descs: List[str] = [] imgs = list_files(scen_dir, IMG_EXT)[:sample_per_scenario] for img in imgs: d = vision.describe(str(img), prompt="Describe location, time period, lighting, and atmosphere without mentioning people or time of day.") if d: descs.append(d) if not descs: descs = [f"Scenario {scen} (no images)"] rows.append({"scenario": scen, "descriptions": " \n".join(descs)}) out_csv.parent.mkdir(parents=True, exist_ok=True) with out_csv.open("w", newline='', encoding="utf-8") as f: w = csv.DictWriter(f, fieldnames=["scenario", "descriptions"]) w.writeheader(); w.writerows(rows) log.info("scenarios_descriptions => %s", out_csv) return len(rows), rows def build_scenarios_index(client, rows: List[Dict[str, Any]], embedder: TextEmbedder, collection_name: str = "index_scenarios", drop: bool = True) -> int: texts = [r["descriptions"] for r in rows] embedder.fit(texts) embs = embedder.encode(texts) if collection_name in [c.name for c in client.list_collections()] and drop: client.delete_collection(name=collection_name) col = client.get_or_create_collection(name=collection_name) for r, e in zip(rows, embs): col.add(ids=[r["scenario"]], embeddings=[e], metadatas=[{"scenario": r["scenario"]}]) log.info("index_scenarios => %d descriptions", len(rows)) return len(rows) # ================================ CLI ======================================== def main(): ap = argparse.ArgumentParser(description="Veureu — Build identity/scenario indices and CSVs") ap.add_argument('--faces_dir', default='identities/faces', help='Root directory of face images per identity') ap.add_argument('--voices_dir', default='identities/voices', help='Root directory of voice clips per identity') ap.add_argument('--scenarios_dir', default='scenarios', help='Root directory of scenario folders with images') ap.add_argument('--db_dir', default='chroma_db', help='ChromaDB persistence directory') ap.add_argument('--out_dir', default='results', help='Output directory for CSVs') ap.add_argument('--drop_collections', action='store_true', help='Delete collections if they exist before rebuilding') ap.add_argument('--deepface_model', default='Facenet512', help='DeepFace model to use as fallback') ap.add_argument('--scenario_samples', type=int, default=12, help='Number of images per scenario to describe') args = ap.parse_args() faces_dir = Path(args.faces_dir) voices_dir = Path(args.voices_dir) print(voices_dir) scenarios_dir = Path(args.scenarios_dir) out_dir = Path(args.out_dir); out_dir.mkdir(parents=True, exist_ok=True) client = ensure_chroma(Path(args.db_dir)) # 1) Faces index build_faces_index(faces_dir, client, collection_name="index_faces", deepface_model=args.deepface_model, drop=args.drop_collections) # 2) Identity features CSV #id_csv = out_dir / 'identity_features.csv' #aggregate_face_attributes(faces_dir, id_csv) # 3) Voices index build_voices_index(voices_dir, client, collection_name="index_voices", drop=args.drop_collections) # 4) Scenarios descriptions #vision = VisionClient() #scen_csv = out_dir / 'scenarios_descriptions.csv' #_, scen_rows = build_scenarios_descriptions(scenarios_dir, scen_csv, vision, sample_per_scenario=args.scenario_samples) # 5) Scenarios index #embedder = TextEmbedder() #build_scenarios_index(client, scen_rows, embedder, collection_name="index_scenarios", drop=args.drop_collections) log.info("✅ Identity encoding completed.") if __name__ == '__main__' and '--video' not in sys.argv: main()