""" Madrid Content Analyzer - Hugging Face Spaces Version Main Gradio application This app fetches content from Madrid City Council sources, analyzes language clarity using Aclarador, and displays results in an interactive dashboard. Free hosting on Hugging Face Spaces! """ import gradio as gr import pandas as pd import plotly.express as px import plotly.graph_objects as go from datetime import datetime, timedelta import logging from apscheduler.schedulers.background import BackgroundScheduler # Import our modules from config.database import init_storage, CONTENT_ITEMS_PATH, CLARITY_ANALYSES_PATH, FETCH_LOGS_PATH, get_sources from storage.repository import ContentRepository from schedulers.background_tasks import fetch_and_analyze_content from utils.logger import setup_logging # Setup setup_logging() logger = logging.getLogger(__name__) # Initialize storage (parquet files) init_storage() # Start background scheduler scheduler = BackgroundScheduler() scheduler.add_job( fetch_and_analyze_content, 'interval', hours=6, # Fetch every 6 hours id='content_fetch', replace_existing=True ) scheduler.start() logger.info("Background scheduler started - fetching every 6 hours") # Initialize repository repo = ContentRepository() # ============================================ # Dashboard Functions # ============================================ def get_dashboard_stats(): """Get overall statistics for dashboard""" try: stats = repo.get_statistics() return { "📊 Total Items": f"{stats.get('total_items', 0):,}", "✅ Analyzed": f"{stats.get('analyzed_items', 0):,}", "📈 Avg Clarity Score": f"{stats.get('avg_clarity', 0):.1f}/100", "🕐 Last Fetch": stats.get('last_fetch', 'Never'), "📅 Date Range": f"{stats.get('date_range', 'N/A')}" } except Exception as e: logger.error(f"Error getting stats: {e}") return {"Error": str(e)} def get_clarity_distribution(): """Get clarity score distribution chart""" try: # Use pandas directly to avoid DuckDB segfaults df_analyses = pd.read_parquet(CLARITY_ANALYSES_PATH) if df_analyses.empty: return None # Create score ranges def score_to_range(score): if score < 30: return '0-29 (Poor)' elif score < 50: return '30-49 (Fair)' elif score < 70: return '50-69 (Good)' elif score < 90: return '70-89 (Very Good)' else: return '90-100 (Excellent)' df_analyses['score_range'] = df_analyses['overall_score'].apply(score_to_range) df = df_analyses.groupby('score_range').size().reset_index(name='count') fig = px.bar( df, x='score_range', y='count', title='Clarity Score Distribution', labels={'score_range': 'Score Range', 'count': 'Number of Items'}, color='count', color_continuous_scale='RdYlGn' ) return fig except Exception as e: logger.error(f"Error creating distribution chart: {e}") import traceback traceback.print_exc() return None def get_content_timeline(): """Get content published over time""" try: # Use pandas directly to avoid DuckDB segfaults df_content = pd.read_parquet(CONTENT_ITEMS_PATH) df_analyses = pd.read_parquet(CLARITY_ANALYSES_PATH) # Merge df = df_content.merge(df_analyses[['content_hash', 'overall_score']], on='content_hash', how='left') # Filter last 30 days df['published_at'] = pd.to_datetime(df['published_at']) cutoff = datetime.utcnow() - timedelta(days=30) df = df[df['published_at'] >= cutoff] if df.empty: return None # Group by date df['date'] = df['published_at'].dt.date grouped = df.groupby('date').agg({ 'content_hash': 'count', 'overall_score': 'mean' }).reset_index() grouped.columns = ['date', 'count', 'avg_score'] fig = go.Figure() # Add content count line 
        fig.add_trace(go.Scatter(
            x=grouped['date'],
            y=grouped['count'],
            name='Items Published',
            yaxis='y1',
            line=dict(color='blue')
        ))

        # Add average clarity line
        fig.add_trace(go.Scatter(
            x=grouped['date'],
            y=grouped['avg_score'],
            name='Avg Clarity Score',
            yaxis='y2',
            line=dict(color='green')
        ))

        fig.update_layout(
            title='Content Published Over Time (Last 30 Days)',
            xaxis=dict(title='Date'),
            yaxis=dict(title='Items Published', side='left'),
            yaxis2=dict(title='Avg Clarity Score', side='right', overlaying='y'),
            hovermode='x unified'
        )
        return fig
    except Exception as e:
        logger.error(f"Error creating timeline: {e}")
        import traceback
        traceback.print_exc()
        return None


def get_category_scores():
    """Get average scores by category"""
    try:
        # Use pandas directly to avoid DuckDB segfaults
        df_content = pd.read_parquet(CONTENT_ITEMS_PATH)
        df_analyses = pd.read_parquet(CLARITY_ANALYSES_PATH)

        # Merge
        df = df_content.merge(df_analyses[['content_hash', 'overall_score']], on='content_hash', how='left')

        # Filter out empty categories
        df = df[(df['category'].notna()) & (df['category'] != '')]

        if df.empty:
            return None

        # Group by category
        grouped = df.groupby('category').agg({
            'content_hash': 'count',
            'overall_score': 'mean'
        }).reset_index()
        grouped.columns = ['category', 'count', 'avg_score']
        grouped = grouped.sort_values('avg_score', ascending=False)

        fig = px.bar(
            grouped,
            y='category',
            x='avg_score',
            orientation='h',
            title='Average Clarity Score by Category',
            labels={'category': 'Category', 'avg_score': 'Average Score'},
            color='avg_score',
            color_continuous_scale='RdYlGn',
            text='count'
        )
        fig.update_traces(texttemplate='%{text} items', textposition='outside')
        return fig
    except Exception as e:
        logger.error(f"Error creating category chart: {e}")
        import traceback
        traceback.print_exc()
        return None


# ============================================
# Content Browser Functions
# ============================================

def search_content(days=7, category="All", min_clarity=0, max_clarity=100, search_text=""):
    """Search and filter content"""
    try:
        filters = {
            'days': days,
            'min_clarity': min_clarity,
            'max_clarity': max_clarity
        }
        if category != "All":
            filters['category'] = category
        if search_text:
            filters['search_text'] = search_text

        results = repo.search_content(**filters)

        if not results:
            return pd.DataFrame({"Message": ["No results found"]})

        # Format for display
        df = pd.DataFrame([
            {
                'Title': r['title'][:80] + '...' if len(r['title']) > 80 else r['title'],
                'Date': r['published_at'].strftime('%Y-%m-%d'),
                'Category': r.get('category', 'N/A'),
                'Clarity': f"{r.get('clarity_score', 0):.0f}",
                'URL': r['url']
            }
            for r in results
        ])
        return df
    except Exception as e:
        logger.error(f"Error searching content: {e}")
        return pd.DataFrame({"Error": [str(e)]})


def get_content_details(url):
    """Get detailed view of content item"""
    try:
        content = repo.get_content_by_url(url)

        if not content:
            return "Content not found"

        details = f"""
# {content['title']}

**Published**: {content['published_at'].strftime('%Y-%m-%d %H:%M')}
**Category**: {content.get('category', 'N/A')}
**URL**: {content['url']}

## Clarity Analysis

- **Overall Score**: {content.get('clarity_score', 0):.1f}/100
- **Readability**: {content.get('readability_score', 0):.1f}/100
- **Complexity**: {content.get('complexity_score', 0):.1f}/100

## Content Preview

{content.get('content_text', 'N/A')[:500]}...

## Improvement Suggestions

{chr(10).join(['- ' + s for s in content.get('suggestions', [])])}
"""
        return details
    except Exception as e:
        logger.error(f"Error getting content details: {e}")
        return f"Error: {str(e)}"


# ============================================
# Analytics Functions
# ============================================

def get_low_clarity_items(threshold=50):
    """Get items below clarity threshold"""
    try:
        items = repo.get_low_clarity_items(threshold)

        if not items:
            return pd.DataFrame({"Message": ["No low clarity items found"]})

        df = pd.DataFrame([
            {
                'Title': i['title'][:60] + '...' if len(i['title']) > 60 else i['title'],
                'Score': f"{i['clarity_score']:.0f}",
                'Date': i['published_at'].strftime('%Y-%m-%d'),
                'Category': i.get('category', 'N/A'),
                'Main Issues': ', '.join(i.get('issues', [])[:3])
            }
            for i in items
        ])
        return df
    except Exception as e:
        logger.error(f"Error getting low clarity items: {e}")
        return pd.DataFrame({"Error": [str(e)]})


def export_data(format='csv'):
    """Export data to file"""
    try:
        # Use pandas directly to avoid DuckDB segfaults
        df_content = pd.read_parquet(CONTENT_ITEMS_PATH)
        df_analyses = pd.read_parquet(CLARITY_ANALYSES_PATH)

        # Merge
        df = df_content.merge(
            df_analyses[['content_hash', 'overall_score', 'readability_score', 'complexity_score', 'jargon_count']],
            on='content_hash',
            how='left'
        )

        # Select and rename columns
        df = df[['title', 'published_at', 'category', 'url',
                 'overall_score', 'readability_score', 'complexity_score', 'jargon_count']]
        df.columns = ['title', 'published_at', 'category', 'url',
                      'clarity_score', 'readability_score', 'complexity_score', 'jargon_count']

        # Sort
        df = df.sort_values('published_at', ascending=False)

        # Save to file
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"madrid_content_{timestamp}.{format}"

        if format == 'csv':
            df.to_csv(filename, index=False)
        elif format == 'json':
            df.to_json(filename, orient='records', date_format='iso')

        return f"✅ Data exported to {filename}"
    except Exception as e:
        logger.error(f"Error exporting data: {e}")
        return f"❌ Error: {str(e)}"


# ============================================
# Settings Functions
# ============================================

def trigger_manual_fetch():
    """Manually trigger a content fetch"""
    try:
        logger.info("Manual fetch triggered")
        fetch_and_analyze_content()
        return "✅ Fetch completed! Refresh dashboard to see new data."
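    # Failures inside fetch_and_analyze_content() are caught below and shown
    # in the "Fetch Status" textbox instead of crashing the app.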
    except Exception as e:
        logger.error(f"Error in manual fetch: {e}")
        return f"❌ Error: {str(e)}"


def get_database_stats():
    """Get storage statistics"""
    try:
        import os
        from config.database import DATA_DIR

        # Count records in parquet files
        df_content = pd.read_parquet(CONTENT_ITEMS_PATH)
        df_analyses = pd.read_parquet(CLARITY_ANALYSES_PATH)
        df_logs = pd.read_parquet(FETCH_LOGS_PATH)
        sources = get_sources()

        stats = {
            'Content Items': len(df_content),
            'Analyses': len(df_analyses),
            'Sources': len(sources),
            'Fetch Logs': len(df_logs),
        }

        # Calculate total storage size
        total_size = 0
        for file in [CONTENT_ITEMS_PATH, CLARITY_ANALYSES_PATH, FETCH_LOGS_PATH]:
            if os.path.exists(file):
                total_size += os.path.getsize(file)

        size_mb = total_size / (1024 * 1024)
        stats['Storage Size'] = f"{size_mb:.2f} MB"
        stats['Storage Type'] = "Parquet files"
        stats['Storage Location'] = str(DATA_DIR)

        return stats
    except Exception as e:
        logger.error(f"Error getting storage stats: {e}")
        import traceback
        traceback.print_exc()
        return {"Error": "Could not retrieve stats - storage may be initializing"}


def get_recent_logs():
    """Get recent fetch logs"""
    try:
        logs = repo.get_fetch_logs(limit=10)

        if not logs:
            return "No logs yet"

        log_text = ""
        for log in logs:
            status_emoji = "✅" if log['status'] == 'success' else "❌"
            log_text += f"""
{status_emoji} **{log['fetch_start'].strftime('%Y-%m-%d %H:%M')}**
- Source: {log['source_name']}
- Items: {log['items_fetched']} fetched, {log['items_new']} new
- Status: {log['status']}

---
"""
        return log_text
    except Exception as e:
        logger.error(f"Error getting logs: {e}")
        return f"Error: {str(e)}"


# ============================================
# Build Gradio Interface
# ============================================

# Custom CSS for better styling
custom_css = """
.gradio-container {
    font-family: 'IBM Plex Sans', sans-serif;
}
.gr-button-primary {
    background: linear-gradient(90deg, #4b6cb7 0%, #182848 100%) !important;
    border: none !important;
}
"""

with gr.Blocks(css=custom_css, title="Madrid Content Analyzer", theme=gr.themes.Soft()) as demo:

    # Header
    gr.Markdown("""
    # 🏛️ Madrid Content Analyzer
    ### Analyzing language clarity in Madrid City Council communications

    **Free and open-source** | Powered by Hugging Face Spaces 🤗
    """)

    # ============================================
    # Dashboard Tab
    # ============================================
    with gr.Tab("📊 Dashboard"):
        gr.Markdown("### Overview Statistics")

        stats_display = gr.JSON(label="Statistics")
        refresh_stats_btn = gr.Button("🔄 Refresh Statistics", variant="primary")
        refresh_stats_btn.click(get_dashboard_stats, outputs=stats_display)

        gr.Markdown("### Visualizations")

        with gr.Row():
            distribution_chart = gr.Plot(label="Clarity Score Distribution")
            category_chart = gr.Plot(label="Scores by Category")

        timeline_chart = gr.Plot(label="Content Timeline")

        refresh_charts_btn = gr.Button("🔄 Refresh Charts", variant="secondary")

        def refresh_all_charts():
            return (
                get_clarity_distribution(),
                get_category_scores(),
                get_content_timeline()
            )

        refresh_charts_btn.click(
            refresh_all_charts,
            outputs=[distribution_chart, category_chart, timeline_chart]
        )

        # Load initial data
        demo.load(
            get_dashboard_stats,
            outputs=stats_display
        )
        demo.load(
            refresh_all_charts,
            outputs=[distribution_chart, category_chart, timeline_chart]
        )

    # ============================================
    # Content Browser Tab
    # ============================================
    with gr.Tab("📝 Browse Content"):
        gr.Markdown("### Search and Filter Content")

        with gr.Row():
            days_slider = gr.Slider(1, 90, value=7, step=1,
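                                    # look-back window, in days, passed to search_content()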
label="Last N Days") category_dropdown = gr.Dropdown( ["All", "Noticias", "Documentos", "Anuncios"], value="All", label="Category" ) with gr.Row(): min_clarity = gr.Slider(0, 100, value=0, label="Min Clarity Score") max_clarity = gr.Slider(0, 100, value=100, label="Max Clarity Score") search_box = gr.Textbox(label="Search Text", placeholder="Enter keywords...") search_btn = gr.Button("🔍 Search", variant="primary") results_table = gr.Dataframe( label="Search Results", interactive=False, wrap=True ) search_btn.click( search_content, inputs=[days_slider, category_dropdown, min_clarity, max_clarity, search_box], outputs=results_table ) # Load initial results demo.load( lambda: search_content(7, "All", 0, 100, ""), outputs=results_table ) # ============================================ # Analytics Tab # ============================================ with gr.Tab("📈 Analytics"): gr.Markdown("### Low Clarity Items") threshold_slider = gr.Slider(0, 100, value=50, label="Clarity Threshold") get_low_clarity_btn = gr.Button("Get Low Clarity Items", variant="primary") low_clarity_table = gr.Dataframe(label="Items Below Threshold") get_low_clarity_btn.click( get_low_clarity_items, inputs=threshold_slider, outputs=low_clarity_table ) gr.Markdown("### Export Data") export_format = gr.Radio(["csv", "json"], value="csv", label="Export Format") export_btn = gr.Button("📥 Export Data", variant="secondary") export_status = gr.Textbox(label="Export Status") export_btn.click( export_data, inputs=export_format, outputs=export_status ) # ============================================ # Settings Tab # ============================================ with gr.Tab("⚙️ Settings"): gr.Markdown("### Manual Operations") fetch_btn = gr.Button("🔄 Trigger Manual Fetch", variant="primary") fetch_status = gr.Textbox(label="Fetch Status") fetch_btn.click(trigger_manual_fetch, outputs=fetch_status) gr.Markdown("### Database Statistics") db_stats_display = gr.JSON(label="Database Info") refresh_db_stats_btn = gr.Button("🔄 Refresh Database Stats") refresh_db_stats_btn.click(get_database_stats, outputs=db_stats_display) gr.Markdown("### Recent Fetch Logs") logs_display = gr.Markdown() refresh_logs_btn = gr.Button("🔄 Refresh Logs") refresh_logs_btn.click(get_recent_logs, outputs=logs_display) # Load initial data - commented out to avoid crashes # demo.load(get_database_stats, outputs=db_stats_display) # demo.load(get_recent_logs, outputs=logs_display) # Footer gr.Markdown(""" --- **Built with**: Python 🐍 | Gradio 🎨 | DuckDB 🦆 | Aclarador 📝 Data updates automatically every 6 hours | [View Source Code](https://github.com/yourusername/madrid-analyzer) """) # ============================================ # Launch App # ============================================ if __name__ == "__main__": logger.info("Starting Madrid Content Analyzer") demo.launch( share=False, # Don't create share link server_name="0.0.0.0", # Listen on all interfaces server_port=7860, # Default Gradio port show_error=True )