import re
import gradio as gr
from transformers import pipeline
from langdetect import detect
# ---------- Model (small, CPU-friendly) ----------
# Binary outputs like: [{'label': 'toxic', 'score': ...}, {'label':'not_toxic', ...}]
clf = pipeline(
task="text-classification",
model="citizenlab/distilbert-base-multilingual-cased-toxicity",
top_k=None
)
# Warm-up once so the first click isn't "dead" while weights download
try:
    _ = clf("hello")
except Exception:
    pass
# ---------- Heuristic cues (demo only) ----------
THREAT_PATTERNS = re.compile(r"\b(kill|stab|shoot|bomb|burn|hang|slap|attack|kuua|kukuchoma|nikukate)\b", re.I)
MINOR_PATTERNS = re.compile(r"\b(minor|under\s?age|under\s?18|child|mtoto|1[0-7]\s?yo|15yo|16yo|17yo)\b", re.I)
# Swahili insult/profanity cues (non-exhaustive)
INSULT_SW = re.compile(
r"\b(mjinga|pumbavu|mshenzi|kumbafu|takataka|malaya|bwege|fala|mbwa|mbuzi|chizi|zezeta)\b",
re.I,
)
# Group / protected-class cues (EN + SW tokens for demo)
HATE_GROUP = re.compile(
r"\b(wanawake|wanaume|wazungu|wachina|wahindi|wasomali|kikuyu|luo|kalenjin|luhya|immigrant|refugee|gay|lesbian|trans|muslim|christian|jew)\b",
re.I,
)
# ---------- Policy thresholds/actions ----------
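# Each section maps a score label to a soft ("low") and hard ("high") threshold;
# crossing "low" triggers the medium-severity action, crossing "high" the
# high-severity one (see route_policy below).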
BASE_POLICY = {
"harassment_toxicity": {"label": "toxicity", "low": 0.40, "high": 0.75,
"actions": {"low": "limit_reach", "high": "remove_and_escalate"}},
"hate_speech": {"label": "hate_targeted", "low": 0.45, "high": 0.70,
"actions": {"low": "limit_reach", "high": "remove_and_escalate"}},
"violent_threats": {"label": "threats", "low": 0.30, "high": 0.60,
"actions": {"low": "escalate_review", "high": "remove_and_escalate"}},
"child_safety": {"label": "sexual_minors", "low": 0.20, "high": 0.40,
"actions": {"low": "escalate_child_safety_team", "high": "remove_and_notify_csirt"}},
}
# ---------- Helpers ----------
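# Collapse the classifier's binary toxic/not_toxic output into one toxicity
# probability; the model has no separate hate or threat heads, so those are
# returned as 0.0 and later filled in from the regex cues.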
def _scores_from_model(text: str):
    raw = clf(text)
    # Normalize pipeline output into list[dict]
    if isinstance(raw, dict):
        outs = [raw]
    elif isinstance(raw, list):
        outs = raw[0] if (len(raw) and isinstance(raw[0], list)) else raw
    else:
        outs = []
    scores = {str(d.get("label", "")).lower(): float(d.get("score", 0.0)) for d in outs if isinstance(d, dict)}
    # Robust toxicity from binary labels
    if "toxic" in scores:
        toxicity = scores["toxic"]
    elif "not_toxic" in scores:
        toxicity = 1.0 - scores["not_toxic"]
    elif "non_toxic" in scores:
        toxicity = 1.0 - scores["non_toxic"]
    else:
        toxicity = max(scores.values()) if scores else 0.0
    # This model doesn't output hate/threat explicitly
    hate = 0.0
    threats = 0.0
    return toxicity, hate, threats
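# Combine the model's toxicity score with the regex cues above and return
# (detected_language, per-label probabilities) for policy routing.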
def score_labels(text: str):
    try:
        lang = detect(text)
    except Exception:
        lang = "unknown"
    tox, hate, thr_model = _scores_from_model(text)
    thr_rule = 0.9 if THREAT_PATTERNS.search(text or "") else 0.0
    minors = 0.9 if MINOR_PATTERNS.search(text or "") else 0.01
    probs = {
        "toxicity": float(tox),
        "hate_targeted": float(hate),
        "threats": float(max(thr_model, thr_rule)),
        "sexual_minors": float(minors),
        "misinfo_flag": 0.0
    }
    # --- lexical boosts for Swahili/code-switch (demo heuristic) ---
    if INSULT_SW.search(text or ""):
        probs["toxicity"] = max(probs["toxicity"], 0.75)
    # If toxic AND a group term appears → likely hate-targeted
    if probs["toxicity"] >= 0.35 and HATE_GROUP.search(text or ""):
        probs["hate_targeted"] = max(
            probs.get("hate_targeted", 0.0),
            min(0.2 + 0.8 * probs["toxicity"], 0.95)
        )
    return lang, probs
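# Build a per-request copy of BASE_POLICY with thresholds adjusted by the
# strictness slider.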
def scaled_policy(scale: float):
    # Higher strictness divides the base thresholds, so more content is flagged.
    pol = {}
    for sec, spec in BASE_POLICY.items():
        low = min(round(spec["low"] / scale, 3), 0.95)
        high = min(round(spec["high"] / scale, 3), 0.99)
        pol[sec] = {**spec, "low": low, "high": high}
    return pol
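# Check each section's score against its thresholds, collect every triggered
# action, and report the highest-scoring hit as the primary decision.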
def route_policy(probs: dict, policy: dict):
    decisions = []
    for section, spec in policy.items():
        p = probs.get(spec["label"], 0.0)
        if p >= spec["high"]:
            decisions.append({"section": section, "score": round(p, 3),
                              "action": spec["actions"]["high"], "severity": "high"})
        elif p >= spec["low"]:
            decisions.append({"section": section, "score": round(p, 3),
                              "action": spec["actions"]["low"], "severity": "medium"})
    if not decisions:
        return {"action": "allow", "reason": "no policy thresholds met", "decisions": []}
    best = max(decisions, key=lambda d: d["score"])
    label_map = {
        "toxicity": "harassment/toxicity",
        "hate_targeted": "hate speech",
        "threats": "violent threats",
        "sexual_minors": "child safety"
    }
    reason = f"Triggered by **{label_map.get(BASE_POLICY[best['section']]['label'], 'signal')}** with score {best['score']}"
    return {"action": best["action"], "primary": best, "decisions": decisions, "reason": reason}
def moderate(text: str, strictness: float):
    if not text or not text.strip():
        return {"error": "Enter some text."}, {}
    lang, probs = score_labels(text)
    decision = route_policy(probs, scaled_policy(strictness))
    scores_view = {"language": lang, **{k: round(v, 3) for k, v in probs.items()}}
    return scores_view, decision
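# Example return shape from moderate():
#   ({"language": ..., "toxicity": ..., ...}, {"action": ..., "reason": ..., "decisions": [...]})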
# ---------- UI ----------
examples = [
["Wewe ni mjinga kabisa", 1.0],
["Hawa watu wanastahili kuondolewa kabisa", 1.0],
["Nitakuchoma ukijaribu tena", 1.0],
["This child is under 17 and sexy", 1.0],
["Vaccines cause 5G in Nairobi", 1.0],
]
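# Note: misinfo_flag is always 0.0 and no policy section consumes it, so the
# last example is only flagged if the toxicity classifier scores it highly.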
with gr.Blocks(analytics_enabled=False) as demo:
gr.Markdown("# Bilingual Moderation (Swahili + English) · Policy-aware Routing")
gr.Markdown("Single multilingual classifier (toxicity) + simple cues for threats/child-safety, "
"with **policy-aware routing**. Use **Strictness** to tune thresholds.")
inp = gr.Textbox(label="Text", lines=4, placeholder="Andika / Type here…")
strict = gr.Slider(0.7, 1.3, value=1.0, step=0.05, label="Strictness (threshold scale)")
btn = gr.Button("Moderate", variant="primary")
scores = gr.JSON(label="Scores (probabilities)")
decision = gr.JSON(label="Policy decision (includes reason)")
inp.submit(moderate, inputs=[inp, strict], outputs=[scores, decision])
gr.Examples(examples=examples, inputs=[inp, strict], fn=moderate, outputs=[scores, decision],
label="Try examples (auto-run)", cache_examples=False)
btn.click(moderate, inputs=[inp, strict], outputs=[scores, decision])
demo.queue().launch()