import streamlit as st
import json
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
import base64
from PIL import Image
import io
import cv2
from insightface.app import FaceAnalysis
from moviepy.editor import VideoFileClip
from sklearn.cluster import DBSCAN
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt  # needed for the cluster color map below
import plotly.graph_objs as go
from collections import defaultdict
# Load models (cached so Streamlit reruns don't reload them from scratch)
@st.cache_resource
def load_models():
    text_model = SentenceTransformer("all-MiniLM-L6-v2")
    image_model = SentenceTransformer("clip-ViT-B-32")
    face_app = FaceAnalysis(providers=['CPUExecutionProvider'])
    face_app.prepare(ctx_id=0, det_size=(640, 640))
    return text_model, image_model, face_app

text_model, image_model, face_app = load_models()
# Load data
def load_data(video_id, output_dir):
    with open(f"{output_dir}/{video_id}_summary.json", "r") as f:
        summary = json.load(f)
    with open(f"{output_dir}/{video_id}_transcription.json", "r") as f:
        transcription = json.load(f)
    with open(f"{output_dir}/{video_id}_text_metadata.json", "r") as f:
        text_metadata = json.load(f)
    with open(f"{output_dir}/{video_id}_image_metadata.json", "r") as f:
        image_metadata = json.load(f)
    with open(f"{output_dir}/{video_id}_face_metadata.json", "r") as f:
        face_metadata = json.load(f)
    face_index = faiss.read_index(f"{output_dir}/{video_id}_face_index.faiss")
    return summary, transcription, text_metadata, image_metadata, face_metadata, face_index

video_id = "IMFUOexuEXw"
output_dir = "./"
video_path = "avengers_interview.mp4"
summary, transcription, text_metadata, image_metadata, face_metadata, face_index = load_data(video_id, output_dir)
# Load FAISS indexes
def load_indexes(video_id, output_dir):
    text_index = faiss.read_index(f"{output_dir}/{video_id}_text_index.faiss")
    image_index = faiss.read_index(f"{output_dir}/{video_id}_image_index.faiss")
    return text_index, image_index

text_index, image_index = load_indexes(video_id, output_dir)
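# Optional sanity check (an assumption about the prebuilt artifacts shipped with
# the Space): each index's dimensionality must match its encoder's output
# (384 for all-MiniLM-L6-v2, 512 for clip-ViT-B-32), or every search below fails.
assert text_index.d == text_model.get_sentence_embedding_dimension(), "text index/model dimension mismatch"
assert image_index.d == 512, "image index should hold 512-d CLIP ViT-B/32 vectors"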
def create_comprehensive_face_summary(face_index, face_metadata, eps=0.5, min_samples=3, top_k=5):
    # Extract face embeddings from the FAISS index
    face_embeddings = face_index.reconstruct_n(0, face_index.ntotal)
    # L2-normalize so cosine distances are well-behaved
    face_embeddings = face_embeddings / np.linalg.norm(face_embeddings, axis=1)[:, np.newaxis]
    # Cluster the embeddings with DBSCAN
    clustering = DBSCAN(eps=eps, min_samples=min_samples, metric='cosine').fit(face_embeddings)
    # Group faces by cluster label
    face_clusters = defaultdict(list)
    for i, label in enumerate(clustering.labels_):
        face_clusters[label].append(face_metadata[i])
    # Sort clusters by size, largest first
    sorted_clusters = sorted(face_clusters.items(), key=lambda x: len(x[1]), reverse=True)
    all_faces_summary = []
    prominent_faces = []
    for i, (label, cluster) in enumerate(sorted_clusters):
        if label == -1:  # Ignore noise points
            continue
        # Collect all appearances
        appearances = [
            {
                'start': face['start'],
                'end': face['end'],
                'size_ratio': face.get('size_ratio', 1.0)  # Default to 1.0 if size_ratio is absent
            }
            for face in cluster
        ]
        # Sort appearances by start time
        appearances.sort(key=lambda x: x['start'])
        # Pick a representative face (the largest in the cluster)
        representative_face = max(cluster, key=lambda f: f.get('size_ratio', 1.0))
        face_summary = {
            "id": f"face_{i}",
            "cluster_id": f"cluster_{label}",
            "bbox": representative_face.get('bbox', []),
            "embedding": representative_face.get('embedding', []),
            "appearances": appearances,
            "total_appearances": len(appearances),
            "total_screen_time": sum(app['end'] - app['start'] for app in appearances),
            "first_appearance": appearances[0]['start'],
            "last_appearance": appearances[-1]['end'],
            "thumbnail": representative_face.get('thumbnail', '')
        }
        all_faces_summary.append(face_summary)
        # Count only real clusters toward top_k, so a large noise cluster can't use up a slot
        if len(prominent_faces) < top_k:
            prominent_faces.append(face_summary)
    return all_faces_summary, prominent_faces, face_embeddings, clustering.labels_
# Usage in the main Streamlit app:
all_faces_summary, prominent_faces, face_embeddings, face_labels = create_comprehensive_face_summary(face_index, face_metadata)
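# Quick diagnostic (an optional addition, written to the server log only): how
# many clusters DBSCAN kept and how many detections it labeled noise. Handy
# when tuning eps/min_samples for a new video.
n_clusters = len(set(face_labels)) - (1 if -1 in face_labels else 0)
print(f"DBSCAN: {n_clusters} face clusters, "
      f"{int(np.sum(face_labels == -1))}/{len(face_labels)} detections labeled noise")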
# Interactive face-cluster visualization (3-D PCA projection of the embeddings)
def plot_face_clusters_interactive(face_embeddings, face_labels, all_faces_summary, prominent_faces):
    pca = PCA(n_components=3)
    embeddings_3d = pca.fit_transform(face_embeddings)
    unique_labels = set(face_labels)
    colors = [f'rgb({int(r*255)},{int(g*255)},{int(b*255)})'
              for r, g, b, _ in plt.cm.rainbow(np.linspace(0, 1, len(unique_labels)))]
    traces = []
    for label, color in zip(unique_labels, colors):
        if label == -1:
            continue  # Skip noise points
        cluster_points = embeddings_3d[face_labels == label]
        cluster_faces = [face for face in all_faces_summary if face['cluster_id'] == f'cluster_{label}']
        hover_text = [
            f"Cluster {label}<br>"
            f"Time: {face['appearances'][0]['start']:.2f}s - {face['appearances'][-1]['end']:.2f}s<br>"
            f"Appearances: {face['total_appearances']}"
            for face in cluster_faces
        ]
        trace = go.Scatter3d(
            x=cluster_points[:, 0],
            y=cluster_points[:, 1],
            z=cluster_points[:, 2],
            mode='markers',
            name=f'Cluster {label}',
            marker=dict(size=5, color=color, opacity=0.8),
            text=hover_text,
            hoverinfo='text'
        )
        traces.append(trace)
    # Add larger markers for prominent faces ('star' is 2-D only; Scatter3d
    # accepts a smaller symbol set, so use 'diamond')
    prominent_points = [embeddings_3d[face_labels == int(face['cluster_id'].split('_')[1])][0] for face in prominent_faces]
    prominent_trace = go.Scatter3d(
        x=[p[0] for p in prominent_points],
        y=[p[1] for p in prominent_points],
        z=[p[2] for p in prominent_points],
        mode='markers',
        name='Prominent Faces',
        marker=dict(size=10, color='red', symbol='diamond'),
        text=[f"Prominent Face<br>Cluster {face['cluster_id']}" for face in prominent_faces],
        hoverinfo='text'
    )
    traces.append(prominent_trace)
    layout = go.Layout(
        title='Face Clusters Visualization',
        scene=dict(xaxis_title='PCA 1', yaxis_title='PCA 2', zaxis_title='PCA 3'),
        margin=dict(r=0, b=0, l=0, t=40)
    )
    fig = go.Figure(data=traces, layout=layout)
    return fig
# Search functions
def combined_search(query, text_index, image_index, text_metadata, image_metadata, text_model, image_model, n_results=5):
    if isinstance(query, str):
        text_vector = text_model.encode([query], convert_to_tensor=True).cpu().numpy()
        image_vector = image_model.encode([query], convert_to_tensor=True).cpu().numpy()
        text_D, text_I = text_index.search(text_vector, n_results)
        text_results = [{'data': text_metadata[i], 'distance': d, 'type': 'text'} for i, d in zip(text_I[0], text_D[0])]
    else:
        # Assume a PIL image. CLIP vectors (512-d) don't match the MiniLM text
        # index (384-d), so an image query can only search the image index.
        image_vector = image_model.encode([query], convert_to_tensor=True).cpu().numpy()
        text_results = []
    image_D, image_I = image_index.search(image_vector, n_results)
    image_results = [{'data': image_metadata[i], 'distance': d, 'type': 'image'} for i, d in zip(image_I[0], image_D[0])]
    combined_results = sorted(text_results + image_results, key=lambda x: x['distance'])
    return combined_results[:n_results]
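# Example call (hypothetical query string):
#   combined_search("red carpet interview", text_index, image_index,
#                   text_metadata, image_metadata, text_model, image_model)
# returns the n_results hits with the smallest distances across both indexes.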
def face_search(face_embedding, face_index, face_metadata, n_results=5):
    # FAISS expects a 2-D float32 array of query vectors
    D, I = face_index.search(np.array([face_embedding], dtype=np.float32), n_results)
    results = [face_metadata[i] for i in I[0]]
    return results, D[0]
def detect_and_embed_face(image, face_app):
    # insightface expects BGR (OpenCV) channel order; PIL images are RGB
    img_array = cv2.cvtColor(np.array(image.convert("RGB")), cv2.COLOR_RGB2BGR)
    faces = face_app.get(img_array)
    if len(faces) == 0:
        return None
    # Keep the largest detected face by bounding-box area
    largest_face = max(faces, key=lambda x: (x.bbox[2] - x.bbox[0]) * (x.bbox[3] - x.bbox[1]))
    return largest_face.embedding
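# Note (an assumption about how the face index was built): insightface also
# exposes largest_face.normed_embedding. If the stored index holds L2-normalized
# vectors, search with normed_embedding instead so distances are comparable.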
def create_video_clip(video_path, start_time, end_time, output_path):
    with VideoFileClip(video_path) as video:
        new_clip = video.subclip(start_time, end_time)
        new_clip.write_videofile(output_path, codec="libx264", audio_codec="aac")
    return output_path
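# Caveat: each search rerun writes another temp_clip_*.mp4 next to the app, and
# on Spaces the ephemeral disk slowly fills. A tempfile-based output path, e.g.
#   tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name
# would avoid name collisions; periodic cleanup is left out of this sketch.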
# Streamlit UI
st.title("Video Analysis Dashboard")

# Sidebar with the full video and a scrollable transcript
st.sidebar.header("Full Video")
st.sidebar.video(video_path)
st.sidebar.header("Video Transcript")
transcript_text = transcription['transcription']
st.sidebar.text_area("Full Transcript", transcript_text, height=300)

# Main content
st.header("Video Summary")
# Face clusters
st.subheader("Prominent Face Clusters")
for face in prominent_faces:
    st.write(f"Face Cluster {face['cluster_id']}:")
    st.write(f"  Total appearances: {face['total_appearances']}")
    st.write(f"  Total screen time: {face['total_screen_time']:.2f} seconds")
    st.write(f"  First appearance: {face['first_appearance']:.2f} seconds")
    st.write(f"  Last appearance: {face['last_appearance']:.2f} seconds")
    st.write(f"  Timeline: {len(face['appearances'])} appearances")
    st.write("  First 5 appearances:")
    for app in face['appearances'][:5]:
        st.write(f"    {app['start']:.2f}s - {app['end']:.2f}s")
    if face['thumbnail']:
        image = Image.open(io.BytesIO(base64.b64decode(face['thumbnail'])))
        st.image(image, caption=f"Representative face for {face['cluster_id']}", width=100)
    st.write("---")
# Face cluster visualization
st.subheader("Face Cluster Visualization")
fig = plot_face_clusters_interactive(face_embeddings, face_labels, all_faces_summary, prominent_faces)
st.plotly_chart(fig)

# Themes
st.subheader("Themes")
for theme in summary['themes']:
    st.write(f"Theme ID: {theme['id']}, Keywords: {', '.join(theme['keywords'])}")
# Search functionality
st.header("Search")
search_type = st.selectbox("Select search type", ["Combined", "Face"])

if search_type == "Combined":
    search_method = st.radio("Choose search method", ["Text", "Image"])
    if search_method == "Text":
        query = st.text_input("Enter your search query")
        if st.button("Search"):
            results = combined_search(query, text_index, image_index, text_metadata, image_metadata, text_model, image_model)
            st.subheader("Search Results")
            for result in results:
                st.write(f"Type: {result['type']}, Time: {result['data']['start']:.2f}s - {result['data']['end']:.2f}s, Distance: {result['distance']:.4f}")
                if 'text' in result['data']:
                    st.write(f"Text: {result['data']['text']}")
                clip_path = create_video_clip(video_path, result['data']['start'], result['data']['end'], f"temp_clip_{result['data']['start']}.mp4")
                st.video(clip_path)
                st.write("---")
    else:
        uploaded_file = st.file_uploader("Choose an image...", type=["jpg", "jpeg", "png"])
        if uploaded_file is not None:
            image = Image.open(uploaded_file)
            st.image(image, caption="Uploaded Image", use_column_width=True)
            if st.button("Search"):
                results = combined_search(image, text_index, image_index, text_metadata, image_metadata, text_model, image_model)
                st.subheader("Image Search Results")
                for result in results:
                    st.write(f"Type: {result['type']}, Time: {result['data']['start']:.2f}s - {result['data']['end']:.2f}s, Distance: {result['distance']:.4f}")
                    clip_path = create_video_clip(video_path, result['data']['start'], result['data']['end'], f"temp_clip_{result['data']['start']}.mp4")
                    st.video(clip_path)
                    st.write("---")
elif search_type == "Face":
    face_search_type = st.radio("Choose face search method", ["Select from clusters", "Upload image"])
    if face_search_type == "Select from clusters":
        cluster_id = st.selectbox("Select a face cluster", [cluster['cluster_id'] for cluster in all_faces_summary])
        if st.button("Search"):
            selected_cluster = next(cluster for cluster in all_faces_summary if cluster['cluster_id'] == cluster_id)
            st.subheader("Face Cluster Search Results")
            for appearance in selected_cluster['appearances'][:5]:  # Show the first 5 appearances
                st.write(f"Time: {appearance['start']:.2f}s - {appearance['end']:.2f}s")
                clip_path = create_video_clip(video_path, appearance['start'], appearance['end'], f"temp_face_clip_{appearance['start']}.mp4")
                st.video(clip_path)
                st.write("---")
    else:
        uploaded_file = st.file_uploader("Choose a face image...", type=["jpg", "jpeg", "png"])
        if uploaded_file is not None:
            image = Image.open(uploaded_file)
            st.image(image, caption="Uploaded Image", use_column_width=True)
            if st.button("Search"):
                face_embedding = detect_and_embed_face(image, face_app)
                if face_embedding is not None:
                    face_results, face_distances = face_search(face_embedding, face_index, face_metadata)
                    st.subheader("Face Search Results")
                    for result, distance in zip(face_results, face_distances):
                        st.write(f"Time: {result['start']:.2f}s - {result['end']:.2f}s, Distance: {distance:.4f}")
                        clip_path = create_video_clip(video_path, result['start'], result['end'], f"temp_face_clip_{result['start']}.mp4")
                        st.video(clip_path)
                        st.write("---")
                else:
                    st.error("No face detected in the uploaded image. Please try another image.")