import os
import re
import torch
import logging
import gc
import sys
import pwd  # Added for monkey patch
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Dict, List, Optional
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from tokenizers.normalizers import Sequence, Replace, Strip
from tokenizers import Regex
from huggingface_hub import hf_hub_download  # Added for reliable HF downloads

# =====================================================
# 🛠️ Monkey Patch for Docker/Container UID Issue
# =====================================================
# Fix for 'getpwuid(): uid not found: 1000' in containerized environments
original_getpwuid = pwd.getpwuid


def patched_getpwuid(uid_num):
    """Look up a passwd entry, synthesizing one for the current UID if missing.

    Delegates to the real ``pwd.getpwuid``. When that raises ``KeyError`` and
    ``uid_num`` is the UID of the running process, a fake entry is returned;
    for any other UID the ``KeyError`` propagates unchanged.
    """
    try:
        return original_getpwuid(uid_num)
    except KeyError:
        if uid_num == os.getuid():
            # pwd.struct_passwd is a struct sequence: it must be built from a
            # single 7-item sequence (name, passwd, uid, gid, gecos, dir,
            # shell); keyword arguments are not accepted.
            return pwd.struct_passwd((
                'dockeruser',
                'x',
                uid_num,
                os.getgid(),
                'Docker User',
                '/tmp',
                '/bin/sh',
            ))
        raise


pwd.getpwuid = patched_getpwuid

# Set fallback env vars to avoid user-dependent paths
os.environ.setdefault('HOME', '/tmp')
os.environ.setdefault('USER', 'dockeruser')

# =====================================================
# 🔧 Environment configuration and settings
# =====================================================
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Memory and cache settings
CACHE_DIR = "/tmp/huggingface_cache"
os.makedirs(CACHE_DIR, exist_ok=True)

# Environment variables for Hugging Face.
# NOTE(review): these are set after transformers / huggingface_hub were
# imported at the top of the file; verify the libraries still honor them at
# this point. Every loader below also passes cache_dir=CACHE_DIR explicitly.
os.environ.update({
    "HF_HOME": CACHE_DIR,
    "TRANSFORMERS_CACHE": CACHE_DIR,
    "HF_DATASETS_CACHE": CACHE_DIR,
    "HUGGINGFACE_HUB_CACHE": CACHE_DIR,
    "TORCH_HOME": CACHE_DIR,
    "TOKENIZERS_PARALLELISM": "false",  # avoid threading problems
    "TRANSFORMERS_OFFLINE": "0",  # allow downloading from the internet
})

# PyTorch memory settings
if torch.cuda.is_available():
    os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'max_split_size_mb:128'
    torch.backends.cudnn.benchmark = True

# =====================================================
# 🚀 Device selection (GPU or CPU)
# =====================================================
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
logger.info(f"🖥️ Using device: {device}")
if torch.cuda.is_available():
    logger.info(f"🎮 CUDA Device: {torch.cuda.get_device_name(0)}")
    logger.info(f"💾 CUDA Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.2f} GB")

# =====================================================
# 📊 Label map (class index -> source model name)
# =====================================================
label_mapping = {
    0: '13B', 1: '30B', 2: '65B', 3: '7B', 4: 'GLM130B', 5: 'bloom_7b',
    6: 'bloomz', 7: 'cohere', 8: 'davinci', 9: 'dolly', 10: 'dolly-v2-12b',
    11: 'flan_t5_base', 12: 'flan_t5_large', 13: 'flan_t5_small',
    14: 'flan_t5_xl', 15: 'flan_t5_xxl', 16: 'gemma-7b-it', 17: 'gemma2-9b-it',
    18: 'gpt-3.5-turbo', 19: 'gpt-35', 20: 'gpt4', 21: 'gpt4o',
    22: 'gpt_j', 23: 'gpt_neox', 24: 'human', 25: 'llama3-70b', 26: 'llama3-8b',
    27: 'mixtral-8x7b', 28: 'opt_1.3b', 29: 'opt_125m', 30: 'opt_13b',
    31: 'opt_2.7b', 32: 'opt_30b', 33: 'opt_350m', 34: 'opt_6.7b',
    35: 'opt_iml_30b', 36: 'opt_iml_max_1.3b', 37: 't0_11b', 38: 't0_3b',
    39: 'text-davinci-002', 40: 'text-davinci-003'
}

# Number of classes and the index of the 'human' class, both derived from
# label_mapping so they cannot drift from it (41 and 24 respectively).
NUM_LABELS = len(label_mapping)
HUMAN_LABEL_INDEX = next(
    idx for idx, name in label_mapping.items() if name == 'human'
)


