import re

import gradio as gr
from transformers import pipeline
from langdetect import detect

# ---------- Model (small, CPU-friendly) ----------
# Binary outputs like: [{'label': 'toxic', 'score': ...}, {'label': 'not_toxic', ...}]
clf = pipeline(
    task="text-classification",
    model="citizenlab/distilbert-base-multilingual-cased-toxicity",
    top_k=None,
)

# Warm-up once so the first click isn't "dead" while weights download
try:
    _ = clf("hello")
except Exception:
    pass

# ---------- Heuristic cues (demo only) ----------
THREAT_PATTERNS = re.compile(r"\b(kill|stab|shoot|bomb|burn|hang|slap|attack|kuua|kukuchoma|nikukate)\b", re.I)
MINOR_PATTERNS = re.compile(r"\b(minor|under\s?age|under\s?18|child|mtoto|1[0-7]\s?yo|15yo|16yo|17yo)\b", re.I)

# Swahili insult/profanity cues (non-exhaustive)
INSULT_SW = re.compile(
    r"\b(mjinga|pumbavu|mshenzi|kumbafu|takataka|malaya|bwege|fala|mbwa|mbuzi|chizi|zezeta)\b",
    re.I,
)

# Group / protected-class cues (EN + SW tokens for demo)
HATE_GROUP = re.compile(
    r"\b(wanawake|wanaume|wazungu|wachina|wahindi|wasomali|kikuyu|luo|kalenjin|luhya|immigrant|refugee|gay|lesbian|trans|muslim|christian|jew)\b",
    re.I,
)

# ---------- Policy thresholds/actions ----------
BASE_POLICY = {
    "harassment_toxicity": {"label": "toxicity", "low": 0.40, "high": 0.75,
                            "actions": {"low": "limit_reach", "high": "remove_and_escalate"}},
    "hate_speech": {"label": "hate_targeted", "low": 0.45, "high": 0.70,
                    "actions": {"low": "limit_reach", "high": "remove_and_escalate"}},
    "violent_threats": {"label": "threats", "low": 0.30, "high": 0.60,
                        "actions": {"low": "escalate_review", "high": "remove_and_escalate"}},
    "child_safety": {"label": "sexual_minors", "low": 0.20, "high": 0.40,
                     "actions": {"low": "escalate_child_safety_team", "high": "remove_and_notify_csirt"}},
}

# ---------- Helpers ----------
def _scores_from_model(text: str):
    raw = clf(text)
    # Normalize pipeline output into list[dict]
    if isinstance(raw, dict):
        outs = [raw]
    elif isinstance(raw, list):
        outs = raw[0] if (len(raw) and isinstance(raw[0], list)) else raw
    else:
        outs = []
    scores = {str(d.get("label", "")).lower(): float(d.get("score", 0.0)) for d in outs if isinstance(d, dict)}
    # Robust toxicity from binary labels
    if "toxic" in scores:
        toxicity = scores["toxic"]
    elif "not_toxic" in scores:
        toxicity = 1.0 - scores["not_toxic"]
    elif "non_toxic" in scores:
        toxicity = 1.0 - scores["non_toxic"]
    else:
        toxicity = max(scores.values()) if scores else 0.0
    # This model doesn't output hate/threat explicitly
    hate = 0.0
    threats = 0.0
    return toxicity, hate, threats


def score_labels(text: str):
    try:
        lang = detect(text)
    except Exception:
        lang = "unknown"
    tox, hate, thr_model = _scores_from_model(text)
    thr_rule = 0.9 if THREAT_PATTERNS.search(text or "") else 0.0
    minors = 0.9 if MINOR_PATTERNS.search(text or "") else 0.01
    probs = {
        "toxicity": float(tox),
        "hate_targeted": float(hate),
        "threats": float(max(thr_model, thr_rule)),
        "sexual_minors": float(minors),
        "misinfo_flag": 0.0,
    }
    # --- lexical boosts for Swahili/code-switch (demo heuristic) ---
    if INSULT_SW.search(text or ""):
        probs["toxicity"] = max(probs["toxicity"], 0.75)
    # If toxic AND a group term appears → likely hate-targeted
    if probs["toxicity"] >= 0.35 and HATE_GROUP.search(text or ""):
        probs["hate_targeted"] = max(
            probs.get("hate_targeted", 0.0),
            min(0.2 + 0.8 * probs["toxicity"], 0.95),
        )
    return lang, probs


def scaled_policy(scale: float):
    pol = {}
    for sec, spec in BASE_POLICY.items():
        low = min(round(spec["low"] * scale, 3), 0.95)
        high = min(round(spec["high"] * scale, 3), 0.99)
        pol[sec] = {**spec, "low": low, "high": high}
    return pol
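
# Illustrative sanity check (not used by the app): at scale 1.0 the policy is
# unchanged, and a larger scale multiplies every threshold, raising the bar each
# signal must clear (smaller scales do the opposite). The 0.95/0.99 caps only
# matter for scales beyond the UI slider's 0.7–1.3 range.
assert scaled_policy(1.0) == BASE_POLICY
assert scaled_policy(1.3)["harassment_toxicity"]["high"] == 0.975  # 0.75 * 1.3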
{**spec, "low": low, "high": high} return pol def route_policy(probs: dict, policy: dict): decisions = [] for section, spec in policy.items(): p = probs.get(spec["label"], 0.0) if p >= spec["high"]: decisions.append({"section": section, "score": round(p,3), "action": spec["actions"]["high"], "severity": "high"}) elif p >= spec["low"]: decisions.append({"section": section, "score": round(p,3), "action": spec["actions"]["low"], "severity": "medium"}) if not decisions: return {"action": "allow", "reason": "no policy thresholds met", "decisions": []} best = max(decisions, key=lambda d: d["score"]) label_map = { "toxicity": "harassment/toxicity", "hate_targeted": "hate speech", "threats": "violent threats", "sexual_minors": "child safety" } reason = f"Triggered by **{label_map.get(BASE_POLICY[best['section']]['label'],'signal')}** with score {best['score']}" return {"action": best["action"], "primary": best, "decisions": decisions, "reason": reason} def moderate(text: str, strictness: float): if not text or not text.strip(): return {"error": "Enter some text."}, {} lang, probs = score_labels(text) decision = route_policy(probs, scaled_policy(strictness)) scores_view = {"language": lang, **{k: round(v,3) for k,v in probs.items()}} return scores_view, decision # ---------- UI ---------- examples = [ ["Wewe ni mjinga kabisa", 1.0], ["Hawa watu wanastahili kuondolewa kabisa", 1.0], ["Nitakuchoma ukijaribu tena", 1.0], ["This child is under 17 and sexy", 1.0], ["Vaccines cause 5G in Nairobi", 1.0], ] with gr.Blocks(analytics_enabled=False) as demo: gr.Markdown("# Bilingual Moderation (Swahili + English) · Policy-aware Routing") gr.Markdown("Single multilingual classifier (toxicity) + simple cues for threats/child-safety, " "with **policy-aware routing**. Use **Strictness** to tune thresholds.") inp = gr.Textbox(label="Text", lines=4, placeholder="Andika / Type here…") strict = gr.Slider(0.7, 1.3, value=1.0, step=0.05, label="Strictness (threshold scale)") btn = gr.Button("Moderate", variant="primary") scores = gr.JSON(label="Scores (probabilities)") decision = gr.JSON(label="Policy decision (includes reason)") inp.submit(moderate, inputs=[inp, strict], outputs=[scores, decision]) gr.Examples(examples=examples, inputs=[inp, strict], fn=moderate, outputs=[scores, decision], label="Try examples (auto-run)", cache_examples=False) btn.click(moderate, inputs=[inp, strict], outputs=[scores, decision]) demo.queue().launch()