# engine/casting_loader.py (updated identity_encoding.py to use libs/*)
# Veureu - Identity Encoder (faces, voices, scenarios)
# -----------------------------------------------------------------------------
# This script replaces the original `identity_encoding.py` but **reuses**
# as many of the functions already present in `libs/` as possible.
# It respects the project's path structure (identities/*, scenarios, chroma_db,
# results) and maintains the classic pipeline:
# 1) index_faces (ChromaDB)
# 2) identity_features.csv
# 3) index_voices (ChromaDB)
# 4) scenarios_descriptions.csv
# 5) index_scenarios (ChromaDB)
# -----------------------------------------------------------------------------
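#
# Illustrative invocation (a sketch using the CLI defaults declared below;
# adjust the paths to your own layout):
#   python engine/casting_loader.py --faces_dir identities/faces \
#       --voices_dir identities/voices --scenarios_dir scenarios \
#       --db_dir chroma_db --out_dir results --drop_collections
# -----------------------------------------------------------------------------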
from __future__ import annotations
import argparse
import csv
import logging
import sys
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
# ============================ LOGGING ========================================
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
log = logging.getLogger("identity_encoding")
# ============================ DEPENDENCIES ===================================
# ChromaDB (persistent client)
try:
import chromadb
except Exception as e:
chromadb = None # type: ignore
log.error("No se pudo importar chromadb: %s", e)
from vision_tools import FaceAnalyzer
from collections import Counter
# Audio: reuse get_embedding from the existing pipeline
from audio_tools import VoiceEmbedder
from vision_tools import FaceOfImageEmbedding
# Optional
try:
import numpy as np
except Exception:
np = None # type: ignore
# ============================ UTILITIES =====================================
IMG_EXT = {".jpg", ".jpeg", ".png", ".bmp", ".webp"}
AUD_EXT = {".wav", ".mp3", ".flac", ".m4a", ".ogg"}
def list_files(root: Path, exts: Iterable[str]) -> List[Path]:
root = Path(root)
if not root.exists():
return []
return [p for p in root.rglob('*') if p.suffix.lower() in exts]
def ensure_chroma(db_dir: Path):
if chromadb is None:
        raise RuntimeError("chromadb is not installed. pip install chromadb")
db_dir.mkdir(parents=True, exist_ok=True)
    # New API (>=0.5): direct persistent client
client = chromadb.PersistentClient(path=str(db_dir))
return client
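# Minimal sketch (not called anywhere in this pipeline) of how a collection built
# below could be queried later, assuming `embedding` is a plain list of floats such
# as one produced by FaceOfImageEmbedding or VoiceEmbedder:
def example_query_index(client, collection_name: str, embedding: List[float], n_results: int = 3):
    col = client.get_or_create_collection(name=collection_name)
    # ChromaDB returns the nearest stored ids, metadatas and distances per query embedding.
    res = col.query(query_embeddings=[embedding], n_results=n_results)
    return [m.get("identity") or m.get("scenario") for m in res["metadatas"][0]]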
# ============================ 1) INDEX FACES =================================
def build_faces_index(faces_dir: Path, client, collection_name: str = "index_faces",
deepface_model: str = 'Facenet512', drop: bool = True) -> int:
# idempotency
if collection_name in [c.name for c in client.list_collections()] and drop:
client.delete_collection(name=collection_name)
col = client.get_or_create_collection(name=collection_name)
be = FaceOfImageEmbedding(deepface_model=deepface_model)
count = 0
    registered_identities = set()  # track identity names so the summary does not repeat them
for ident_dir in sorted(Path(faces_dir).iterdir() if Path(faces_dir).exists() else []):
if not ident_dir.is_dir():
continue
ident = ident_dir.name
for img_path in list_files(ident_dir, IMG_EXT):
embeddings = be.encode_image(img_path)
            if embeddings is None or len(embeddings) == 0:
                log.warning("No face embedding in %s", img_path)
                continue
            # Flatten so that each stored embedding is a plain list of floats
            for e in (embeddings if isinstance(embeddings[0], list) else [embeddings]):
uid = str(uuid.uuid4())
col.add(ids=[uid], embeddings=[e], metadatas=[{"identity": ident, "path": str(img_path)}])
count += 1
            registered_identities.add(ident)  # remember the identity name for the summary
    # Final summary
    print("Finished building the database.")
    print(f"Total embeddings stored: {count}")
    print("Registered identities:")
for name in sorted(registered_identities):
print(f" - {name}")
log.info("index_faces => %d embeddings", count)
return count
# ===================== 2) IDENTITY FEATURES CSV ==============================
def aggregate_face_attributes(faces_dir: Path, out_csv: Path) -> int:
"""
Procesa un directorio de caras por identidad y genera un CSV con edad y gΓ©nero.
Usa FaceAnalyzer para extraer atributos.
"""
# Inicializa el analizador
# FaceAnalyzer already imported at module level
analyzer = FaceAnalyzer()
rows: List[Dict[str, Any]] = []
faces_dir = Path(faces_dir)
if not faces_dir.exists() or not faces_dir.is_dir():
log.error("El directorio de caras no existe: %s", faces_dir)
return 0
def most_common(lst, default="unknown"):
return Counter(lst).most_common(1)[0][0] if lst else default
    # Iterate over each identity
for ident_dir in sorted(faces_dir.iterdir()):
if not ident_dir.is_dir():
continue
ident = ident_dir.name
attrs: List[Dict[str, Any]] = []
log.info("Procesando identidad: %s", ident)
for img_path in sorted(list_files(ident_dir, IMG_EXT)):
try:
data = analyzer.analyze_image(str(img_path))
if data:
attrs.append(data)
except Exception as e:
log.warning("Error procesando imagen %s: %s", img_path, e)
genders = [a.get("gender", "unknown") for a in attrs]
ages = [a.get("age", "unknown") for a in attrs]
        # Optional per-identity context file
context_txt = (faces_dir.parent / "context" / f"{ident}.txt")
identity_context = context_txt.read_text(encoding="utf-8").strip() if context_txt.exists() else ""
rows.append({
"identity": ident,
"samples": len(attrs),
"gender": most_common(genders),
"age_bucket": most_common(ages),
"identity_context": identity_context,
})
log.info("Procesados %d atributos para %s", len(attrs), ident)
    # Write the CSV
out_csv.parent.mkdir(parents=True, exist_ok=True)
with out_csv.open("w", newline='', encoding="utf-8") as f:
fieldnames = list(rows[0].keys()) if rows else ["identity", "identity_context"]
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(rows)
log.info("CSV generado correctamente: %s", out_csv)
return len(rows)
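# The resulting identity_features.csv has one row per identity with the columns
# identity, samples, gender, age_bucket, identity_context (see `rows` above).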
# ============================ 3) INDEX VOICES =================================
from pydub import AudioSegment  # used below to re-encode problematic audio to PCM WAV; ideally keep with the other top-level imports
def build_voices_index(voices_dir: Path, client, collection_name: str = "index_voices", drop: bool = True) -> int:
if collection_name in [c.name for c in client.list_collections()] and drop:
client.delete_collection(name=collection_name)
col = client.get_or_create_collection(name=collection_name)
ve = VoiceEmbedder()
count = 0
for ident_dir in sorted(Path(voices_dir).iterdir() if Path(voices_dir).exists() else []):
if not ident_dir.is_dir():
continue
ident = ident_dir.name
for wav_path in list_files(ident_dir, AUD_EXT):
            # Try to embed the file directly
            try:
                emb = ve.embed(wav_path)
            except Exception as e:
                log.warning("Error reading audio %s: %s. Trying to re-encode...", wav_path, e)
                # Automatic re-encode to PCM WAV
                try:
                    audio = AudioSegment.from_file(wav_path)
                    fixed_path = wav_path.with_name(wav_path.stem + "_fixed.wav")
                    audio.export(fixed_path, format="wav")
                    log.info("File converted to a compatible WAV: %s", fixed_path)
                    emb = ve.embed(fixed_path)
                except Exception as e2:
                    log.error("Could not generate an embedding after re-encoding %s: %s", wav_path, e2)
                    continue  # skip this file
            if emb is None:
                log.warning("No voice embedding in %s", wav_path)
                continue
uid = str(uuid.uuid4())
col.add(ids=[uid], embeddings=[emb], metadatas=[{"identity": ident, "path": str(wav_path)}])
count += 1
log.info("index_voices => %d embeddings", count)
return count
# ============================ 4) SCENARIOS ==================================
@dataclass
class VisionClient:
provider: str = "none" # placeholder to plug in an LLM if desired
def describe(self, image_path: str, prompt: str) -> str:
return (f"Automatic description (placeholder) for {Path(image_path).name}. "
f"{prompt}")
class TextEmbedder:
"""Text embeddings with Sentence-Transformers if available; fallback to TF-IDF."""
def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
self.kind = "tfidf"; self.model = None; self.vectorizer = None
try:
from sentence_transformers import SentenceTransformer
self.model = SentenceTransformer(model_name)
self.kind = "sbert"
except Exception:
from sklearn.feature_extraction.text import TfidfVectorizer
self.vectorizer = TfidfVectorizer(max_features=768)
def fit(self, texts: List[str]):
if self.vectorizer is not None:
self.vectorizer.fit(texts)
def encode(self, texts: List[str]) -> List[List[float]]:
if self.model is not None:
arr = self.model.encode(texts, convert_to_numpy=True)
return arr.astype(float).tolist()
X = self.vectorizer.transform(texts) if self.vectorizer is not None else None
return (X.toarray().astype(float).tolist() if X is not None else [[0.0]*128 for _ in texts])
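# Illustrative usage of TextEmbedder (the TF-IDF fallback must be fit() on the
# corpus before encode(); the Sentence-Transformers path ignores fit()):
#   te = TextEmbedder()
#   te.fit(["a quiet beach at dusk", "a crowded market"])
#   vectors = te.encode(["a quiet beach at dusk"])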
def build_scenarios_descriptions(scenarios_dir: Path, out_csv: Path, vision: VisionClient,
sample_per_scenario: int = 12) -> Tuple[int, List[Dict[str, Any]]]:
rows: List[Dict[str, Any]] = []
for scen_dir in sorted(Path(scenarios_dir).iterdir() if Path(scenarios_dir).exists() else []):
if not scen_dir.is_dir():
continue
scen = scen_dir.name
descs: List[str] = []
imgs = list_files(scen_dir, IMG_EXT)[:sample_per_scenario]
for img in imgs:
d = vision.describe(str(img), prompt="Describe location, time period, lighting, and atmosphere without mentioning people or time of day.")
if d:
descs.append(d)
if not descs:
descs = [f"Scenario {scen} (no images)"]
rows.append({"scenario": scen, "descriptions": " \n".join(descs)})
out_csv.parent.mkdir(parents=True, exist_ok=True)
with out_csv.open("w", newline='', encoding="utf-8") as f:
w = csv.DictWriter(f, fieldnames=["scenario", "descriptions"])
w.writeheader(); w.writerows(rows)
log.info("scenarios_descriptions => %s", out_csv)
return len(rows), rows
def build_scenarios_index(client, rows: List[Dict[str, Any]], embedder: TextEmbedder,
collection_name: str = "index_scenarios", drop: bool = True) -> int:
texts = [r["descriptions"] for r in rows]
embedder.fit(texts)
embs = embedder.encode(texts)
if collection_name in [c.name for c in client.list_collections()] and drop:
client.delete_collection(name=collection_name)
col = client.get_or_create_collection(name=collection_name)
for r, e in zip(rows, embs):
col.add(ids=[r["scenario"]], embeddings=[e], metadatas=[{"scenario": r["scenario"]}])
log.info("index_scenarios => %d descriptions", len(rows))
return len(rows)
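# Note: build_scenarios_index stores each row under the scenario name itself as the
# ChromaDB id, so query results map directly back to scenario folders.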
# ================================ CLI ========================================
def main():
    ap = argparse.ArgumentParser(description="Veureu - Build identity/scenario indices and CSVs")
ap.add_argument('--faces_dir', default='identities/faces', help='Root directory of face images per identity')
ap.add_argument('--voices_dir', default='identities/voices', help='Root directory of voice clips per identity')
ap.add_argument('--scenarios_dir', default='scenarios', help='Root directory of scenario folders with images')
ap.add_argument('--db_dir', default='chroma_db', help='ChromaDB persistence directory')
ap.add_argument('--out_dir', default='results', help='Output directory for CSVs')
ap.add_argument('--drop_collections', action='store_true', help='Delete collections if they exist before rebuilding')
ap.add_argument('--deepface_model', default='Facenet512', help='DeepFace model to use as fallback')
ap.add_argument('--scenario_samples', type=int, default=12, help='Number of images per scenario to describe')
args = ap.parse_args()
faces_dir = Path(args.faces_dir)
voices_dir = Path(args.voices_dir)
    log.info("Voices directory: %s", voices_dir)
scenarios_dir = Path(args.scenarios_dir)
out_dir = Path(args.out_dir); out_dir.mkdir(parents=True, exist_ok=True)
client = ensure_chroma(Path(args.db_dir))
# 1) Faces index
build_faces_index(faces_dir, client, collection_name="index_faces", deepface_model=args.deepface_model, drop=args.drop_collections)
# 2) Identity features CSV
#id_csv = out_dir / 'identity_features.csv'
#aggregate_face_attributes(faces_dir, id_csv)
# 3) Voices index
build_voices_index(voices_dir, client, collection_name="index_voices", drop=args.drop_collections)
# 4) Scenarios descriptions
#vision = VisionClient()
#scen_csv = out_dir / 'scenarios_descriptions.csv'
#_, scen_rows = build_scenarios_descriptions(scenarios_dir, scen_csv, vision, sample_per_scenario=args.scenario_samples)
# 5) Scenarios index
#embedder = TextEmbedder()
#build_scenarios_index(client, scen_rows, embedder, collection_name="index_scenarios", drop=args.drop_collections)
log.info("βœ… Identity encoding completed.")
if __name__ == '__main__' and '--video' not in sys.argv:
main()