# =====================================================
# 🤖 Model Manager
# =====================================================
class ModelManager:
    """Owns the tokenizer and the ensemble of sequence-classification models.

    Attributes set in ``__init__``:
        tokenizer: tokenizer instance, ``None`` until ``load_tokenizer`` runs.
        models: list of loaded models used as an averaging ensemble.
        models_loaded: ``True`` once ``load_models`` has succeeded.
        model_urls: URLs of the fine-tuned weight files, one per model.
        base_model_id / fallback_model_id: Hub ids of the primary and the
            fallback architecture.
        using_fallback: ``True`` if any fallback tokenizer/model was used.
    """

    def __init__(self):
        self.tokenizer = None
        self.models = []
        self.models_loaded = False
        self.model_urls = [
            "https://huggingface.co/mihalykiss/modernbert_2/resolve/main/Model_groups_3class_seed12",
            "https://huggingface.co/mihalykiss/modernbert_2/resolve/main/Model_groups_3class_seed22"
        ]
        self.base_model_id = "answerdotai/ModernBERT-base"  # Primary
        self.fallback_model_id = "bert-base-uncased"  # Fallback if ModernBERT fails
        self.using_fallback = False

    def load_tokenizer(self):
        """Load the tokenizer, falling back to ``fallback_model_id`` on error.

        Also tries to extend the tokenizer's normalizer with newline and
        hyphen-break handling; failure of that step only logs a warning.

        Returns:
            ``True`` if a tokenizer was loaded, ``False`` if both the primary
            and the fallback tokenizer failed to load.
        """
        try:
            logger.info(f"📝 Loading tokenizer from {self.base_model_id}...")
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.base_model_id,
                cache_dir=CACHE_DIR,
                use_fast=True,
                trust_remote_code=False
            )
            logger.info("✅ Primary tokenizer loaded successfully")
        except Exception as e:
            logger.warning(f"⚠️ Failed to load primary tokenizer: {e}")
            try:
                logger.info(f"🔄 Falling back to {self.fallback_model_id}...")
                self.tokenizer = AutoTokenizer.from_pretrained(
                    self.fallback_model_id,
                    cache_dir=CACHE_DIR,
                    use_fast=True,
                    trust_remote_code=False
                )
                self.using_fallback = True
                logger.info("✅ Fallback tokenizer loaded successfully")
            except Exception as fallback_e:
                logger.error(f"❌ Failed to load fallback tokenizer: {fallback_e}")
                return False

        # Set up text normalization.
        # NOTE(review): the replacement r"\1\2" is presumably intended as
        # regex back-references; confirm that tokenizers' Replace supports
        # them (otherwise the literal characters are inserted). Left
        # unchanged because the fine-tuned checkpoints were presumably
        # produced with this exact normalizer — verify before altering.
        try:
            newline_to_space = Replace(Regex(r'\s*\n\s*'), " ")
            join_hyphen_break = Replace(Regex(r'(\w+)[--]\s*\n\s*(\w+)'), r"\1\2")
            self.tokenizer.backend_tokenizer.normalizer = Sequence([
                self.tokenizer.backend_tokenizer.normalizer,
                join_hyphen_break,
                newline_to_space,
                Strip()
            ])
        except Exception as e:
            logger.warning(f"⚠️ Could not set custom normalizer: {e}")

        return True

    def load_single_model(self, model_url=None, model_path=None, model_name="Model"):
        """Load one classifier: base architecture plus optional fine-tuned weights.

        Args:
            model_url: URL of a weight file; only its last path segment is
                used, as the filename to download from the Hub repository.
            model_path: local weight file; takes precedence over
                ``model_url`` when the path exists.
            model_name: label used in log messages.

        Returns:
            The model moved to ``device`` in eval mode, or ``None`` if neither
            base architecture could be loaded or the model could not be moved
            to the device. A failure to load the fine-tuned weights is logged
            and the base model is returned as-is.
        """
        # Shared by the primary and the fallback attempt.
        load_kwargs = dict(
            num_labels=NUM_LABELS,
            cache_dir=CACHE_DIR,
            torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
            low_cpu_mem_usage=True,
            trust_remote_code=False,
        )

        base_model = None
        try:
            logger.info(f"🤖 Loading base {model_name} from {self.base_model_id}...")
            # Try the primary base model first.
            base_model = AutoModelForSequenceClassification.from_pretrained(
                self.base_model_id, **load_kwargs
            )
            logger.info("✅ Primary base model loaded")
        except Exception as e:
            logger.warning(f"⚠️ Failed to load primary base model: {e}")
            try:
                logger.info(f"🔄 Falling back to {self.fallback_model_id}...")
                base_model = AutoModelForSequenceClassification.from_pretrained(
                    self.fallback_model_id, **load_kwargs
                )
                self.using_fallback = True
                logger.info("✅ Fallback base model loaded (note: weights may not be compatible)")
            except Exception as fallback_e:
                logger.error(f"❌ Failed to load fallback base model: {fallback_e}")
                return None

        # Try to load the fine-tuned weights on top of the base model.
        try:
            weights_path = None
            if model_path and os.path.exists(model_path):
                logger.info(f"📁 Loading from local file: {model_path}")
                weights_path = model_path
            elif model_url:
                # Use hf_hub_download rather than torch.hub for HF repos.
                logger.info(f"🌐 Downloading weights from HF repo...")
                repo_id = "mihalykiss/modernbert_2"
                filename = model_url.split("/")[-1]
                weights_path = hf_hub_download(
                    repo_id=repo_id,
                    filename=filename,
                    cache_dir=CACHE_DIR
                )
                logger.info(f"✅ Downloaded to {weights_path}")

            if weights_path is None:
                # No usable source: do not claim that weights were loaded.
                logger.info("📌 Using base model without fine-tuned weights")
            else:
                state_dict = torch.load(weights_path, map_location=device, weights_only=True)
                # strict=False tolerates key mismatches; surface them so a
                # partially (or not at all) applied checkpoint is visible.
                load_result = base_model.load_state_dict(state_dict, strict=False)
                if load_result.missing_keys or load_result.unexpected_keys:
                    logger.warning(
                        "⚠️ %s: %d missing and %d unexpected keys when loading weights",
                        model_name,
                        len(load_result.missing_keys),
                        len(load_result.unexpected_keys),
                    )
                logger.info(f"✅ {model_name} weights loaded successfully")
        except Exception as e:
            logger.warning(f"⚠️ Could not load custom weights for {model_name}: {e}")
            logger.info("📌 Using base model without fine-tuned weights")

        # Move to the target device and switch to inference mode.
        try:
            base_model = base_model.to(device)
            base_model.eval()
            logger.info(f"✅ {model_name} moved to {device} and set to eval mode")
            return base_model
        except Exception as e:
            logger.error(f"❌ Failed to prepare {model_name}: {e}")
            return None

    def load_models(self):
        """Load the tokenizer and every model listed in ``model_urls``.

        Returns:
            ``True`` if at least one model is loaded (or loading had already
            succeeded earlier), ``False`` otherwise. Never raises; unexpected
            errors are logged with a traceback.
        """
        if self.models_loaded:
            return True

        try:
            if not self.load_tokenizer():
                return False

            # Collect into a local list so that a retry after a partial
            # failure cannot leave duplicate models in self.models.
            loaded_models = []
            for i, model_url in enumerate(self.model_urls):
                model = self.load_single_model(
                    model_url=model_url,
                    model_name=f"Model {i+1}"
                )
                if model is None:
                    logger.warning(f"⚠️ Failed to load model {i+1}")
                    continue
                loaded_models.append(model)

            if not loaded_models:
                logger.error("❌ No models loaded successfully")
                return False

            self.models = loaded_models
            self.models_loaded = True
            logger.info(f"✅ Successfully loaded {len(self.models)} model(s)")
            return True
        except Exception as e:
            logger.error(f"❌ Model loading error: {e}", exc_info=True)
            return False

    def classify_text(self, text: str, max_length: int = 512) -> Dict:
        """Classify ``text`` with the ensemble and summarize the prediction.

        Logits of all loaded models are averaged, then soft-maxed.

        Args:
            text: input text; truncated to ``max_length`` tokens.
            max_length: tokenizer truncation length.

        Returns:
            Dict with ``ai_percentage``, ``human_percentage`` (0-100, two
            decimals), ``predicted_model``, ``top_5_predictions``,
            ``models_used`` and ``using_fallback``.

        Raises:
            RuntimeError: if models or tokenizer are not loaded.
            Exception: any tokenization/inference error is logged and re-raised.
        """
        if not self.models_loaded or not self.tokenizer:
            raise RuntimeError("Models or tokenizer not loaded")

        try:
            # Tokenization
            inputs = self.tokenizer(
                text,
                return_tensors="pt",
                truncation=True,
                max_length=max_length,
                padding=True
            ).to(device)

            # Predict with every model of the ensemble.
            all_logits = []
            with torch.no_grad():
                for model in self.models:
                    outputs = model(**inputs)
                    all_logits.append(outputs.logits)

            # Average in float32: models run in float16 on CUDA, and the
            # softmax below is more precise on full-precision logits.
            avg_logits = torch.mean(torch.stack(all_logits).float(), dim=0)
            probabilities = torch.nn.functional.softmax(avg_logits, dim=-1)

            # Top predictions
            top_probs, top_indices = torch.topk(probabilities[0], k=5)

            # AI vs. human: everything that is not the 'human' class is AI.
            human_prob = probabilities[0][HUMAN_LABEL_INDEX].item()
            ai_prob = 1.0 - human_prob

            # Most likely source model
            predicted_idx = top_indices[0].item()
            predicted_model = label_mapping.get(predicted_idx, "unknown")

            # Top 5 predictions
            top_5 = [
                {
                    "model": label_mapping.get(idx.item(), "unknown"),
                    "probability": prob.item()
                }
                for prob, idx in zip(top_probs, top_indices)
            ]

            return {
                "ai_percentage": round(ai_prob * 100, 2),
                "human_percentage": round(human_prob * 100, 2),
                "predicted_model": predicted_model,
                "top_5_predictions": top_5,
                "models_used": len(self.models),
                "using_fallback": self.using_fallback
            }
        except Exception as e:
            logger.error(f"Classification error: {e}", exc_info=True)
            raise


