import re
import gradio as gr
from transformers import pipeline
from langdetect import detect
# ---------- Model (small, CPU-friendly) ----------
# Binary outputs like: [{'label': 'toxic', 'score': ...}, {'label': 'not_toxic', ...}]
clf = pipeline(
    task="text-classification",
    model="citizenlab/distilbert-base-multilingual-cased-toxicity",
    top_k=None,
)
# Warm-up once so the first click isn't "dead" while weights download
try:
    _ = clf("hello")
except Exception:
    pass
# ---------- Heuristic cues (demo only) ----------
THREAT_PATTERNS = re.compile(r"\b(kill|stab|shoot|bomb|burn|hang|slap|attack|kuua|kukuchoma|nikukate)\b", re.I)
MINOR_PATTERNS = re.compile(r"\b(minor|under\s?age|under\s?18|child|mtoto|1[0-7]\s?yo|15yo|16yo|17yo)\b", re.I)

# Swahili insult/profanity cues (non-exhaustive)
INSULT_SW = re.compile(
    r"\b(mjinga|pumbavu|mshenzi|kumbafu|takataka|malaya|bwege|fala|mbwa|mbuzi|chizi|zezeta)\b",
    re.I,
)

# Group / protected-class cues (EN + SW tokens for demo)
HATE_GROUP = re.compile(
    r"\b(wanawake|wanaume|wazungu|wachina|wahindi|wasomali|kikuyu|luo|kalenjin|luhya|immigrant|refugee|gay|lesbian|trans|muslim|christian|jew)\b",
    re.I,
)
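# Note: these cues are whole-word matches, so agglutinated Swahili forms
# (e.g. "nitakuchoma" vs. the listed "kukuchoma") will not trip the regexes
# and fall through to the classifier alone; a limitation of the demo heuristics.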

# ---------- Policy thresholds/actions ----------
BASE_POLICY = {
    "harassment_toxicity": {"label": "toxicity", "low": 0.40, "high": 0.75,
                            "actions": {"low": "limit_reach", "high": "remove_and_escalate"}},
    "hate_speech": {"label": "hate_targeted", "low": 0.45, "high": 0.70,
                    "actions": {"low": "limit_reach", "high": "remove_and_escalate"}},
    "violent_threats": {"label": "threats", "low": 0.30, "high": 0.60,
                        "actions": {"low": "escalate_review", "high": "remove_and_escalate"}},
    "child_safety": {"label": "sexual_minors", "low": 0.20, "high": 0.40,
                     "actions": {"low": "escalate_child_safety_team", "high": "remove_and_notify_csirt"}},
}
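
# Illustrative reading of BASE_POLICY (example numbers, not from the source):
# a "threats" score of 0.65 clears violent_threats "high" (0.60) and routes to
# "remove_and_escalate"; a score of 0.45 only clears "low" (0.30) and routes to
# "escalate_review".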

# ---------- Helpers ----------
def _scores_from_model(text: str):
    raw = clf(text)
    # Normalize pipeline output into list[dict]
    if isinstance(raw, dict):
        outs = [raw]
    elif isinstance(raw, list):
        outs = raw[0] if (len(raw) and isinstance(raw[0], list)) else raw
    else:
        outs = []
    scores = {str(d.get("label", "")).lower(): float(d.get("score", 0.0)) for d in outs if isinstance(d, dict)}
    # Robust toxicity from binary labels
    if "toxic" in scores:
        toxicity = scores["toxic"]
    elif "not_toxic" in scores:
        toxicity = 1.0 - scores["not_toxic"]
    elif "non_toxic" in scores:
        toxicity = 1.0 - scores["non_toxic"]
    else:
        toxicity = max(scores.values()) if scores else 0.0
    # This model doesn't output hate/threat explicitly
    hate = 0.0
    threats = 0.0
    return toxicity, hate, threats
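
# Example (assumed shape; exact labels come from the model card): for a toxic
# input, clf(...) with top_k=None returns something like
# [{"label": "toxic", "score": 0.91}, {"label": "not_toxic", "score": 0.09}]
# (possibly nested one level deeper, which the normalization above handles),
# and _scores_from_model reduces it to (0.91, 0.0, 0.0).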

def score_labels(text: str):
    try:
        lang = detect(text)
    except Exception:
        lang = "unknown"
    tox, hate, thr_model = _scores_from_model(text)
    thr_rule = 0.9 if THREAT_PATTERNS.search(text or "") else 0.0
    minors = 0.9 if MINOR_PATTERNS.search(text or "") else 0.01
    probs = {
        "toxicity": float(tox),
        "hate_targeted": float(hate),
        "threats": float(max(thr_model, thr_rule)),
        "sexual_minors": float(minors),
        "misinfo_flag": 0.0,
    }
    # --- Lexical boosts for Swahili/code-switch (demo heuristic) ---
    if INSULT_SW.search(text or ""):
        probs["toxicity"] = max(probs["toxicity"], 0.75)
    # If toxic AND a group term appears → likely hate-targeted
    if probs["toxicity"] >= 0.35 and HATE_GROUP.search(text or ""):
        probs["hate_targeted"] = max(
            probs.get("hate_targeted", 0.0),
            min(0.2 + 0.8 * probs["toxicity"], 0.95),
        )
    return lang, probs

def scaled_policy(scale: float):
    pol = {}
    for sec, spec in BASE_POLICY.items():
        low = min(round(spec["low"] * scale, 3), 0.95)
        high = min(round(spec["high"] * scale, 3), 0.99)
        pol[sec] = {**spec, "low": low, "high": high}
    return pol
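
# Worked example (arithmetic from BASE_POLICY and the rounding above):
# scale=0.7 turns the harassment_toxicity thresholds (0.40, 0.75) into
# (0.28, 0.525), so actions fire on lower scores; scale=1.3 turns them into
# (0.52, 0.975), so only very confident scores trigger actions.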

def route_policy(probs: dict, policy: dict):
    decisions = []
    for section, spec in policy.items():
        p = probs.get(spec["label"], 0.0)
        if p >= spec["high"]:
            decisions.append({"section": section, "score": round(p, 3),
                              "action": spec["actions"]["high"], "severity": "high"})
        elif p >= spec["low"]:
            decisions.append({"section": section, "score": round(p, 3),
                              "action": spec["actions"]["low"], "severity": "medium"})
    if not decisions:
        return {"action": "allow", "reason": "no policy thresholds met", "decisions": []}
    best = max(decisions, key=lambda d: d["score"])
    label_map = {
        "toxicity": "harassment/toxicity",
        "hate_targeted": "hate speech",
        "threats": "violent threats",
        "sexual_minors": "child safety",
    }
    reason = f"Triggered by **{label_map.get(BASE_POLICY[best['section']]['label'], 'signal')}** with score {best['score']}"
    return {"action": best["action"], "primary": best, "decisions": decisions, "reason": reason}
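
# Illustrative return value (assuming probs with threats=0.9 and everything else
# below the default thresholds):
# {"action": "remove_and_escalate",
#  "primary": {"section": "violent_threats", "score": 0.9,
#              "action": "remove_and_escalate", "severity": "high"},
#  "decisions": [ ...one entry per triggered section... ],
#  "reason": "Triggered by **violent threats** with score 0.9"}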

def moderate(text: str, strictness: float):
    if not text or not text.strip():
        return {"error": "Enter some text."}, {}
    lang, probs = score_labels(text)
    decision = route_policy(probs, scaled_policy(strictness))
    scores_view = {"language": lang, **{k: round(v, 3) for k, v in probs.items()}}
    return scores_view, decision
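
# Quick smoke test outside the UI (uncomment to run; exact scores depend on the model):
# scores_view, decision = moderate("I will kill you", 1.0)
# print(scores_view)          # e.g. {"language": "en", "toxicity": ..., "threats": 0.9, ...}
# print(decision["action"])   # "remove_and_escalate" once the threat rule fires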

# ---------- UI ----------
examples = [
    ["Wewe ni mjinga kabisa", 1.0],                    # "You are a complete fool"
    ["Hawa watu wanastahili kuondolewa kabisa", 1.0],  # "These people deserve to be removed entirely"
    ["Nitakuchoma ukijaribu tena", 1.0],               # "I will burn you if you try again"
    ["This child is under 17 and sexy", 1.0],
    ["Vaccines cause 5G in Nairobi", 1.0],
]

with gr.Blocks(analytics_enabled=False) as demo:
    gr.Markdown("# Bilingual Moderation (Swahili + English) · Policy-aware Routing")
    gr.Markdown("Single multilingual classifier (toxicity) + simple cues for threats/child-safety, "
                "with **policy-aware routing**. Use **Strictness** to tune thresholds.")
    inp = gr.Textbox(label="Text", lines=4, placeholder="Andika / Type here…")
    strict = gr.Slider(0.7, 1.3, value=1.0, step=0.05, label="Strictness (threshold scale)")
    btn = gr.Button("Moderate", variant="primary")
    scores = gr.JSON(label="Scores (probabilities)")
    decision = gr.JSON(label="Policy decision (includes reason)")
    inp.submit(moderate, inputs=[inp, strict], outputs=[scores, decision])
    # run_on_click=True so clicking an example actually runs moderation, matching the "auto-run" label
    gr.Examples(examples=examples, inputs=[inp, strict], fn=moderate, outputs=[scores, decision],
                label="Try examples (auto-run)", cache_examples=False, run_on_click=True)
    btn.click(moderate, inputs=[inp, strict], outputs=[scores, decision])

demo.queue().launch()
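
# Assumed Space dependencies (not pinned in this file): gradio, transformers,
# a backend such as torch, and langdetect. langdetect only feeds the "language"
# field; detection on short code-switched text is best-effort.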