# app/ingest.py
from __future__ import annotations
import json
from pathlib import Path
from typing import Dict, List, Any, Tuple, Optional
import yaml
import numpy as np
from sentence_transformers import SentenceTransformer
from app.paths import DOCSTORE_DIR, INDEX_DIR
from .normalize import normalize # ← central normalizer
import re
import time
import hashlib
import requests
from bs4 import BeautifulSoup
from datetime import datetime, timezone
from difflib import SequenceMatcher
from urllib.parse import urljoin
from app.tags import infer_matt25_tags # ← NEW
# -------------------- Config --------------------
def load_config(cfg_path: str) -> Dict:
with open(cfg_path, "r", encoding="utf-8") as f:
return yaml.safe_load(f)
# -------------------- Capacity / Geo Filters (config-driven) --------------------
# controls live in config/sources.yaml:
# filters:
# capacity_only: true
# pa_md_only: false
_INCLUDE_PATTERNS = [re.compile(p, re.I) for p in [
r"\bcapacity(?:[-\s]?building)?\b",
r"\btechnical\s+assistance\b",
r"\bTA\b",
r"\borganizational\s+(capacity|effectiveness|development|readiness|stabilization)\b",
r"\borganization(?:al)?\s+infrastructure\b",
r"\bback[-\s]?office\b|\bbackbone\s+organization\b",
r"\bgovernance\b|\bboard\s+development\b|\bboard\s+training\b",
r"\bpre[-\s]?development\b|\bpredevelopment\b|\bplanning\s+grant\b",
r"\bdata\s+systems?\b|\bCRM\b|\bcase\s+management\b",
r"\b(staff|workforce)\s+capacity\b|\bhire\s+(?:staff|positions?)\b",
r"\bscal(?:e|ing)\s+capacity\b|\bexpand\s+capacity\b",
r"\bnonprofit\b|\bfaith[-\s]?based\b|\bcommunity[-\s]?based\b",
# broaden for transportation / human services
r"\bprovider\s+capacity\b|\bservice\s+capacity\b|\borganizational\s+support\b",
]]
_EXCLUDE_PATTERNS = [re.compile(p, re.I) for p in [
r"\bteaching\s+assistant\b|\bTAs\b",
r"\bbench\s+capacity\b|\bmanufacturing\s+capacity\b(?!.*organiz)",
r"\bclinical\s+trial\b|\blaboratory\s+capacity\b(?!.*community)",
r"\b(postsecondary|university|college)\b(?!.*community\s+partner)",
r"\bconstruction\b(?!.*(admin|organiz|back[-\s]?office|governance|systems))",
]]
_PA_MD_HINTS = re.compile(
r"\b("
r"Pennsylvania|PA\b|Harrisburg|Philadelphia|Allegheny|Montgomery County\b|Pittsburgh|Scranton|Erie|"
r"Maryland|MD\b|Annapolis|Baltimore|Prince\s+George'?s|Howard County\b"
r")\b",
re.I,
)
def _doc_text_from_row(rec: Dict[str, Any]) -> str:
title = rec.get("title") or ""
synopsis = rec.get("synopsis") or rec.get("summary") or ""
agency = rec.get("agency") or ""
eligibility = rec.get("eligibility") or ""
categories = " ".join(rec.get("categories") or []) if isinstance(rec.get("categories"), list) else (rec.get("categories") or "")
geo = rec.get("geo") or ""
return "\n".join([title, synopsis, agency, eligibility, categories, geo]).strip()
def _is_capacity_building_text(text: str) -> bool:
if not text:
return False
if any(p.search(text) for p in _EXCLUDE_PATTERNS):
return False
return any(p.search(text) for p in _INCLUDE_PATTERNS)
def _is_pa_md_text(text: str) -> bool:
if not text:
return False
return bool(_PA_MD_HINTS.search(text))
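# Illustrative filter behavior (a sketch; actual results depend on the full pattern lists above):
#   _is_capacity_building_text("Technical assistance for nonprofit board development")  -> True
#   _is_capacity_building_text("Hiring a teaching assistant for the fall semester")     -> False  (exclude pattern)
#   _is_pa_md_text("Open to organizations serving Harrisburg, Pennsylvania")            -> True
#   _is_pa_md_text("Statewide program in Ohio")                                         -> False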
# -------------------- Deadline parsing helpers --------------------
# Common date shapes: "October 15, 2025", "Oct 15, 2025", "10/15/2025", "2025-10-15"
_MONTHS = {
"january":1, "jan":1, "february":2, "feb":2, "march":3, "mar":3, "april":4, "apr":4,
"may":5, "june":6, "jun":6, "july":7, "jul":7, "august":8, "aug":8, "september":9, "sep":9, "sept":9,
"october":10, "oct":10, "november":11, "nov":11, "december":12, "dec":12
}
# Capture label + date snippets
_DEADLINE_LINES = re.compile(
r"(deadline|applications?\s+due|closing\s+date|due\s+date|submission\s+deadline)\s*[:\-]?\s*(.+)",
re.I
)
# Capture absolute dates
_DATE_ISO = re.compile(r"\b(20\d{2})[-/\.](\d{1,2})[-/\.](\d{1,2})\b") # 2025-10-15 or 2025/10/15
_DATE_US = re.compile(r"\b(\d{1,2})[-/\.](\d{1,2})[-/\.](20\d{2})\b") # 10/15/2025
_DATE_LONG = re.compile(
r"\b([A-Za-z]{3,9})\s+(\d{1,2})(?:st|nd|rd|th)?\,?\s+(20\d{2})\b", re.I) # Oct 15, 2025 / October 15 2025
def _to_iso(y:int, m:int, d:int) -> Optional[str]:
try:
return datetime(y, m, d, tzinfo=timezone.utc).date().isoformat()
except Exception:
return None
def _parse_date_fragment(s: str) -> Optional[str]:
s = (s or "").strip()
# ISO first
m = _DATE_ISO.search(s)
if m:
y, mm, dd = int(m.group(1)), int(m.group(2)), int(m.group(3))
return _to_iso(y, mm, dd)
# US 10/15/2025
m = _DATE_US.search(s)
if m:
mm, dd, y = int(m.group(1)), int(m.group(2)), int(m.group(3))
return _to_iso(y, mm, dd)
# Long months
m = _DATE_LONG.search(s)
if m:
mon = m.group(1).lower()
dd = int(m.group(2))
y = int(m.group(3))
mm = _MONTHS.get(mon)
if mm:
return _to_iso(y, mm, dd)
return None
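# Example inputs/outputs (illustrative):
#   _parse_date_fragment("2025-10-15")          -> "2025-10-15"
#   _parse_date_fragment("10/15/2025")          -> "2025-10-15"
#   _parse_date_fragment("October 15th, 2025")  -> "2025-10-15"
#   _parse_date_fragment("rolling basis")       -> None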
def _extract_deadline(text: str) -> Tuple[Optional[str], Optional[str]]:
"""
Try to locate a deadline-like line, then parse a date.
Returns (deadline_iso, raw_snippet) or (None, None).
"""
if not text:
return None, None
# Look for labeled lines first (strong signal)
for line in text.splitlines():
m = _DEADLINE_LINES.search(line)
if m:
iso = _parse_date_fragment(m.group(2))
if iso:
return iso, line.strip()
# Fallback: scan whole text for any date (first match)
iso = _parse_date_fragment(text)
if iso:
return iso, None
return None, None
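# Example (illustrative): a labeled deadline line wins and its raw snippet is preserved.
#   _extract_deadline("Grant overview\nApplications due: March 1, 2026\nContact us")
#     -> ("2026-03-01", "Applications due: March 1, 2026")
#   _extract_deadline("Posted 2025-09-01. No closing date announced.")
#     -> ("2025-09-01", None)   # fallback scan returns the first date found, no snippet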
def _compute_is_active(deadline_iso: Optional[str]) -> bool:
if not deadline_iso:
return True # treat unknown/TBD as active
try:
        return datetime.fromisoformat(deadline_iso).date() >= datetime.now(timezone.utc).date()
except Exception:
return True
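# Example (illustrative):
#   _compute_is_active(None)          -> True   (unknown/TBD deadlines stay visible)
#   _compute_is_active("2099-12-31")  -> True
#   _compute_is_active("2001-01-01")  -> False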
def _attach_matt25_tags(rec: Dict[str, Any]) -> None:
"""Add Matthew 25 tags by inspecting core text fields; preserve any manual tags."""
blob = " ".join(filter(None, [
rec.get("title", ""),
rec.get("synopsis") or rec.get("summary") or "",
rec.get("eligibility", ""),
rec.get("agency", ""),
]))
manual = rec.get("tags") or []
if not isinstance(manual, list):
manual = [manual]
auto = infer_matt25_tags(blob)
rec["tags"] = sorted(set(manual) | set(auto))
# -------------------- Grants.gov collector --------------------
def _collect_from_grantsgov_api(src: Dict) -> List[Dict[str, Any]]:
"""
Calls the Grants.gov Search2 client and returns a list of RAW dicts
(adapter may already be close to unified; we'll still run normalize()).
"""
from app.sources.grantsgov_api import search_grants # local import to avoid cycles
api = src.get("api", {})
page_size = int(api.get("page_size", src.get("page_size", 100)))
max_pages = int(api.get("max_pages", src.get("max_pages", 5)))
payload = api.get("payload", src.get("payload", {}))
url = src.get("url", "")
out = search_grants(url, payload, page_size=page_size, max_pages=max_pages)
hits = out.get("hits", []) if isinstance(out, dict) else (out or [])
return [h for h in hits if isinstance(h, dict)]
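# Illustrative config/sources.yaml entry for this collector. The endpoint URL and the
# payload keys below are assumptions; use whatever app.sources.grantsgov_api.search_grants expects.
#   - name: grants_gov
#     type: grantsgov_api
#     enabled: true
#     url: https://api.grants.gov/v1/api/search2
#     api:
#       page_size: 100
#       max_pages: 5
#       payload:
#         keyword: "capacity building"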
# -------------------- HTTP helpers --------------------
_HTTP_HEADERS = {
"User-Agent": "grants-rag/1.0 (+https://example.local) requests",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}
def _http_get(url: str, timeout: int = 20) -> Optional[requests.Response]:
try:
r = requests.get(url, headers=_HTTP_HEADERS, timeout=timeout)
if r.status_code == 200 and r.content:
return r
except requests.RequestException:
return None
return None
def _soup(html: str) -> BeautifulSoup:
    # prefer lxml for speed/robustness; fall back to the stdlib parser if it isn't installed
    try:
        return BeautifulSoup(html, "lxml")
    except Exception:
        return BeautifulSoup(html, "html.parser")
def _text_from_soup(s: BeautifulSoup, selectors: Optional[List[str]] = None) -> Tuple[str, str]:
"""
Returns (title, text). Uses selectors if provided;
falls back to common content containers.
"""
title = s.title.string.strip() if s.title and s.title.string else ""
nodes = []
if selectors:
for css in selectors:
nodes.extend(s.select(css) or [])
if not nodes:
for css in ("main", "article", "#content", ".content", "[role='main']"):
nodes.extend(s.select(css) or [])
if not nodes:
nodes = [s.body] if s.body else []
parts: List[str] = []
for n in nodes:
if not n:
continue
txt = n.get_text(separator="\n", strip=True)
if txt:
parts.append(txt)
body = "\n\n".join(parts).strip()
return title, body
def _make_id(*fields: str) -> str:
h = hashlib.sha1()
for f in fields:
if f:
h.update(f.encode("utf-8", "ignore"))
h.update(b"|")
return h.hexdigest()
def _normalize_web_record(
source_name: str,
url: str,
title: str,
body: str,
static: Dict[str, Any],
extra: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""
Produce a record shaped like normalize() output so downstream stays unchanged.
Adds parsed deadline + is_active when available.
"""
extra = extra or {}
# If caller didn't pass a deadline, try to parse from body/title here.
deadline_iso = extra.get("deadline")
deadline_text = extra.get("deadline_text")
if not deadline_iso:
# Try parsing from body first, then from title.
        deadline_iso, deadline_text = _extract_deadline(body)
        if not deadline_iso and title:
            deadline_iso, _ = _extract_deadline(title)
rec = {
"id": extra.get("id") or _make_id(url, title or body[:160]),
"title": title or extra.get("title") or url,
"synopsis": (body or "")[:2000], # clip; embeddings use title+synopsis later
"summary": None,
"url": url,
"source": source_name,
"geo": static.get("geo"),
"categories": static.get("categories"),
"agency": extra.get("agency", ""),
"eligibility": extra.get("eligibility", ""),
# Store ISO (YYYY-MM-DD) in 'deadline' for consistency
"deadline": deadline_iso,
"deadline_text": deadline_text, # keep the raw line we matched (if any)
"program_number": extra.get("program_number"),
"posted_date": extra.get("posted_date"),
}
rec["is_active"] = _compute_is_active(rec["deadline"])
# NEW: add Matthew 25 tags based on title/synopsis/eligibility text
_attach_matt25_tags(rec)
return rec
# -------------------- Collectors: http_html / web_page --------------------
def _collect_from_http_html(entry: Dict, source_name: str, static: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Supports types: 'web_page' and 'http_html'
Config keys supported:
- url (str)
- parse: { follow_links: bool, link_selectors: [..], content_selectors: [..] }
- crawl: { schedule: "...", max_depth: int } # max_depth 0/None = only landing
"""
url = entry.get("url")
if not url:
return []
r = _http_get(url)
if not r:
return []
s = _soup(r.text)
parse = entry.get("parse", {}) or entry.get("extract", {}) or {}
content_selectors = parse.get("content_selectors") or []
title, body = _text_from_soup(s, content_selectors)
rows = []
rows.append(_normalize_web_record(source_name, url, title, body, static, extra={"posted_date": None}))
# follow links?
follow = bool(parse.get("follow_links"))
link_selectors = parse.get("link_selectors") or []
crawl = entry.get("crawl", {}) or {}
max_depth = int(crawl.get("max_depth", 0) or 0)
visited = set([url])
def _enq_links(soup: BeautifulSoup) -> List[str]:
if link_selectors:
links = []
for sel in link_selectors:
for a in soup.select(sel) or []:
href = a.get("href")
if href and href.startswith("http"):
links.append(href)
out, seen = [], set()
for h in links:
if h not in seen:
out.append(h)
seen.add(h)
return out[:40] # polite cap
return []
if follow and max_depth > 0:
frontier = _enq_links(s)
depth = 1
while frontier and depth <= max_depth and len(rows) < 200:
next_frontier = []
for link in frontier:
if link in visited:
continue
visited.add(link)
rr = _http_get(link)
if not rr:
continue
ss = _soup(rr.text)
t2, b2 = _text_from_soup(ss, content_selectors)
if b2:
rows.append(_normalize_web_record(source_name, link, t2, b2, static, extra={"posted_date": None}))
if depth < max_depth:
next_frontier.extend(_enq_links(ss))
time.sleep(0.1) # gentle
frontier = next_frontier
depth += 1
return rows
# -------------------- Collector: http_html_js (Playwright) --------------------
def _collect_from_http_html_js(entry: Dict, source_name: str, static: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
JS-rendered pages using Playwright, with per-card extraction and robust scrolling.
entry.options:
- wait_for (css or ms int)
- scroll (bool)
- scroll_selector (css) # NEW: scroll a container div, not the window
- scroll_times (int) # NEW: default 20
- scroll_wait_ms (int) # NEW: default 400
- min_cards (int) # NEW: wait until at least N cards exist
- click_selector (css)
- max_pages (int)
- timeout_ms (int)
- network_idle (bool)
- debug (bool)
entry.selectors: card, title, link, description, meta
"""
try:
from playwright.sync_api import sync_playwright # type: ignore
except Exception:
print(f"[collect][skip] {source_name}: Playwright not installed.")
return []
url = entry.get("url")
if not url:
return []
options = entry.get("options", {}) or {}
parse = entry.get("parse", {}) or entry.get("extract", {}) or {}
selectors = entry.get("selectors", {}) or {}
content_selectors = parse.get("content_selectors") or []
timeout_ms = int(options.get("timeout_ms", 6000))
network_idle = bool(options.get("network_idle", True))
debug = bool(options.get("debug", False))
max_pages = int(options.get("max_pages", 1))
click_sel = options.get("click_selector") or entry.get("next_selector")
wait_for = options.get("wait_for")
rows: List[Dict[str, Any]] = []
def _text_first(soup: BeautifulSoup, css_list: str) -> str:
if not css_list:
return ""
for css in [c.strip() for c in css_list.split(",")]:
el = soup.select_one(css)
if el:
txt = el.get_text(separator=" ", strip=True)
if txt:
return txt
return ""
def _attr_first(soup: BeautifulSoup, css_list: str, attr: str) -> Optional[str]:
if not css_list:
return None
for css in [c.strip() for c in css_list.split(",")]:
el = soup.select_one(css)
if el:
val = el.get(attr)
if val:
return val
return None
def _parse_cards(page_html: str, base_url: str) -> List[Dict[str, Any]]:
s = _soup(page_html)
card_css = selectors.get("card", "")
if not card_css:
title, body = _text_from_soup(s, content_selectors)
return [_normalize_web_record(source_name, base_url, title, body, static, extra={"posted_date": None})]
out: List[Dict[str, Any]] = []
for card in s.select(card_css) or []:
            csoup = _soup(str(card))  # re-parse the card fragment with the shared parser helper
title = _text_first(csoup, selectors.get("title", "h1, h2, h3"))
href = _attr_first(csoup, selectors.get("link", "a"), "href")
link = urljoin(base_url, href) if href else base_url
desc = _text_first(csoup, selectors.get("description", "p, .summary, .excerpt, .card-text"))
meta = _text_first(csoup, selectors.get("meta", ".meta, .tags, .badge, .location"))
body = "\n".join([p for p in (desc, meta) if p]).strip()
if not (title or body):
continue
out.append(_normalize_web_record(source_name, link, title or link, body, static, extra={"posted_date": None}))
return out
def _wait_page_ready(page, *, wait_for, timeout_ms, options, selectors):
# wait_for can be CSS or milliseconds
if isinstance(wait_for, int):
page.wait_for_timeout(wait_for)
elif isinstance(wait_for, str) and wait_for:
page.wait_for_selector(wait_for, timeout=min(timeout_ms, 15000))
# Scroll window or a container div
if options.get("scroll"):
scroll_sel = options.get("scroll_selector")
scroll_times = int(options.get("scroll_times", 20))
scroll_wait = int(options.get("scroll_wait_ms", 400))
if scroll_sel:
                page.evaluate(
                    """([sel, times, wait]) => new Promise(res => {
                        const el = document.querySelector(sel);
                        if (!el) { res(); return; }
                        let i = 0;
                        const t = setInterval(() => {
                            const y = el.scrollTop;
                            el.scrollTop = el.scrollHeight;
                            i++;
                            if (el.scrollTop === y || i >= times) { clearInterval(t); res(); }
                        }, wait);
                    })""",
                    # Playwright's evaluate() takes a single arg; pass the values as one list
                    [scroll_sel, scroll_times, scroll_wait],
                )
else:
                page.evaluate(
                    """([times, wait]) => new Promise(res => {
                        let i = 0;
                        const t = setInterval(() => {
                            const y = window.scrollY;
                            window.scrollBy(0, document.body.scrollHeight);
                            i++;
                            if (window.scrollY === y || i >= times) { clearInterval(t); res(); }
                        }, wait);
                    })""",
                    [scroll_times, scroll_wait],
                )
# Optionally wait for a minimum number of cards (virtualized lists)
min_cards = int(options.get("min_cards", 0))
card_css = (selectors or {}).get("card", "")
if min_cards and card_css:
try:
page.wait_for_function(
"""([sel, target]) => {
const els = document.querySelectorAll(sel);
return els && els.length >= target;
}""",
arg=[card_css, min_cards],
timeout=min(timeout_ms, 15000),
)
except Exception:
pass # best-effort
def _try_once(p):
def _route(route):
r = route.request
if r.resource_type in {"image", "media", "font"}:
return route.abort()
return route.continue_()
browser = p.chromium.launch(headless=not debug)
context = browser.new_context(user_agent=(
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0 Safari/537.36"
))
context.route("**/*", _route)
page = context.new_page()
try:
page.set_default_timeout(timeout_ms)
page.goto(url, wait_until="domcontentloaded", timeout=timeout_ms)
if network_idle:
page.wait_for_load_state("networkidle", timeout=min(timeout_ms, 15000))
_wait_page_ready(page, wait_for=wait_for, timeout_ms=timeout_ms, options=options, selectors=selectors)
htmls = [page.content()]
# pagination
for _ in range(max_pages - 1):
sel = click_sel
if not sel or page.locator(sel).count() == 0:
break
page.click(sel)
page.wait_for_load_state("domcontentloaded")
if network_idle:
page.wait_for_load_state("networkidle", timeout=min(timeout_ms, 15000))
_wait_page_ready(page, wait_for=wait_for, timeout_ms=timeout_ms, options=options, selectors=selectors)
htmls.append(page.content())
for html in htmls:
found = _parse_cards(html, url)
rows.extend(found)
print(f"[collect][cards] {source_name}: found {len(found)} cards on this page")
browser.close()
return True
except Exception as e:
if debug:
try:
snap = DOCSTORE_DIR / f"playwright_error_{hashlib.sha1(url.encode()).hexdigest()}.png"
page.screenshot(path=str(snap))
print(f"[collect][debug] Saved screenshot: {snap}")
except Exception:
pass
print(f"[collect][warn] {source_name}: {e.__class__.__name__}: {e}")
try:
browser.close()
except Exception:
pass
return False
with sync_playwright() as p:
ok = _try_once(p)
if not ok:
time.sleep(1.5)
_try_once(p)
if not rows:
print(f"[collect][skip] {source_name}: no content after retries.")
return rows
# -------------------- PDF collector --------------------
def _collect_from_http_pdf(entry: Dict, source_name: str, static: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
type: 'http_pdf'
keys:
- url (single PDF fetch)
"""
url = entry.get("url")
if not url:
return []
try:
from pdfminer.high_level import extract_text # lazy import
except Exception:
return []
rows = []
r = _http_get(url, timeout=40)
if not r:
return rows
tmp = DOCSTORE_DIR / (hashlib.sha1(url.encode("utf-8")).hexdigest() + ".pdf")
try:
DOCSTORE_DIR.mkdir(parents=True, exist_ok=True)
tmp.write_bytes(r.content)
body = extract_text(str(tmp)) or ""
finally:
try:
tmp.unlink(missing_ok=True)
except Exception:
pass
title = entry.get("name") or "PDF Document"
rows.append(_normalize_web_record(source_name, url, title, body, static, extra={"posted_date": None}))
return rows
# -------------------- De-dup helpers (5.2) --------------------
def _norm(text: str) -> str:
t = (text or "").lower()
t = re.sub(r'[^a-z0-9 ]+', ' ', t)
return re.sub(r'\s+', ' ', t).strip()
def _hash_fingerprint(title: str, agency: str, deadline: str) -> str:
base = f"{_norm(title)}|{_norm(agency)}|{deadline or ''}"
return hashlib.sha1(base.encode()).hexdigest()
def _near_duplicate(a: Dict[str, Any], b: Dict[str, Any]) -> bool:
# Deadlines equal or both missing
dates_close = (a.get("deadline") == b.get("deadline")) or (not a.get("deadline") and not b.get("deadline"))
t_sim = SequenceMatcher(None, _norm(a.get("title","")), _norm(b.get("title",""))).ratio()
ag_sim = SequenceMatcher(None, _norm(a.get("agency","")), _norm(b.get("agency",""))).ratio()
return dates_close and (t_sim > 0.88) and (ag_sim > 0.75)
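# Example (illustrative): punctuation/case differences are erased by _norm, so these two match.
#   a = {"title": "Capacity-Building Grant 2026", "agency": "Example Foundation", "deadline": "2026-03-01"}
#   b = {"title": "capacity building grant 2026", "agency": "Example Foundation", "deadline": "2026-03-01"}
#   _near_duplicate(a, b)  -> True  (equal deadlines, title/agency similarity above thresholds)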
def _merge_records(primary: Dict[str, Any], other: Dict[str, Any]) -> Dict[str, Any]:
"""Merge fields while preserving best data and provenance."""
merged = dict(primary)
# Prefer non-empty fields; combine categories; keep earliest posted_date; keep earliest deadline if different.
def choose(a, b):
return a if (a not in (None, "", [], {})) else b
merged["url"] = choose(primary.get("url"), other.get("url"))
merged["title"] = choose(primary.get("title"), other.get("title"))
merged["synopsis"] = choose(primary.get("synopsis"), other.get("synopsis"))
merged["summary"] = choose(primary.get("summary"), other.get("summary"))
merged["agency"] = choose(primary.get("agency"), other.get("agency"))
merged["eligibility"] = choose(primary.get("eligibility"), other.get("eligibility"))
merged["program_number"] = choose(primary.get("program_number"), other.get("program_number"))
merged["geo"] = choose(primary.get("geo"), other.get("geo"))
# categories β†’ union (list)
cats_a = primary.get("categories") or []
cats_b = other.get("categories") or []
if not isinstance(cats_a, list): cats_a = [cats_a]
if not isinstance(cats_b, list): cats_b = [cats_b]
merged["categories"] = sorted(set([c for c in cats_a + cats_b if c]))
# deadline: choose earlier known date (safer to surface sooner one)
da, db = primary.get("deadline"), other.get("deadline")
if da and db:
merged["deadline"] = min(da, db)
else:
merged["deadline"] = da or db
# carry a deadline_text if any
merged["deadline_text"] = choose(primary.get("deadline_text"), other.get("deadline_text"))
merged["is_active"] = _compute_is_active(merged.get("deadline"))
# posted_date: keep earliest if both
pa, pb = primary.get("posted_date"), other.get("posted_date")
merged["posted_date"] = min(pa, pb) if (pa and pb) else (pa or pb)
# provenance: combine sources + urls
prov_sources = set()
for s in (primary.get("source"), other.get("source")):
if not s: continue
if isinstance(s, list): prov_sources.update(s)
else: prov_sources.add(s)
merged["source"] = sorted(prov_sources) if prov_sources else None
prov_urls = set()
for u in (primary.get("url"), other.get("url")):
if u: prov_urls.add(u)
# keep a list of all discovered urls
merged["all_urls"] = sorted(prov_urls.union(set(primary.get("all_urls") or []), set(other.get("all_urls") or [])))
# recompute ID based on merged fingerprint (title/agency/deadline)
merged["id"] = _hash_fingerprint(merged.get("title",""), merged.get("agency",""), merged.get("deadline",""))
return merged
def _dedupe_and_merge(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Exact fingerprint + fuzzy near-dup consolidation across sources."""
uniques: List[Dict[str, Any]] = []
by_fp: Dict[str, int] = {}
for r in rows:
fp = _hash_fingerprint(r.get("title",""), r.get("agency",""), r.get("deadline",""))
if fp in by_fp:
# exact dup: merge into existing
idx = by_fp[fp]
uniques[idx] = _merge_records(uniques[idx], r)
continue
# fuzzy check against current uniques
found_idx = None
for i, u in enumerate(uniques):
if _near_duplicate(r, u):
found_idx = i
break
if found_idx is not None:
uniques[found_idx] = _merge_records(uniques[found_idx], r)
# also index its fingerprint (so later exact matches land here)
new_fp = _hash_fingerprint(uniques[found_idx].get("title",""),
uniques[found_idx].get("agency",""),
uniques[found_idx].get("deadline",""))
by_fp[new_fp] = found_idx
else:
by_fp[fp] = len(uniques)
# initialize provenance
r.setdefault("all_urls", [r.get("url")] if r.get("url") else [])
uniques.append(r)
return uniques
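# Example (illustrative): two rows describing the same opportunity from different sources
# collapse to one record; provenance is kept in 'source' and 'all_urls'.
#   rows = [
#       {"title": "Capacity Grant", "agency": "Example Org", "deadline": "2026-03-01",
#        "source": "grants_gov", "url": "https://example.org/a"},
#       {"title": "Capacity Grant!", "agency": "Example Org", "deadline": "2026-03-01",
#        "source": "foundation_site", "url": "https://example.org/b"},
#   ]
#   merged = _dedupe_and_merge(rows)   # len(merged) == 1
#   merged[0]["source"]                # -> ["foundation_site", "grants_gov"]
#   merged[0]["all_urls"]              # -> ["https://example.org/a", "https://example.org/b"]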
# -------------------- Write docstore & build index --------------------
def _save_docstore(recs: List[Dict[str, Any]]) -> str:
DOCSTORE_DIR.mkdir(parents=True, exist_ok=True)
path = DOCSTORE_DIR / "docstore.jsonl"
with path.open("w", encoding="utf-8") as f:
for r in recs:
f.write(json.dumps(r, ensure_ascii=False) + "\n")
return str(path)
def _build_index_from_docstore() -> int:
ds_path = DOCSTORE_DIR / "docstore.jsonl"
if not ds_path.exists():
raise RuntimeError("Docstore not found. Run ingest first.")
texts: List[str] = []
metas: List[Dict[str, Any]] = []
with ds_path.open("r", encoding="utf-8") as f:
for line in f:
rec = json.loads(line)
title = rec.get("title") or ""
synopsis = rec.get("synopsis") or rec.get("summary") or ""
agency = rec.get("agency") or ""
eligibility = rec.get("eligibility") or ""
txt = "\n".join([title, synopsis, agency, eligibility]).strip()
if not txt:
continue
texts.append(txt)
metas.append({
"id": rec.get("id"),
"title": title,
"url": rec.get("url"),
"source": rec.get("source"),
"tags": rec.get("tags"),
"geo": rec.get("geo"),
"categories": rec.get("categories"),
"agency": agency,
"deadline": rec.get("deadline"), # ISO if available
"deadline_text": rec.get("deadline_text"),
"is_active": rec.get("is_active"),
"program_number": rec.get("program_number"),
"posted_date": rec.get("posted_date"),
"all_urls": rec.get("all_urls"),
})
print(f"[index] Rows loaded from docstore: {len(texts)}")
if not texts:
INDEX_DIR.mkdir(parents=True, exist_ok=True)
(INDEX_DIR / "meta.json").write_text(json.dumps([], ensure_ascii=False))
print("[index] No texts to embed. Wrote empty meta.json.")
return 0
# Embed (CPU default; portable)
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
model.max_seq_length = 256
batch = max(8, min(32, len(texts)))
emb = model.encode(
texts,
convert_to_numpy=True,
normalize_embeddings=True,
show_progress_bar=True,
batch_size=batch,
).astype(np.float32, copy=False)
# FAISS index (Inner Product for cosine on normalized vectors)
import faiss
dim = emb.shape[1]
index = faiss.IndexFlatIP(dim)
index.add(emb)
INDEX_DIR.mkdir(parents=True, exist_ok=True)
faiss.write_index(index, str(INDEX_DIR / "faiss.index"))
(INDEX_DIR / "meta.json").write_text(json.dumps(metas, ensure_ascii=False))
print(f"[index] Wrote FAISS index with {emb.shape[0]} vectors (dim={dim}).")
return len(texts)
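# Downstream query sketch (illustrative; the real retriever lives elsewhere in the app):
#   import json, faiss
#   from sentence_transformers import SentenceTransformer
#   index = faiss.read_index(str(INDEX_DIR / "faiss.index"))
#   metas = json.loads((INDEX_DIR / "meta.json").read_text())
#   model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
#   q = model.encode(["capacity building grants in Pennsylvania"],
#                    convert_to_numpy=True, normalize_embeddings=True).astype("float32")
#   scores, ids = index.search(q, 5)   # inner product == cosine on normalized vectors
#   top = [metas[i] for i in ids[0] if i != -1]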
# -------------------- Public API: ingest --------------------
__all__ = ["ingest"]
def ingest(cfg_path: str = "config/sources.yaml", env: Dict | None = None):
"""
Reads config, fetches from enabled sources via adapters, normalizes to a single schema,
applies filters (capacity / PA-MD), de-dupes, writes docstore, and builds the FAISS index.
Returns (docstore_path, n_indexed).
"""
cfg = load_config(cfg_path)
# ---- Filters from config ----
f_cfg = (cfg or {}).get("filters", {}) or {}
capacity_only = bool(f_cfg.get("capacity_only", True))
pa_md_only = bool(f_cfg.get("pa_md_only", False))
print(f"[filters] capacity_only = {'TRUE' if capacity_only else 'FALSE'}")
print(f"[filters] pa_md_only = {'TRUE' if pa_md_only else 'FALSE'}")
all_rows: List[Dict[str, Any]] = []
for entry in cfg.get("sources", []):
if not entry.get("enabled"):
continue
name = entry.get("name", "<source>")
geo = entry.get("geo") or "US"
cats = entry.get("categories") or []
static = {"geo": geo, "categories": cats}
typ = entry.get("type")
rows: List[Dict[str, Any]] = []
# -------- Collect from each adapter --------
if typ == "grantsgov_api":
raw_hits = _collect_from_grantsgov_api(entry)
rows = [normalize("grants_gov", h, static) for h in raw_hits]
elif typ in ("web_page", "http_html"):
rows = _collect_from_http_html(entry, name, static)
elif typ == "http_html_js":
rows = _collect_from_http_html_js(entry, name, static)
elif typ == "http_pdf":
rows = _collect_from_http_pdf(entry, name, static)
elif typ == "local_sample":
p = Path(entry["path"]).expanduser()
blob = json.loads(p.read_text(encoding="utf-8"))
items = blob.get("opportunities") or []
rows = [normalize("local_sample", op, static) for op in items]
else:
print(f"[collect] {name}: unknown type '{typ}', skipping.")
continue
print(f"[collect] {name}: fetched_rows={len(rows)}")
# ---- Apply capacity / geo filters BEFORE indexing (allow per-source bypass) ----
if rows:
if entry.get("skip_filters"):
print(f"[filter] {name}: skip_filters=true β†’ keeping all {len(rows)}")
else:
pre = len(rows)
filtered = []
for r in rows:
t = _doc_text_from_row(r)
if capacity_only and not _is_capacity_building_text(t):
continue
if pa_md_only and not _is_pa_md_text(t):
continue
filtered.append(r)
print(
f"[filter] {name}: kept {len(filtered)}/{pre} after filters "
f"(capacity_only={capacity_only}, pa_md_only={pa_md_only})"
)
rows = filtered
# >>> GLOBAL MATT 25 TAGGING (safer, one place) <<<
for r in rows:
# If you truly want to leave Grants.gov untouched, keep this guard:
if r.get("source") == "grants_gov":
continue
_attach_matt25_tags(r)
print(f"[collect] {name} β†’ rows_after_filters={len(rows)}")
all_rows.extend(rows)
# ---- Cross-source DEDUPE + MERGE ----
unique = _dedupe_and_merge(all_rows)
print(f"[ingest] Unique records to index: {len(unique)}")
path = _save_docstore(unique)
n = _build_index_from_docstore()
return path, n
# -------------------- CLI --------------------
if __name__ == "__main__":
import argparse
ap = argparse.ArgumentParser()
ap.add_argument("--config", default="config/sources.yaml")
args = ap.parse_args()
p, n = ingest(args.config)
print(f"Ingested {n} records. Docstore at {p}")