# =====================================================
# 🆕 ADVANCED ACCURACY FEATURES
# =====================================================
def _clamp01(value: float) -> float:
    """Clamp ``value`` into the closed interval [0, 1]."""
    return min(max(value, 0.0), 1.0)


def calculate_perplexity_score(text: str) -> float:
    """Return a heuristic "perplexity" score in [0, 1] (higher = more human-like).

    This is not a language-model perplexity: it combines word-length variance
    and the unique-word ratio of ``text``. Texts with fewer than 10
    whitespace-separated words return the neutral value 0.5, matching the
    neutral default of the other heuristics in this file.
    """
    words = text.split()
    if len(words) < 10:
        # Not enough data: stay neutral instead of signalling "AI-like".
        return 0.5

    # Word-length variance
    word_lengths = [len(w) for w in words]
    avg_length = sum(word_lengths) / len(word_lengths)
    variance = sum((l - avg_length) ** 2 for l in word_lengths) / len(word_lengths)

    # Unique-word ratio
    unique_ratio = len(set(words)) / len(words)

    # Combine metrics (normalized 0-1, higher = more human-like)
    perplexity = (variance / 20) * 0.5 + unique_ratio * 0.5
    return min(max(perplexity, 0), 1)


def analyze_sentence_structure(text: str) -> Dict:
    """Measure how uniform the sentence lengths of ``text`` are.

    Sentences are split on runs of ``.``, ``!`` and ``?``. Low length variance
    yields a ``uniformity`` close to 1, which the caller treats as AI-like.

    Returns:
        Dict with ``uniformity``, ``variance``, ``avg_sentence_length`` and
        ``sentence_count``. With fewer than two sentences the first two are
        the neutral value 0.5.
    """
    sentences = re.split(r'[.!?]+', text)
    sentences = [s.strip() for s in sentences if s.strip()]

    if len(sentences) < 2:
        # Same key set as the regular path so callers can index safely.
        only_length = len(sentences[0].split()) if sentences else 0
        return {
            "uniformity": 0.5,
            "variance": 0.5,
            "avg_sentence_length": round(float(only_length), 1),
            "sentence_count": len(sentences)
        }

    # Sentence lengths in words
    lengths = [len(s.split()) for s in sentences]
    avg_length = sum(lengths) / len(lengths)

    # Low variance = more uniform = AI-like
    variance = sum((l - avg_length) ** 2 for l in lengths) / len(lengths)
    uniformity = 1 / (1 + variance / 10)  # Normalize

    return {
        "uniformity": round(uniformity, 3),
        "variance": round(variance, 2),
        "avg_sentence_length": round(avg_length, 1),
        "sentence_count": len(sentences)
    }


