from __future__ import annotations

import argparse
import csv
import logging
import sys
import uuid
from collections import Counter
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
log = logging.getLogger("identity_encoding")
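
# ---------------------------------------------------------------------------
# Overview of what this module provides:
#   * build_faces_index            -> ChromaDB collection "index_faces"
#   * build_voices_index           -> ChromaDB collection "index_voices"
#   * aggregate_face_attributes    -> CSV of per-identity age/gender attributes
#   * build_scenarios_descriptions -> CSV of per-scenario image descriptions
#   * build_scenarios_index        -> ChromaDB collection "index_scenarios"
#
# Illustrative invocation (the script filename is an assumption; the paths are
# just the argparse defaults defined in main()):
#
#   python identity_encoding.py --db_dir chroma_db --out_dir results --drop_collections
# ---------------------------------------------------------------------------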

# Optional dependency: fail soft here so the module can still be imported and
# report a clear error later in ensure_chroma().
try:
    import chromadb
except Exception as e:
    chromadb = None
    log.error("Could not import chromadb: %s", e)

from vision_tools import FaceAnalyzer, FaceOfImageEmbedding
from audio_tools import VoiceEmbedder

try:
    import numpy as np
except Exception:
    np = None

IMG_EXT = {".jpg", ".jpeg", ".png", ".bmp", ".webp"}
AUD_EXT = {".wav", ".mp3", ".flac", ".m4a", ".ogg"}
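
# Expected on-disk layout (these match the argparse defaults in main()):
#
#   identities/faces/<identity>/<images>   -> build_faces_index / aggregate_face_attributes
#   identities/voices/<identity>/<audio>   -> build_voices_index
#   identities/context/<identity>.txt      -> optional free-text context per identity
#   scenarios/<scenario>/<images>          -> build_scenarios_descriptions / build_scenarios_index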


def list_files(root: Path, exts: Iterable[str]) -> List[Path]:
    """Return every file under root whose extension is in exts (case-insensitive)."""
    root = Path(root)
    if not root.exists():
        return []
    return [p for p in root.rglob('*') if p.suffix.lower() in exts]


def ensure_chroma(db_dir: Path):
    """Create db_dir if needed and return a persistent ChromaDB client rooted there."""
    if chromadb is None:
        raise RuntimeError("chromadb is not installed. Install it with: pip install chromadb")
    db_dir.mkdir(parents=True, exist_ok=True)
    client = chromadb.PersistentClient(path=str(db_dir))
    return client
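
# ChromaDB note: PersistentClient keeps collections on disk under db_dir, so
# repeated runs append to existing collections unless they are dropped first
# (see the drop flag / --drop_collections below).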


def build_faces_index(faces_dir: Path, client, collection_name: str = "index_faces",
                      deepface_model: str = 'Facenet512', drop: bool = True) -> int:
    """Embed every face image under faces_dir/<identity>/ and store the vectors in ChromaDB."""
    if collection_name in [c.name for c in client.list_collections()] and drop:
        client.delete_collection(name=collection_name)
    col = client.get_or_create_collection(name=collection_name)

    be = FaceOfImageEmbedding(deepface_model=deepface_model)
    count = 0
    registered_identities = set()

    for ident_dir in sorted(Path(faces_dir).iterdir() if Path(faces_dir).exists() else []):
        if not ident_dir.is_dir():
            continue
        ident = ident_dir.name
        for img_path in list_files(ident_dir, IMG_EXT):
            embeddings = be.encode_image(img_path)
            if embeddings is None:
                log.warning("No face embedding in %s", img_path)
                continue

            # encode_image may return a single embedding or a list of them
            # (one per detected face); normalise to a list before storing.
            for e in (embeddings if isinstance(embeddings[0], list) else [embeddings]):
                uid = str(uuid.uuid4())
                col.add(ids=[uid], embeddings=[e], metadatas=[{"identity": ident, "path": str(img_path)}])
                count += 1
                registered_identities.add(ident)

    print("Finished creating the database.")
    print(f"Total embeddings stored: {count}")
    print("Registered identities:")
    for name in sorted(registered_identities):
        print(f" - {name}")

    log.info("index_faces => %d embeddings", count)
    return count
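
# Illustrative sketch of how the "index_faces" collection can be queried later
# (not part of this script; query_vec would come from FaceOfImageEmbedding):
#
#     col = client.get_or_create_collection(name="index_faces")
#     res = col.query(query_embeddings=[query_vec], n_results=5)
#     # res["metadatas"][0] holds the matched identities and source image paths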


def aggregate_face_attributes(faces_dir: Path, out_csv: Path) -> int:
    """
    Process a directory of face images grouped by identity and generate a CSV
    with the aggregated age and gender per identity.
    Uses FaceAnalyzer to extract the attributes.
    """
    analyzer = FaceAnalyzer()

    rows: List[Dict[str, Any]] = []

    faces_dir = Path(faces_dir)
    if not faces_dir.exists() or not faces_dir.is_dir():
        log.error("Faces directory does not exist: %s", faces_dir)
        return 0

    def most_common(lst, default="unknown"):
        return Counter(lst).most_common(1)[0][0] if lst else default

    for ident_dir in sorted(faces_dir.iterdir()):
        if not ident_dir.is_dir():
            continue
        ident = ident_dir.name
        attrs: List[Dict[str, Any]] = []

        log.info("Processing identity: %s", ident)

        for img_path in sorted(list_files(ident_dir, IMG_EXT)):
            try:
                data = analyzer.analyze_image(str(img_path))
                if data:
                    attrs.append(data)
            except Exception as e:
                log.warning("Error processing image %s: %s", img_path, e)

        genders = [a.get("gender", "unknown") for a in attrs]
        ages = [a.get("age", "unknown") for a in attrs]

        # Optional free-text context stored alongside the faces (identities/context/<identity>.txt).
        context_txt = faces_dir.parent / "context" / f"{ident}.txt"
        identity_context = context_txt.read_text(encoding="utf-8").strip() if context_txt.exists() else ""

        rows.append({
            "identity": ident,
            "samples": len(attrs),
            "gender": most_common(genders),
            "age_bucket": most_common(ages),
            "identity_context": identity_context,
        })

        log.info("Processed %d attribute records for %s", len(attrs), ident)

    out_csv.parent.mkdir(parents=True, exist_ok=True)
    with out_csv.open("w", newline='', encoding="utf-8") as f:
        fieldnames = list(rows[0].keys()) if rows else ["identity", "samples", "gender", "age_bucket", "identity_context"]
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)

    log.info("CSV written successfully: %s", out_csv)
    return len(rows)
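
# The attributes CSV has one row per identity with the columns:
#   identity, samples, gender, age_bucket, identity_context
# (gender and age_bucket hold the most frequent value returned by FaceAnalyzer
# across that identity's images).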


from pydub import AudioSegment


def build_voices_index(voices_dir: Path, client, collection_name: str = "index_voices", drop: bool = True) -> int:
    """Embed every audio clip under voices_dir/<identity>/ and store the vectors in ChromaDB."""
    if collection_name in [c.name for c in client.list_collections()] and drop:
        client.delete_collection(name=collection_name)
    col = client.get_or_create_collection(name=collection_name)

    ve = VoiceEmbedder()
    count = 0

    for ident_dir in sorted(Path(voices_dir).iterdir() if Path(voices_dir).exists() else []):
        if not ident_dir.is_dir():
            continue
        ident = ident_dir.name
        for wav_path in list_files(ident_dir, AUD_EXT):
            try:
                emb = ve.embed(wav_path)
            except Exception as e:
                log.warning("Error reading audio %s: %s. Trying to re-encode...", wav_path, e)
                # Some clips use codecs the embedder cannot read; re-encode them
                # to plain WAV with pydub and retry once.
                try:
                    audio = AudioSegment.from_file(wav_path)
                    fixed_path = wav_path.with_name(wav_path.stem + "_fixed.wav")
                    audio.export(fixed_path, format="wav")
                    log.info("File converted to a compatible WAV: %s", fixed_path)
                    emb = ve.embed(fixed_path)
                except Exception as e2:
                    log.error("Could not generate an embedding after re-encoding %s: %s", wav_path, e2)
                    continue
            if emb is None:
                log.warning("No voice embedding in %s", wav_path)
                continue
            uid = str(uuid.uuid4())
            col.add(ids=[uid], embeddings=[emb], metadatas=[{"identity": ident, "path": str(wav_path)}])
            count += 1

    log.info("index_voices => %d embeddings", count)
    return count
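
# Note: pydub relies on ffmpeg (or libav) being available on PATH to decode
# formats such as mp3/m4a/ogg, so the re-encode fallback above needs it installed.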


@dataclass
class VisionClient:
    """Placeholder vision-language client; describe() returns a canned string."""
    provider: str = "none"

    def describe(self, image_path: str, prompt: str) -> str:
        return (f"Automatic description (placeholder) for {Path(image_path).name}. "
                f"{prompt}")


class TextEmbedder:
    """Text embeddings with Sentence-Transformers if available; fallback to TF-IDF."""

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        self.kind = "tfidf"
        self.model = None
        self.vectorizer = None
        try:
            from sentence_transformers import SentenceTransformer
            self.model = SentenceTransformer(model_name)
            self.kind = "sbert"
        except Exception:
            from sklearn.feature_extraction.text import TfidfVectorizer
            self.vectorizer = TfidfVectorizer(max_features=768)

    def fit(self, texts: List[str]):
        # Only the TF-IDF fallback needs fitting; the SBERT model is pre-trained.
        if self.vectorizer is not None:
            self.vectorizer.fit(texts)

    def encode(self, texts: List[str]) -> List[List[float]]:
        if self.model is not None:
            arr = self.model.encode(texts, convert_to_numpy=True)
            return arr.astype(float).tolist()
        X = self.vectorizer.transform(texts) if self.vectorizer is not None else None
        return (X.toarray().astype(float).tolist() if X is not None else [[0.0] * 128 for _ in texts])
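
# Minimal TextEmbedder usage sketch (illustrative):
#
#     embedder = TextEmbedder()
#     embedder.fit(["a rainy street at night", "a sunny beach"])  # no-op on the SBERT path
#     vecs = embedder.encode(["a rainy street at night"])         # -> List[List[float]]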


def build_scenarios_descriptions(scenarios_dir: Path, out_csv: Path, vision: VisionClient,
                                 sample_per_scenario: int = 12) -> Tuple[int, List[Dict[str, Any]]]:
    rows: List[Dict[str, Any]] = []
    for scen_dir in sorted(Path(scenarios_dir).iterdir() if Path(scenarios_dir).exists() else []):
        if not scen_dir.is_dir():
            continue
        scen = scen_dir.name
        descs: List[str] = []
        imgs = list_files(scen_dir, IMG_EXT)[:sample_per_scenario]
        for img in imgs:
            d = vision.describe(str(img), prompt="Describe location, time period, lighting, and atmosphere without mentioning people or time of day.")
            if d:
                descs.append(d)
        if not descs:
            descs = [f"Scenario {scen} (no images)"]
        rows.append({"scenario": scen, "descriptions": " \n".join(descs)})

    out_csv.parent.mkdir(parents=True, exist_ok=True)
    with out_csv.open("w", newline='', encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=["scenario", "descriptions"])
        w.writeheader()
        w.writerows(rows)
    log.info("scenarios_descriptions => %s", out_csv)
    return len(rows), rows


def build_scenarios_index(client, rows: List[Dict[str, Any]], embedder: TextEmbedder,
                          collection_name: str = "index_scenarios", drop: bool = True) -> int:
    texts = [r["descriptions"] for r in rows]
    embedder.fit(texts)
    embs = embedder.encode(texts)

    if collection_name in [c.name for c in client.list_collections()] and drop:
        client.delete_collection(name=collection_name)
    col = client.get_or_create_collection(name=collection_name)

    for r, e in zip(rows, embs):
        col.add(ids=[r["scenario"]], embeddings=[e], metadatas=[{"scenario": r["scenario"]}])
    log.info("index_scenarios => %d descriptions", len(rows))
    return len(rows)
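
# To search "index_scenarios" later, the query text must be embedded with the
# same TextEmbedder before calling query() (illustrative sketch):
#
#     q = embedder.encode(["an empty warehouse with harsh lighting"])
#     res = col.query(query_embeddings=q, n_results=3)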


def main():
    ap = argparse.ArgumentParser(description="Veureu - Build identity/scenario indices and CSVs")
    ap.add_argument('--faces_dir', default='identities/faces', help='Root directory of face images per identity')
    ap.add_argument('--voices_dir', default='identities/voices', help='Root directory of voice clips per identity')
    ap.add_argument('--scenarios_dir', default='scenarios', help='Root directory of scenario folders with images')
    ap.add_argument('--db_dir', default='chroma_db', help='ChromaDB persistence directory')
    ap.add_argument('--out_dir', default='results', help='Output directory for CSVs')
    ap.add_argument('--drop_collections', action='store_true', help='Delete collections if they exist before rebuilding')
    ap.add_argument('--deepface_model', default='Facenet512', help='DeepFace model to use as fallback')
    ap.add_argument('--scenario_samples', type=int, default=12, help='Number of images per scenario to describe')

    args = ap.parse_args()

    faces_dir = Path(args.faces_dir)
    voices_dir = Path(args.voices_dir)
    log.info("Voices dir: %s", voices_dir)
    scenarios_dir = Path(args.scenarios_dir)
    out_dir = Path(args.out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    client = ensure_chroma(Path(args.db_dir))

    build_faces_index(faces_dir, client, collection_name="index_faces", deepface_model=args.deepface_model, drop=args.drop_collections)

    build_voices_index(voices_dir, client, collection_name="index_voices", drop=args.drop_collections)
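
    # The remaining pipeline steps are defined above but not wired in here.
    # A sketch of how they could be invoked (the output CSV names are
    # illustrative assumptions, not taken from the source):
    #
    #     aggregate_face_attributes(faces_dir, out_dir / "identities.csv")
    #     _, rows = build_scenarios_descriptions(scenarios_dir, out_dir / "scenarios.csv",
    #                                            VisionClient(), sample_per_scenario=args.scenario_samples)
    #     build_scenarios_index(client, rows, TextEmbedder(), drop=args.drop_collections)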

    log.info("Identity encoding completed.")


# Do not auto-run when a '--video' flag is present on the command line.
if __name__ == '__main__' and '--video' not in sys.argv:
    main()