def detect_repetition_patterns(text: str) -> Dict:
    """Measure n-gram repetition and the density of stock transition phrases.

    Returns:
        Dict with ``bigram_repetition`` and ``trigram_repetition`` (share of
        repeated n-grams, 0 when there are none), ``ai_phrase_count`` (how
        many of the listed phrases occur at least once, by substring match)
        and ``ai_phrase_density`` (that count per 100 words, with texts
        shorter than 100 words treated as 100 words).
    """
    lowered = text.lower()
    words = lowered.split()

    # Bigram repetition
    bigrams = [f"{words[i]} {words[i+1]}" for i in range(len(words) - 1)]
    bigram_repetition = 1 - (len(set(bigrams)) / len(bigrams)) if bigrams else 0

    # Trigram repetition
    trigrams = [f"{words[i]} {words[i+1]} {words[i+2]}" for i in range(len(words) - 2)]
    trigram_repetition = 1 - (len(set(trigrams)) / len(trigrams)) if trigrams else 0

    # Common AI transition phrases
    ai_phrases = [
        'furthermore', 'moreover', 'additionally', 'consequently',
        'in conclusion', 'to summarize', 'it is important to note',
        'it should be noted', 'in other words', 'as a result'
    ]
    ai_phrase_count = sum(1 for phrase in ai_phrases if phrase in lowered)
    ai_phrase_density = ai_phrase_count / max(len(words) / 100, 1)  # per 100 words

    return {
        "bigram_repetition": round(bigram_repetition, 3),
        "trigram_repetition": round(trigram_repetition, 3),
        "ai_phrase_density": round(ai_phrase_density, 2),
        "ai_phrase_count": ai_phrase_count
    }


def analyze_vocabulary_richness(text: str) -> Dict:
    """Measure vocabulary diversity and formality of ``text``.

    Words are runs of ASCII letters in the lower-cased text.

    Returns:
        Dict with ``type_token_ratio``, ``informal_markers``,
        ``formal_markers``, ``formality_score`` (0 = informal/human,
        1 = formal/AI) and ``unique_words``. With fewer than 10 words the
        ratio and the formality score are the neutral value 0.5 and the
        marker counts are 0; the legacy keys ``richness`` and ``formality``
        are also present in that case.
    """
    words = re.findall(r'\b[a-z]+\b', text.lower())
    vocabulary = set(words)

    if len(words) < 10:
        # Carry the full schema: advanced_linguistic_analysis indexes
        # "formality_score" and "informal_markers" unconditionally.
        return {
            "richness": 0.5,
            "formality": 0.5,
            "type_token_ratio": 0.5,
            "informal_markers": 0,
            "formal_markers": 0,
            "formality_score": 0.5,
            "unique_words": len(vocabulary)
        }

    # Type-token ratio (vocabulary diversity)
    ttr = len(vocabulary) / len(words)

    # Informal markers (human-like)
    informal_markers = [
        'lol', 'omg', 'btw', 'tbh', 'imo', 'gonna', 'wanna', 'gotta',
        'yeah', 'nah', 'yep', 'nope', 'kinda', 'sorta', 'dunno'
    ]
    informal_count = sum(1 for marker in informal_markers if marker in vocabulary)

    # Formal markers (AI-like)
    formal_markers = [
        'furthermore', 'nevertheless', 'consequently', 'substantially',
        'primarily', 'significantly', 'comprehensive', 'fundamental',
        'demonstrate', 'facilitate', 'optimize', 'leverage'
    ]
    formal_count = sum(1 for marker in formal_markers if marker in vocabulary)

    # Formality score (0 = informal/human, 1 = formal/AI)
    formality = formal_count / max(formal_count + informal_count, 1)

    return {
        "type_token_ratio": round(ttr, 3),
        "informal_markers": informal_count,
        "formal_markers": formal_count,
        "formality_score": round(formality, 3),
        "unique_words": len(vocabulary)
    }


def detect_human_errors(text: str) -> Dict:
    """Count surface patterns that the caller treats as typical of human typing.

    Returns:
        Dict with ``emotion_markers``, ``repeated_punctuation``,
        ``caps_emphasis``, ``spacing_inconsistencies`` and
        ``human_error_score`` (sum of the first three per 50 words, with
        texts shorter than 50 words treated as 50 words). The score has no
        upper bound.
    """
    # Emotional markers. Overlapping markers are counted more than once:
    # "!!" contributes to both the '!' and the '!!' count.
    emotions = ['!', '?', '!!', '???', '...', 'haha', 'lmao', 'wow']
    lowered = text.lower()
    emotion_count = sum(lowered.count(e) for e in emotions)

    # Repeated punctuation
    repeated_punct = len(re.findall(r'([!?.])\1+', text))

    # ALL CAPS words (emotional emphasis)
    caps_words = len(re.findall(r'\b[A-Z]{2,}\b', text))

    # Inconsistent spacing (reported, but not part of the score)
    spacing_issues = len(re.findall(r'\s{2,}|[a-z][A-Z]', text))

    return {
        "emotion_markers": emotion_count,
        "repeated_punctuation": repeated_punct,
        "caps_emphasis": caps_words,
        "spacing_inconsistencies": spacing_issues,
        "human_error_score": round(
            (emotion_count + repeated_punct + caps_words) / max(len(text.split()) / 50, 1), 2
        )
    }


def calculate_burstiness(text: str) -> float:
    """Return sentence-length burstiness in [0, 1] (higher = more varied).

    Computed as the variance of sentence lengths (in words) divided by 50 and
    capped at 1. Texts with fewer than three sentences return the neutral 0.5.
    """
    sentences = re.split(r'[.!?]+', text)
    sentences = [s.strip() for s in sentences if s.strip()]

    if len(sentences) < 3:
        return 0.5

    lengths = [len(s.split()) for s in sentences]
    mean_length = sum(lengths) / len(lengths)
    variance = sum((l - mean_length) ** 2 for l in lengths) / len(lengths)

    # Higher variance = more bursty = more human
    burstiness = min(variance / 50, 1.0)  # Normalize
    return round(burstiness, 3)


def advanced_linguistic_analysis(text: str) -> Dict:
    """Combine all linguistic heuristics into AI / human likelihood scores.

    Each indicator is clamped to [0, 1] before averaging so that the
    resulting percentages stay within 0-100.

    Returns:
        Dict with ``linguistic_features`` (raw results of every heuristic),
        ``linguistic_ai_score`` and ``linguistic_human_score`` (0-100) and
        ``confidence_modifier``. If any heuristic raises, a neutral 50/50
        result with empty features is returned and the error is logged.
    """
    try:
        perplexity = calculate_perplexity_score(text)
        structure = analyze_sentence_structure(text)
        repetition = detect_repetition_patterns(text)
        vocabulary = analyze_vocabulary_richness(text)
        human_errors = detect_human_errors(text)
        burstiness = calculate_burstiness(text)

        # AI likelihood from linguistic features (higher = more AI-like)
        ai_indicators = [
            _clamp01(structure["uniformity"]),               # High uniformity = AI
            _clamp01(repetition["bigram_repetition"] * 2),   # High repetition = AI
            _clamp01(repetition["ai_phrase_density"] / 5),   # Many AI phrases = AI
            _clamp01(vocabulary["formality_score"]),         # High formality = AI
            _clamp01(1 - burstiness),                        # Low burstiness = AI
            _clamp01(1 - perplexity),                        # Low perplexity = AI
        ]

        # Human likelihood from linguistic features
        human_indicators = [
            _clamp01(human_errors["human_error_score"]),     # Errors = human
            _clamp01(vocabulary["informal_markers"] / 10),   # Informal = human
            _clamp01(burstiness),                            # High burstiness = human
            _clamp01(perplexity),                            # High perplexity = human
        ]

        linguistic_ai_score = sum(ai_indicators) / len(ai_indicators)
        linguistic_human_score = sum(human_indicators) / len(human_indicators)

        # Normalize to 0-100 scale
        linguistic_ai_percentage = round(linguistic_ai_score * 100, 2)
        linguistic_human_percentage = round(linguistic_human_score * 100, 2)

        return {
            "linguistic_features": {
                "perplexity": perplexity,
                "sentence_structure": structure,
                "repetition_patterns": repetition,
                "vocabulary_analysis": vocabulary,
                "human_error_patterns": human_errors,
                "burstiness": burstiness
            },
            "linguistic_ai_score": linguistic_ai_percentage,
            "linguistic_human_score": linguistic_human_percentage,
            "confidence_modifier": {
                "ai_indicators_strength": round(linguistic_ai_score, 3),
                "human_indicators_strength": round(linguistic_human_score, 3),
                "combined_confidence": round(abs(linguistic_ai_score - linguistic_human_score), 3)
            }
        }
    except Exception as e:
        logger.warning(f"Advanced linguistic analysis failed: {e}")
        return {
            "linguistic_features": {},
            "linguistic_ai_score": 50,
            "linguistic_human_score": 50,
            "confidence_modifier": {"error": str(e)}
        }


# =====================================================
# 🆕 ADVANCED ACCURACY FEATURES
# =====================================================
def clean_content_for_analysis(text: str, min_line_length: int = 30) -> str:
    """Drop short lines (headlines, etc.) and join the rest with spaces.

    Args:
        text: original text.
        min_line_length: minimum stripped length for a line to be kept.

    Returns:
        The kept lines, stripped and joined by single spaces.
    """
    stripped_lines = (line.strip() for line in text.split('\n'))
    return ' '.join(
        line for line in stripped_lines if len(line) >= min_line_length
    )


def split_content_in_half(text: str) -> tuple:
    """Split ``text`` into two halves by word count.

    Returns:
        ``(first_half, second_half)``; with an odd word count the second half
        gets the extra word.
    """
    words = text.split()
    mid_point = len(words) // 2
    first_half = ' '.join(words[:mid_point])
    second_half = ' '.join(words[mid_point:])
    return first_half, second_half


def analyze_content_halves(model_manager, text: str, overall_result: Optional[Dict] = None) -> Dict:
    """Classify both halves of the cleaned text and derive a final verdict.

    The text is cleaned with ``clean_content_for_analysis``, split in two,
    and each half is classified by ``model_manager.classify_text`` and scored
    by ``advanced_linguistic_analysis``. The verdict is one of ``HUMAN``,
    ``AI``, ``MIXED``, ``LIKELY_HUMAN``, ``LIKELY_AI``.

    Args:
        model_manager: object providing ``classify_text(text) -> Dict``.
        text: full input text.
        overall_result: optional whole-text classification; its
            ``ai_percentage`` is reported as ``overall_ai_prob``.

    Returns:
        Dict with ``halves_analysis_available``. When ``True`` it also holds
        per-half results, ``final_decision`` and the overall linguistic
        analysis; when ``False`` it holds a ``reason`` or an ``error``.
        Never raises: errors are logged and reported in the result.
    """
    try:
        logger.info("🔬 Running advanced linguistic analysis...")
        linguistic_analysis = advanced_linguistic_analysis(text)

        cleaned_text = clean_content_for_analysis(text)
        if not cleaned_text or len(cleaned_text.split()) < 10:
            return {
                "halves_analysis_available": False,
                "reason": "Content too short after cleaning",
                "linguistic_analysis": linguistic_analysis
            }

        # Split text into halves
        first_half, second_half = split_content_in_half(cleaned_text)

        # Linguistic analysis for each half
        first_half_linguistic = advanced_linguistic_analysis(first_half)
        second_half_linguistic = advanced_linguistic_analysis(second_half)

        # Ensemble model predictions
        first_half_result = model_manager.classify_text(first_half)
        second_half_result = model_manager.classify_text(second_half)

        first_ai = first_half_result["ai_percentage"]
        second_ai = second_half_result["ai_percentage"]
        first_model = first_half_result["predicted_model"]
        second_model = second_half_result["predicted_model"]
        first_top5 = first_half_result.get("top_5_predictions", [])
        second_top5 = second_half_result.get("top_5_predictions", [])
        first_half_words = len(first_half.split())
        second_half_words = len(second_half.split())

        # Stats
        avg_halves_ai_score = (first_ai + second_ai) / 2
        variance_between_halves = abs(first_ai - second_ai)
        overall_ai_prob = (
            overall_result["ai_percentage"] / 100
            if overall_result
            else avg_halves_ai_score / 100
        )
        models_agree = first_model == second_model
        models_used = first_half_result.get("models_used", 1)
        ensemble_confidence_boost = "High" if models_used > 1 else "Low"

        # Linguistic AI/Human scores
        ling_ai = linguistic_analysis.get("linguistic_ai_score", 50)
        ling_human = linguistic_analysis.get("linguistic_human_score", 50)

        # Detail values live under "linguistic_features" (which is empty
        # when the linguistic analysis failed), not at the top level.
        features = linguistic_analysis.get("linguistic_features", {})
        vocabulary_features = features.get("vocabulary_analysis", {})
        error_features = features.get("human_error_patterns", {})
        burstiness = features.get("burstiness", 0.5)
        formality_score = vocabulary_features.get("formality_score", 0.5)
        human_error_score = error_features.get("human_error_score", 0.5)
        emotion_markers = error_features.get("emotion_markers", 0)

        # Weighted average between model and linguistic results
        combined_avg_ai = (avg_halves_ai_score * 0.7) + (ling_ai * 0.3)
        model_ling_agreement = abs(avg_halves_ai_score - ling_ai) < 20

        # ----- Final Decision Logic -----
        verdict = "UNCERTAIN"
        confidence = "Low"
        accuracy_percentage = 60
        reasoning = ""

        # HUMAN
        if first_ai < 50 and second_ai < 50 and second_model.lower() == "human":
            verdict = "HUMAN"
            if ling_human > ling_ai:
                confidence = "Very High"
                accuracy_percentage = 95
            elif variance_between_halves < 15:
                confidence = "High"
                accuracy_percentage = 85
            else:
                confidence = "Medium"
                accuracy_percentage = 75
            reasoning = (
                f"Both halves scored below 50% AI probability (First: {first_ai}%, Second: {second_ai}%). "
                f"Linguistic analysis confirms with {ling_human:.1f}% human indicators. "
                f"The text shows {emotion_markers} emotional markers and a human error score of {human_error_score:.2f}. "
                f"Variance between halves is {variance_between_halves:.2f}%, indicating consistent human patterns. "
            )

        # AI
        elif first_ai > 50 and second_ai > 50 and second_model.lower() != "human":
            verdict = "AI"
            if first_ai > 80 and second_ai > 80 and model_ling_agreement:
                confidence = "Very High"
                accuracy_percentage = 98
            elif first_ai > 70 and second_ai > 70:
                confidence = "High"
                accuracy_percentage = 90
            else:
                confidence = "Medium"
                accuracy_percentage = 80
            reasoning = (
                f"Both halves scored above 50% AI probability (First: {first_ai}%, Second: {second_ai}%). "
                f"Linguistic analysis confirms with {ling_ai:.1f}% AI indicators. "
                f"Detected high formality score ({formality_score:.2f}) and low burstiness ({burstiness:.2f}), typical of AI generation. "
                f"Variance between halves: {variance_between_halves:.2f}%. "
                f"Models {'agree' if models_agree else 'disagree'} across halves."
            )

        # MIXED
        elif (first_ai > 50 and second_ai < 50) or (first_ai < 50 and second_ai > 50):
            verdict = "MIXED"
            confidence = "Medium" if variance_between_halves > 30 else "Low"
            accuracy_percentage = 75
            reasoning = (
                f"Mixed signals detected. First half: {first_ai}% AI ({first_model}), "
                f"Second half: {second_ai}% AI ({second_model}). "
                f"Linguistic AI score: {ling_ai:.1f}%. "
                f"Variance between halves ({variance_between_halves:.2f}%) supports mixed authorship."
            )

        # Borderline
        else:
            if second_model.lower() == "human" or ling_human > ling_ai:
                verdict = "LIKELY_HUMAN"
                confidence = "Medium"
                accuracy_percentage = 70
            else:
                verdict = "LIKELY_AI"
                confidence = "Medium"
                accuracy_percentage = 70
            reasoning = (
                f"Borderline case: scores near 50%. "
                f"Linguistic analysis leans toward {'human' if ling_human > ling_ai else 'AI'} writing. "
                f"Variance: {variance_between_halves:.2f}%."
            )

        # ----- Final Output -----
        final_decision = {
            "verdict": verdict,
            "confidence": confidence,
            "accuracy_percentage": accuracy_percentage,
            "reasoning": reasoning,
            "supporting_data": {
                "overall_ai_prob": round(overall_ai_prob, 3),
                "avg_halves_ai_score": round(avg_halves_ai_score / 100, 3),
                "variance_between_halves": round(variance_between_halves, 2),
                "first_half_model": first_model,
                "second_half_model": second_model,
                "models_agree": models_agree,
                "ensemble_models_used": models_used,
                "ensemble_confidence": ensemble_confidence_boost,
                "linguistic_ai_score": ling_ai,
                "linguistic_human_score": ling_human,
                "model_linguistic_agreement": model_ling_agreement,
                "combined_ai_score": round(combined_avg_ai, 2),
            },
        }

        return {
            "halves_analysis_available": True,
            "cleaned_content": {
                "total_words": len(cleaned_text.split()),
                "first_half_words": first_half_words,
                "second_half_words": second_half_words,
            },
            "first_half": {
                "ai_percentage": first_ai,
                "human_percentage": first_half_result["human_percentage"],
                "predicted_model": first_model,
                "word_count": first_half_words,
                "preview": first_half[:200] + "..." if len(first_half) > 200 else first_half,
                "top_5_predictions": first_top5,
                "models_used": models_used,
                "linguistic_analysis": first_half_linguistic,
            },
            "second_half": {
                "ai_percentage": second_ai,
                "human_percentage": second_half_result["human_percentage"],
                "predicted_model": second_model,
                "word_count": second_half_words,
                "preview": second_half[:200] + "..." if len(second_half) > 200 else second_half,
                "top_5_predictions": second_top5,
                "models_used": models_used,
                "linguistic_analysis": second_half_linguistic,
            },
            "final_decision": final_decision,
            "overall_linguistic_analysis": linguistic_analysis,
        }
    except Exception as e:
        logger.error(f"Error in halves analysis: {e}", exc_info=True)
        return {
            "halves_analysis_available": False,
            "error": str(e)
        }


# =====================================================
# 📝 Pydantic Models
# =====================================================
class TextInput(BaseModel):
    """Request body of ``/analyze``."""
    text: str
    analyze_paragraphs: bool = False


class SimpleTextInput(BaseModel):
    """Request body of ``/analyze-simple``."""
    text: str


class DetectionResult(BaseModel):
    """Response envelope of ``/analyze``."""
    success: bool
    code: int
    message: str
    data: Dict


# =====================================================
# 🔧 Helpers
# =====================================================
def split_into_paragraphs(text: str, min_length: int = 100) -> List[str]:
    """Split ``text`` on blank lines and keep paragraphs of at least ``min_length`` characters.

    Returned paragraphs are stripped of surrounding whitespace.
    """
    paragraphs = re.split(r'\n\s*\n', text)
    return [p.strip() for p in paragraphs if len(p.strip()) >= min_length]


# =====================================================
# 🌐 FastAPI Application
# =====================================================
app = FastAPI(
    title="ModernBERT AI Text Detector API",
    description="API for detecting AI-generated text using ModernBERT",
    version="2.0.0"
)

# CORS
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive — confirm this is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Model Manager Instance
model_manager = ModelManager()

# =====================================================
# 🚀 Startup Event
# =====================================================
@app.on_event("startup")
async def startup_event():
    """Load the models when the application starts.

    A loading failure is only logged; the endpoints then retry loading on
    demand or report that models are unavailable.
    """
    logger.info("🚀 Starting application...")
    logger.info("📦 Loading models...")
    success = model_manager.load_models()
    if success:
        logger.info("✅ Application ready! (Fallback mode: %s)", model_manager.using_fallback)
    else:
        logger.error("⚠️ Failed to load models - API will return errors")
        logger.info("💡 Tip: Ensure 'transformers>=4.45.0' and 'huggingface_hub' are installed. Run: pip install --upgrade transformers huggingface_hub")


@app.get("/")
async def root():
    """Landing endpoint: service status and the list of available endpoints."""
    return {
        "message": "ModernBERT AI Text Detector API",
        "status": "online" if model_manager.models_loaded else "initializing",
        "models_loaded": len(model_manager.models),
        "using_fallback": model_manager.using_fallback,
        "device": str(device),
        "endpoints": {
            "analyze": "/analyze",
            "simple": "/analyze-simple",
            "health": "/health",
            "docs": "/docs"
        }
    }


@app.get("/health")
async def health_check():
    """Health check: model status, device and (on CUDA) GPU memory usage."""
    memory_info = {}
    if torch.cuda.is_available():
        memory_info = {
            "gpu_allocated_gb": round(torch.cuda.memory_allocated() / 1024**3, 2),
            "gpu_reserved_gb": round(torch.cuda.memory_reserved() / 1024**3, 2)
        }

    return {
        "status": "healthy" if model_manager.models_loaded else "unhealthy",
        "models_loaded": len(model_manager.models),
        "using_fallback": model_manager.using_fallback,
        "device": str(device),
        "cuda_available": torch.cuda.is_available(),
        "memory_info": memory_info
    }


@app.post("/analyze", response_model=DetectionResult)
async def analyze_text(data: TextInput):
    """Analyze text for AI-generated content.

    Mirrors the behavior of the Gradio ``classify_text`` function and adds
    optional per-paragraph analysis plus the halves analysis.

    Errors are reported inside the ``DetectionResult`` envelope
    (``success=False`` with ``code`` 400, 503 or 500) rather than by raising
    ``HTTPException``.
    """
    try:
        # Validate the input text.
        text = data.text.strip()
        if not text:
            return DetectionResult(
                success=False,
                code=400,
                message="Empty input text",
                data={}
            )

        # Make sure the models are loaded; try loading them on demand.
        if not model_manager.models_loaded:
            if not model_manager.load_models():
                return DetectionResult(
                    success=False,
                    code=503,
                    message="Models not available. Check logs for details.",
                    data={}
                )

        # Word count
        total_words = len(text.split())

        # Whole-text classification
        result = model_manager.classify_text(text)

        # Base results
        ai_percentage = result["ai_percentage"]
        human_percentage = result["human_percentage"]
        ai_words = int(total_words * (ai_percentage / 100))

        # Per-paragraph analysis, only when requested and the text looks AI-written.
        paragraphs_analysis = []
        if data.analyze_paragraphs and ai_percentage > 50:
            paragraphs = split_into_paragraphs(text)
            recalc_ai_words = 0
            recalc_total_words = 0

            for para in paragraphs[:10]:  # at most 10 paragraphs
                if para.strip():
                    try:
                        para_result = model_manager.classify_text(para)
                        para_words = len(para.split())
                        recalc_total_words += para_words
                        recalc_ai_words += para_words * (para_result["ai_percentage"] / 100)

                        paragraphs_analysis.append({
                            "paragraph": para[:200] + "..." if len(para) > 200 else para,
                            "ai_generated_score": para_result["ai_percentage"] / 100,
                            "human_written_score": para_result["human_percentage"] / 100,
                            "predicted_model": para_result["predicted_model"]
                        })
                    except Exception as e:
                        # Best effort: one failing paragraph must not fail the request.
                        logger.warning(f"Failed to analyze paragraph: {e}")

            # Recompute the percentages as a word-weighted paragraph average.
            if recalc_total_words > 0:
                ai_percentage = round((recalc_ai_words / recalc_total_words) * 100, 2)
                human_percentage = round(100 - ai_percentage, 2)
                ai_words = int(recalc_ai_words)

        # 🆕 NEW FEATURE: Analyze content by halves (pass overall result for variance calculation)
        halves_analysis = analyze_content_halves(model_manager, text, result)

        # Feedback message
        if ai_percentage > 50:
            feedback = "Most of Your Text is AI/GPT Generated"
        else:
            feedback = "Most of Your Text Appears Human-Written"

        # Same response format as the original code, plus the halves analysis.
        return DetectionResult(
            success=True,
            code=200,
            message="analysis completed",
            data={
                "fakePercentage": ai_percentage,
                "isHuman": human_percentage,
                "textWords": total_words,
                "aiWords": ai_words,
                "paragraphs": paragraphs_analysis,
                "predicted_model": result["predicted_model"],
                "feedback": feedback,
                "input_text": text[:500] + "..." if len(text) > 500 else text,
                "detected_language": "en",
                "top_5_predictions": result.get("top_5_predictions", []),
                "models_used": result.get("models_used", 1),
                "using_fallback": result.get("using_fallback", False),
                # 🆕 NEW: Halves analysis appended to response
                "halves_analysis": halves_analysis
            }
        )
    except Exception as e:
        logger.error(f"Analysis error: {e}", exc_info=True)
        return DetectionResult(
            success=False,
            code=500,
            message=f"Analysis failed: {str(e)}",
            data={}
        )


@app.post("/analyze-simple")
async def analyze_simple(data: SimpleTextInput):
    """Simplified analysis that returns only the basic results.

    Raises:
        HTTPException: 400 for empty text, 503 when models cannot be loaded,
            500 for any other failure.
    """
    try:
        text = data.text.strip()
        if not text:
            raise HTTPException(status_code=400, detail="Empty text")

        if not model_manager.models_loaded:
            if not model_manager.load_models():
                raise HTTPException(status_code=503, detail="Models not available")

        result = model_manager.classify_text(text)

        return {
            "is_ai": result["ai_percentage"] > 50,
            "ai_score": result["ai_percentage"],
            "human_score": result["human_percentage"],
            "detected_model": result["predicted_model"] if result["ai_percentage"] > 50 else None,
            "confidence": max(result["ai_percentage"], result["human_percentage"]),
            "using_fallback": result.get("using_fallback", False)
        }
    except HTTPException:
        # Deliberate HTTP errors raised above pass through untouched.
        raise
    except Exception as e:
        # Log with traceback, consistent with analyze_text, and keep the
        # original exception as the cause.
        logger.error(f"Simple analysis error: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e)) from e


# =====================================================
# 🏃 Run the application
# =====================================================
if __name__ == "__main__":
    import uvicorn

    # Read settings from the environment.
    port = int(os.environ.get("PORT", 8000))
    host = os.environ.get("HOST", "0.0.0.0")
    workers = int(os.environ.get("WORKERS", 1))

    logger.info("=" * 50)
    logger.info(f"🌐 Starting server on {host}:{port}")
    logger.info(f"👷 Workers: {workers}")
    logger.info(f"📚 Documentation: http://{host}:{port}/docs")
    logger.info("=" * 50)

    uvicorn.run(
        "main:app",  # Assuming this file is named main.py
        host=host,
        port=port,
        workers=workers,
        reload=False  # Set to True for dev
    )