grants-rag / app /ingest.py
michaellupo74's picture
feat(ingest): JS card/grid + scroll container + skip_filters
b53e303
raw
history blame
34.1 kB
# app/ingest.py
from __future__ import annotations
import json
from pathlib import Path
from typing import Dict, List, Any, Tuple, Optional
import yaml
import numpy as np
from sentence_transformers import SentenceTransformer
from app.paths import DOCSTORE_DIR, INDEX_DIR
from .normalize import normalize # ← central normalizer
import re
import time
import hashlib
import requests
from bs4 import BeautifulSoup
from datetime import datetime, timezone
from difflib import SequenceMatcher
from urllib.parse import urljoin
# -------------------- Config --------------------
def load_config(cfg_path: str) -> Dict:
    """Load the YAML source configuration stored at ``cfg_path``.

    The file is read as UTF-8 and parsed with ``yaml.safe_load``.

    Returns:
        The parsed document. An empty document (which ``safe_load`` parses
        to ``None``) is returned as an empty dict so the declared ``Dict``
        return type holds.

    Errors from ``open`` and ``yaml.safe_load`` are not handled here.
    """
    with open(cfg_path, "r", encoding="utf-8") as f:
        loaded = yaml.safe_load(f)
    # safe_load yields None for an empty file; callers expect a mapping.
    return {} if loaded is None else loaded
# -------------------- Capacity / Geo Filters (config-driven) --------------------
# controls live in config/sources.yaml:
# filters:
# capacity_only: true
# pa_md_only: false
# Case-insensitive patterns; a text qualifies for the capacity filter when
# at least one of them matches (see _is_capacity_building_text).
_INCLUDE_PATTERNS = [
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r"\bcapacity(?:[-\s]?building)?\b",
        r"\btechnical\s+assistance\b",
        r"\bTA\b",
        r"\borganizational\s+(capacity|effectiveness|development|readiness|stabilization)\b",
        r"\borganization(?:al)?\s+infrastructure\b",
        r"\bback[-\s]?office\b|\bbackbone\s+organization\b",
        r"\bgovernance\b|\bboard\s+development\b|\bboard\s+training\b",
        r"\bpre[-\s]?development\b|\bpredevelopment\b|\bplanning\s+grant\b",
        r"\bdata\s+systems?\b|\bCRM\b|\bcase\s+management\b",
        r"\b(staff|workforce)\s+capacity\b|\bhire\s+(?:staff|positions?)\b",
        r"\bscal(?:e|ing)\s+capacity\b|\bexpand\s+capacity\b",
        r"\bnonprofit\b|\bfaith[-\s]?based\b|\bcommunity[-\s]?based\b",
        # broaden for transportation / human services
        r"\bprovider\s+capacity\b|\bservice\s+capacity\b|\borganizational\s+support\b",
    )
]
# Case-insensitive patterns; any match vetoes a text before the include
# patterns are consulted (see _is_capacity_building_text).
_EXCLUDE_PATTERNS = [
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r"\bteaching\s+assistant\b|\bTAs\b",
        r"\bbench\s+capacity\b|\bmanufacturing\s+capacity\b(?!.*organiz)",
        r"\bclinical\s+trial\b|\blaboratory\s+capacity\b(?!.*community)",
        r"\b(postsecondary|university|college)\b(?!.*community\s+partner)",
        r"\bconstruction\b(?!.*(admin|organiz|back[-\s]?office|governance|systems))",
    )
]
# Place names and state abbreviations used as a Pennsylvania / Maryland
# signal; matched case-insensitively by _is_pa_md_text.
_PA_MD_PATTERN = (
    r"\b("
    r"Pennsylvania|PA\b|Harrisburg|Philadelphia|Allegheny|Montgomery County\b|Pittsburgh|Scranton|Erie|"
    r"Maryland|MD\b|Annapolis|Baltimore|Prince\s+George'?s|Howard County\b"
    r")\b"
)
_PA_MD_HINTS = re.compile(_PA_MD_PATTERN, flags=re.IGNORECASE)
del _PA_MD_PATTERN  # keep the module namespace identical to before
def _doc_text_from_row(rec: Dict[str, Any]) -> str:
    """Build the newline-joined text used by the capacity / geo filters.

    The fields are taken in this order: title, synopsis (falling back to
    summary), agency, eligibility, categories, geo. Missing or falsy fields
    contribute an empty line; the final result is stripped.

    List/tuple values (``categories``, but also e.g. a list-valued ``geo``)
    are joined with single spaces, and any other non-string value is
    converted with ``str`` so a non-string field can no longer make the join
    raise ``TypeError``.
    """
    def _as_text(value: Any) -> str:
        if not value:
            return ""
        if isinstance(value, str):
            return value
        if isinstance(value, (list, tuple)):
            # None items become empty strings rather than the text "None".
            return " ".join("" if item is None else str(item) for item in value)
        return str(value)

    fields = (
        rec.get("title"),
        rec.get("synopsis") or rec.get("summary"),
        rec.get("agency"),
        rec.get("eligibility"),
        rec.get("categories"),
        rec.get("geo"),
    )
    return "\n".join(_as_text(value) for value in fields).strip()
def _is_capacity_building_text(text: str) -> bool:
    """Return True when ``text`` passes the capacity-building filter.

    Empty text never passes. Any match from ``_EXCLUDE_PATTERNS`` rejects the
    text outright; otherwise it passes when at least one of
    ``_INCLUDE_PATTERNS`` matches.
    """
    if not text:
        return False
    for veto in _EXCLUDE_PATTERNS:
        if veto.search(text):
            return False
    for wanted in _INCLUDE_PATTERNS:
        if wanted.search(text):
            return True
    return False
def _is_pa_md_text(text: str) -> bool:
    """Return True when non-empty ``text`` contains a ``_PA_MD_HINTS`` match."""
    return bool(text) and _PA_MD_HINTS.search(text) is not None
# -------------------- Deadline parsing helpers --------------------
# Common date shapes: "October 15, 2025", "Oct 15, 2025", "10/15/2025", "2025-10-15"
# Lower-cased month names and abbreviations mapped to their month number.
_MONTHS = {
    name: number
    for number, names in enumerate(
        (
            ("january", "jan"),
            ("february", "feb"),
            ("march", "mar"),
            ("april", "apr"),
            ("may",),
            ("june", "jun"),
            ("july", "jul"),
            ("august", "aug"),
            ("september", "sep", "sept"),
            ("october", "oct"),
            ("november", "nov"),
            ("december", "dec"),
        ),
        start=1,
    )
    for name in names
}
# Capture label + date snippets
# Labelled deadline line: group 1 is the label, group 2 the rest of the line.
_DEADLINE_LINES = re.compile(
    r"(deadline|applications?\s+due|closing\s+date|due\s+date|submission\s+deadline)\s*[:\-]?\s*(.+)",
    flags=re.IGNORECASE,
)
# Absolute dates; each pattern exposes three groups read by _parse_date_fragment.
# year, month, day -- 2025-10-15 or 2025/10/15
_DATE_ISO = re.compile(r"\b(20\d{2})[-/\.](\d{1,2})[-/\.](\d{1,2})\b")
# month, day, year -- 10/15/2025
_DATE_US = re.compile(r"\b(\d{1,2})[-/\.](\d{1,2})[-/\.](20\d{2})\b")
# month name, day, year -- Oct 15, 2025 / October 15 2025
_DATE_LONG = re.compile(
    r"\b([A-Za-z]{3,9})\s+(\d{1,2})(?:st|nd|rd|th)?\,?\s+(20\d{2})\b",
    flags=re.IGNORECASE,
)
def _to_iso(y: int, m: int, d: int) -> Optional[str]:
    """Format year/month/day as ``YYYY-MM-DD``.

    Returns None when ``datetime`` rejects the combination (for example an
    out-of-range day).
    """
    try:
        stamp = datetime(y, m, d, tzinfo=timezone.utc)
    except Exception:
        return None
    return stamp.date().isoformat()
def _parse_date_fragment(s: str) -> Optional[str]:
    """Return the first valid date found in ``s`` as ``YYYY-MM-DD``, else None.

    Formats are tried in a fixed priority: ISO (``2025-10-15``), then US
    (``10/15/2025``), then long month names (``Oct 15, 2025``). Within each
    format every match is examined in order, so a candidate that looks like
    a date but is not one (``2025-13-40``, ``Section 12, 2025``) no longer
    hides a valid date that follows it or that is written in another format.
    """
    s = (s or "").strip()

    # ISO first: groups are (year, month, day).
    for m in _DATE_ISO.finditer(s):
        iso = _to_iso(int(m.group(1)), int(m.group(2)), int(m.group(3)))
        if iso:
            return iso

    # US style: groups are (month, day, year).
    for m in _DATE_US.finditer(s):
        iso = _to_iso(int(m.group(3)), int(m.group(1)), int(m.group(2)))
        if iso:
            return iso

    # Long form: the first group must be a known month name/abbreviation.
    for m in _DATE_LONG.finditer(s):
        month = _MONTHS.get(m.group(1).lower())
        if not month:
            continue
        iso = _to_iso(int(m.group(3)), month, int(m.group(2)))
        if iso:
            return iso

    return None
def _extract_deadline(text: str) -> Tuple[Optional[str], Optional[str]]:
    """
    Find a deadline date in ``text``.

    Lines matching ``_DEADLINE_LINES`` are tried first; the first one whose
    trailing part parses as a date yields ``(iso_date, stripped_line)``.
    If no labelled line parses, the whole text is scanned and the result is
    ``(iso_date, None)``. Returns ``(None, None)`` when nothing is found or
    ``text`` is empty.
    """
    if not text:
        return None, None

    # Labelled lines are the strongest signal.
    for raw_line in text.splitlines():
        labelled = _DEADLINE_LINES.search(raw_line)
        if labelled is None:
            continue
        parsed = _parse_date_fragment(labelled.group(2))
        if parsed:
            return parsed, raw_line.strip()

    # Fallback: any date anywhere in the text.
    fallback = _parse_date_fragment(text)
    return (fallback, None) if fallback else (None, None)
def _compute_is_active(deadline_iso: Optional[str]) -> bool:
    """Return False only when ``deadline_iso`` is a parseable date before today (UTC).

    A missing deadline, or a value ``datetime.fromisoformat`` cannot parse,
    is treated as still active. "Today" is taken from an aware UTC clock
    (``datetime.now(timezone.utc)``) instead of the deprecated
    ``datetime.utcnow()``.
    """
    if not deadline_iso:
        return True  # treat unknown/TBD as active
    try:
        deadline = datetime.fromisoformat(deadline_iso).date()
    except (TypeError, ValueError):
        # Non-string or malformed value: keep the record rather than drop it.
        return True
    return deadline >= datetime.now(timezone.utc).date()
# -------------------- Grants.gov collector --------------------
def _collect_from_grantsgov_api(src: Dict) -> List[Dict[str, Any]]:
    """
    Query Grants.gov through ``search_grants`` and return the raw hit dicts.

    ``page_size``, ``max_pages`` and ``payload`` are read from ``src["api"]``
    first, then from ``src`` itself, then default to 100, 5 and ``{}``.
    The result of ``search_grants`` may be a dict with a ``"hits"`` list or a
    plain list; non-dict items are discarded. Records are not normalized here.
    """
    from app.sources.grantsgov_api import search_grants  # local import to avoid cycles

    api_cfg = src.get("api", {})

    def _setting(key: str, default: Any) -> Any:
        # The nested "api" section wins over a top-level key of the same name.
        return api_cfg.get(key, src.get(key, default))

    page_size = int(_setting("page_size", 100))
    max_pages = int(_setting("max_pages", 5))
    payload = _setting("payload", {})
    endpoint = src.get("url", "")

    result = search_grants(endpoint, payload, page_size=page_size, max_pages=max_pages)
    if isinstance(result, dict):
        hits = result.get("hits", [])
    else:
        hits = result or []
    return [hit for hit in hits if isinstance(hit, dict)]
# -------------------- HTTP helpers --------------------
# Headers sent with every plain HTTP fetch made by _http_get.
_HTTP_HEADERS = {
    "User-Agent": "grants-rag/1.0 (+https://example.local) requests",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}


def _http_get(url: str, timeout: int = 20) -> Optional[requests.Response]:
    """GET ``url`` and return the response only for a 200 with a non-empty body.

    Any ``requests.RequestException``, any other status code, and an empty
    body all yield None.
    """
    try:
        response = requests.get(url, headers=_HTTP_HEADERS, timeout=timeout)
        usable = response.status_code == 200 and bool(response.content)
    except requests.RequestException:
        return None
    return response if usable else None
def _soup(html: str) -> BeautifulSoup:
    """Parse ``html`` with lxml, falling back to the stdlib ``html.parser``.

    BeautifulSoup raises ``FeatureNotFound`` when the requested parser is not
    installed; in that case the built-in parser is used so a missing optional
    dependency does not abort collection.
    """
    from bs4 import FeatureNotFound  # local: only needed for the fallback

    try:
        return BeautifulSoup(html, "lxml")
    except FeatureNotFound:
        return BeautifulSoup(html, "html.parser")
def _text_from_soup(s: BeautifulSoup, selectors: Optional[List[str]] = None) -> Tuple[str, str]:
    """
    Extract ``(title, text)`` from a parsed page.

    The title is the stripped ``<title>`` string, or "" when absent. Text is
    taken from every node matched by ``selectors``; when those match nothing
    (or none are given) a fixed list of common content containers is tried,
    and finally ``<body>``. Each node's text is joined with blank lines.
    """
    page_title = ""
    if s.title and s.title.string:
        page_title = s.title.string.strip()

    matched = []
    for css in selectors or []:
        matched.extend(s.select(css) or [])
    if not matched:
        # Nothing configured (or nothing matched): try common containers.
        for css in ("main", "article", "#content", ".content", "[role='main']"):
            matched.extend(s.select(css) or [])
    if not matched and s.body:
        matched = [s.body]

    chunks = [node.get_text(separator="\n", strip=True) for node in matched if node]
    page_text = "\n\n".join(chunk for chunk in chunks if chunk).strip()
    return page_title, page_text
def _make_id(*fields: str) -> str:
    """Return the SHA-1 hex digest of the fields, each followed by ``|``.

    Falsy fields contribute only their ``|`` separator, so the position of a
    missing field still affects the digest.
    """
    digest = hashlib.sha1()
    for part in fields:
        payload = part.encode("utf-8", "ignore") if part else b""
        digest.update(payload + b"|")
    return digest.hexdigest()
def _normalize_web_record(
    source_name: str,
    url: str,
    title: str,
    body: str,
    static: Dict[str, Any],
    extra: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Build a record with the same keys the rest of the pipeline reads.

    Args:
        source_name: stored under ``"source"``.
        url: stored under ``"url"``; also the title of last resort.
        title: page/card title; may be empty.
        body: extracted text; clipped to 2000 characters for ``"synopsis"``.
            ``None`` is treated as empty text.
        static: per-source values; ``geo`` and ``categories`` are copied.
        extra: optional overrides (``id``, ``title``, ``agency``,
            ``eligibility``, ``deadline``, ``deadline_text``,
            ``program_number``, ``posted_date``).

    When ``extra`` carries no deadline, one is parsed from the body and then,
    failing that, from the title. ``is_active`` is derived from the deadline.
    """
    extra = extra or {}
    # Guard once so both the id seed and the synopsis tolerate a missing body.
    body = body or ""

    deadline_iso = extra.get("deadline")
    deadline_text = extra.get("deadline_text")
    if not deadline_iso:
        # _extract_deadline always returns a 2-tuple, so unpack it directly.
        deadline_iso, deadline_text = _extract_deadline(body)
        if not deadline_iso and title:
            deadline_iso, _ = _extract_deadline(title)

    rec = {
        "id": extra.get("id") or _make_id(url, title or body[:160]),
        "title": title or extra.get("title") or url,
        "synopsis": body[:2000],  # clip; embeddings use title+synopsis later
        "summary": None,
        "url": url,
        "source": source_name,
        "geo": static.get("geo"),
        "categories": static.get("categories"),
        "agency": extra.get("agency", ""),
        "eligibility": extra.get("eligibility", ""),
        # Store ISO (YYYY-MM-DD) in 'deadline' for consistency
        "deadline": deadline_iso,
        "deadline_text": deadline_text,  # the raw line we matched (if any)
        "program_number": extra.get("program_number"),
        "posted_date": extra.get("posted_date"),
    }
    rec["is_active"] = _compute_is_active(rec["deadline"])
    return rec
# -------------------- Collectors: http_html / web_page --------------------
def _collect_from_http_html(entry: Dict, source_name: str, static: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Collect a landing page and, optionally, pages it links to.

    Supports types: 'web_page' and 'http_html'
    Config keys supported:
    - url (str)
    - parse: { follow_links: bool, link_selectors: [..], content_selectors: [..] }
      (``extract`` is accepted as an alias for ``parse``)
    - crawl: { schedule: "...", max_depth: int } # max_depth 0/None = only landing

    Links are taken only from ``link_selectors``. Relative hrefs are resolved
    against the page they were found on, fragments are dropped, and only
    http(s) targets are followed. At most 40 links are taken per page and at
    most 200 records are returned in total.

    Returns an empty list when the URL is missing or the landing page cannot
    be fetched.
    """
    url = entry.get("url")
    if not url:
        return []
    r = _http_get(url)
    if not r:
        return []

    s = _soup(r.text)
    parse = entry.get("parse", {}) or entry.get("extract", {}) or {}
    content_selectors = parse.get("content_selectors") or []
    title, body = _text_from_soup(s, content_selectors)
    rows = [_normalize_web_record(source_name, url, title, body, static, extra={"posted_date": None})]

    follow = bool(parse.get("follow_links"))
    link_selectors = parse.get("link_selectors") or []
    crawl = entry.get("crawl", {}) or {}
    max_depth = int(crawl.get("max_depth", 0) or 0)
    max_rows = 200
    visited = {url}

    def _enq_links(soup: BeautifulSoup, base_url: str) -> List[str]:
        """Return up to 40 distinct absolute http(s) links matched on one page."""
        found: List[str] = []
        seen = set()
        for sel in link_selectors:
            for a in soup.select(sel) or []:
                href = a.get("href")
                if not href:
                    continue
                # Resolve relative links and drop the fragment so "#top"
                # style anchors do not trigger a refetch of the same page.
                absolute = urljoin(base_url, href.strip()).split("#", 1)[0]
                if not absolute.startswith(("http://", "https://")):
                    continue  # mailto:, javascript:, tel:, ...
                if absolute not in seen:
                    seen.add(absolute)
                    found.append(absolute)
        return found[:40]  # polite cap

    if follow and max_depth > 0:
        frontier = _enq_links(s, url)
        depth = 1
        while frontier and depth <= max_depth and len(rows) < max_rows:
            next_frontier: List[str] = []
            for link in frontier:
                if len(rows) >= max_rows:
                    break  # enforce the cap inside a level as well
                if link in visited:
                    continue
                visited.add(link)
                rr = _http_get(link)
                if not rr:
                    continue
                ss = _soup(rr.text)
                t2, b2 = _text_from_soup(ss, content_selectors)
                if b2:
                    rows.append(
                        _normalize_web_record(source_name, link, t2, b2, static, extra={"posted_date": None})
                    )
                if depth < max_depth:
                    next_frontier.extend(_enq_links(ss, link))
                time.sleep(0.1)  # gentle
            frontier = next_frontier
            depth += 1
    return rows
# -------------------- Collector: http_html_js (Playwright) --------------------
def _collect_from_http_html_js(entry: Dict, source_name: str, static: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    JS-rendered pages using Playwright, with per-card extraction and scrolling.

    entry.options:
    - wait_for (css or ms int)
    - scroll (bool)
    - scroll_selector (css)   # scroll a container div, not the window
    - scroll_times (int)      # default 20
    - scroll_wait_ms (int)    # default 400
    - min_cards (int)         # wait until at least N cards exist
    - click_selector (css)    # "next page" control; entry.next_selector also accepted
    - max_pages (int)
    - timeout_ms (int)
    - network_idle (bool)
    - debug (bool)            # headed browser + screenshot on failure
    entry.selectors: card, title, link, description, meta

    The page is attempted at most twice. Rows are only added once an attempt
    has parsed every captured page, so a failed attempt cannot leave partial
    results that the retry would duplicate.

    Returns an empty list when Playwright is unavailable, the URL is missing,
    or both attempts fail.
    """
    try:
        from playwright.sync_api import sync_playwright  # type: ignore
    except Exception:
        print(f"[collect][skip] {source_name}: Playwright not installed.")
        return []

    url = entry.get("url")
    if not url:
        return []

    options = entry.get("options", {}) or {}
    parse = entry.get("parse", {}) or entry.get("extract", {}) or {}
    selectors = entry.get("selectors", {}) or {}
    content_selectors = parse.get("content_selectors") or []
    timeout_ms = int(options.get("timeout_ms", 6000))
    network_idle = bool(options.get("network_idle", True))
    debug = bool(options.get("debug", False))
    max_pages = int(options.get("max_pages", 1))
    click_sel = options.get("click_selector") or entry.get("next_selector")
    wait_for = options.get("wait_for")
    rows: List[Dict[str, Any]] = []

    def _text_first(soup: BeautifulSoup, css_list: str) -> str:
        """Text of the first selector in a comma-separated list that yields any."""
        if not css_list:
            return ""
        for css in [c.strip() for c in css_list.split(",")]:
            el = soup.select_one(css)
            if el:
                txt = el.get_text(separator=" ", strip=True)
                if txt:
                    return txt
        return ""

    def _attr_first(soup: BeautifulSoup, css_list: str, attr: str) -> Optional[str]:
        """Value of ``attr`` on the first selector match that has it set."""
        if not css_list:
            return None
        for css in [c.strip() for c in css_list.split(",")]:
            el = soup.select_one(css)
            if el:
                val = el.get(attr)
                if val:
                    return val
        return None

    def _parse_cards(page_html: str, base_url: str) -> List[Dict[str, Any]]:
        """Turn rendered HTML into records: one per card, or one for the page."""
        s = _soup(page_html)
        card_css = selectors.get("card", "")
        if not card_css:
            title, body = _text_from_soup(s, content_selectors)
            return [_normalize_web_record(source_name, base_url, title, body, static, extra={"posted_date": None})]
        out: List[Dict[str, Any]] = []
        for card in s.select(card_css) or []:
            # Re-parse the card on its own so selectors only see this card.
            csoup = _soup(str(card))
            title = _text_first(csoup, selectors.get("title", "h1, h2, h3"))
            href = _attr_first(csoup, selectors.get("link", "a"), "href")
            link = urljoin(base_url, href) if href else base_url
            desc = _text_first(csoup, selectors.get("description", "p, .summary, .excerpt, .card-text"))
            meta = _text_first(csoup, selectors.get("meta", ".meta, .tags, .badge, .location"))
            body = "\n".join([p for p in (desc, meta) if p]).strip()
            if not (title or body):
                continue
            out.append(_normalize_web_record(source_name, link, title or link, body, static, extra={"posted_date": None}))
        return out

    def _wait_page_ready(page, *, wait_for, timeout_ms, options, selectors):
        """Apply the configured wait, scrolling and min-card conditions."""
        # wait_for can be CSS or milliseconds
        if isinstance(wait_for, int):
            page.wait_for_timeout(wait_for)
        elif isinstance(wait_for, str) and wait_for:
            page.wait_for_selector(wait_for, timeout=min(timeout_ms, 15000))

        # Scroll window or a container div
        if options.get("scroll"):
            scroll_sel = options.get("scroll_selector")
            scroll_times = int(options.get("scroll_times", 20))
            scroll_wait = int(options.get("scroll_wait_ms", 400))
            # page.evaluate accepts a single argument, so the values are
            # passed as one list and destructured on the JS side.
            if scroll_sel:
                page.evaluate(
                    """([sel, times, wait]) => new Promise(res => {
                        const el = document.querySelector(sel);
                        if (!el) { res(); return; }
                        let i = 0;
                        const t = setInterval(() => {
                            const y = el.scrollTop;
                            el.scrollTop = el.scrollHeight;
                            i++;
                            if (el.scrollTop === y || i >= times) { clearInterval(t); res(); }
                        }, wait);
                    })""",
                    [scroll_sel, scroll_times, scroll_wait],
                )
            else:
                page.evaluate(
                    """([times, wait]) => new Promise(res => {
                        let i = 0;
                        const t = setInterval(() => {
                            const y = window.scrollY;
                            window.scrollBy(0, document.body.scrollHeight);
                            i++;
                            if (window.scrollY === y || i >= times) { clearInterval(t); res(); }
                        }, wait);
                    })""",
                    [scroll_times, scroll_wait],
                )

        # Optionally wait for a minimum number of cards (virtualized lists)
        min_cards = int(options.get("min_cards", 0))
        card_css = (selectors or {}).get("card", "")
        if min_cards and card_css:
            try:
                page.wait_for_function(
                    """([sel, target]) => {
                        const els = document.querySelectorAll(sel);
                        return els && els.length >= target;
                    }""",
                    arg=[card_css, min_cards],
                    timeout=min(timeout_ms, 15000),
                )
            except Exception:
                pass  # best-effort

    def _try_once(p) -> bool:
        """Run one full load/paginate/parse attempt; True on success."""
        def _route(route):
            # Skip assets that do not contribute text.
            if route.request.resource_type in {"image", "media", "font"}:
                return route.abort()
            return route.continue_()

        browser = p.chromium.launch(headless=not debug)
        try:
            context = browser.new_context(user_agent=(
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
                "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0 Safari/537.36"
            ))
            context.route("**/*", _route)
            page = context.new_page()
            try:
                page.set_default_timeout(timeout_ms)
                page.goto(url, wait_until="domcontentloaded", timeout=timeout_ms)
                if network_idle:
                    page.wait_for_load_state("networkidle", timeout=min(timeout_ms, 15000))
                _wait_page_ready(page, wait_for=wait_for, timeout_ms=timeout_ms, options=options, selectors=selectors)
                htmls = [page.content()]

                # pagination
                for _ in range(max_pages - 1):
                    sel = click_sel
                    if not sel or page.locator(sel).count() == 0:
                        break
                    page.click(sel)
                    page.wait_for_load_state("domcontentloaded")
                    if network_idle:
                        page.wait_for_load_state("networkidle", timeout=min(timeout_ms, 15000))
                    _wait_page_ready(page, wait_for=wait_for, timeout_ms=timeout_ms, options=options, selectors=selectors)
                    htmls.append(page.content())

                # Parse everything first; publish to `rows` only if all pages parsed.
                collected: List[Dict[str, Any]] = []
                for html in htmls:
                    found = _parse_cards(html, url)
                    collected.extend(found)
                    print(f"[collect][cards] {source_name}: found {len(found)} cards on this page")
                rows.extend(collected)
                return True
            except Exception as e:
                if debug:
                    try:
                        snap = DOCSTORE_DIR / f"playwright_error_{hashlib.sha1(url.encode()).hexdigest()}.png"
                        page.screenshot(path=str(snap))
                        print(f"[collect][debug] Saved screenshot: {snap}")
                    except Exception:
                        pass
                print(f"[collect][warn] {source_name}: {e.__class__.__name__}: {e}")
                return False
        finally:
            # Always release the browser; a failing close must not turn a
            # successful attempt into a retry.
            try:
                browser.close()
            except Exception:
                pass

    with sync_playwright() as p:
        if not _try_once(p):
            time.sleep(1.5)
            _try_once(p)

    if not rows:
        print(f"[collect][skip] {source_name}: no content after retries.")
    return rows
# -------------------- PDF collector --------------------
def _collect_from_http_pdf(entry: Dict, source_name: str, static: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Fetch a single PDF and turn its extracted text into one record.

    type: 'http_pdf'
    keys:
    - url (single PDF fetch)
    - name (optional; used as the record title, default "PDF Document")

    The PDF is written to a temporary file under ``DOCSTORE_DIR`` for text
    extraction and removed afterwards. Returns an empty list when the URL is
    missing, pdfminer cannot be imported, the download fails, or text
    extraction raises; the last three cases are reported on stdout.
    """
    url = entry.get("url")
    if not url:
        return []
    try:
        from pdfminer.high_level import extract_text  # lazy import
    except Exception:
        # Report the skip like the Playwright collector does.
        print(f"[collect][skip] {source_name}: pdfminer not available.")
        return []

    r = _http_get(url, timeout=40)
    if not r:
        return []

    tmp = DOCSTORE_DIR / (hashlib.sha1(url.encode("utf-8")).hexdigest() + ".pdf")
    try:
        DOCSTORE_DIR.mkdir(parents=True, exist_ok=True)
        tmp.write_bytes(r.content)
        try:
            body = extract_text(str(tmp)) or ""
        except Exception as e:
            # One unreadable PDF should not abort the whole ingest run.
            print(f"[collect][warn] {source_name}: {e.__class__.__name__}: {e}")
            return []
    finally:
        try:
            tmp.unlink(missing_ok=True)
        except Exception:
            pass

    title = entry.get("name") or "PDF Document"
    return [_normalize_web_record(source_name, url, title, body, static, extra={"posted_date": None})]
# -------------------- De-dup helpers (5.2) --------------------
def _norm(text: str) -> str:
    """Lower-case ``text``, replace anything outside ``a-z0-9`` and space with
    a space, and collapse whitespace. ``None`` or empty input gives ""."""
    lowered = (text or "").lower()
    cleaned = re.sub(r'[^a-z0-9 ]+', ' ', lowered)
    collapsed = re.sub(r'\s+', ' ', cleaned)
    return collapsed.strip()
def _hash_fingerprint(title: str, agency: str, deadline: str) -> str:
    """SHA-1 hex digest of normalized title, normalized agency and deadline.

    Title and agency go through ``_norm``; a falsy deadline counts as "".
    """
    base = f"{_norm(title)}|{_norm(agency)}|{deadline or ''}"
    digest = hashlib.sha1()
    digest.update(base.encode())
    return digest.hexdigest()
def _near_duplicate(a: Dict[str, Any], b: Dict[str, Any]) -> bool:
    """Return True when two records look like the same opportunity.

    Requires deadlines that are equal or both missing, a normalized-title
    similarity above 0.88 and a normalized-agency similarity above 0.75
    (``difflib.SequenceMatcher`` ratios).
    """
    def _similarity(field: str) -> float:
        left = _norm(a.get(field, ""))
        right = _norm(b.get(field, ""))
        return SequenceMatcher(None, left, right).ratio()

    deadline_a, deadline_b = a.get("deadline"), b.get("deadline")
    same_deadline = (deadline_a == deadline_b) or not (deadline_a or deadline_b)
    title_ratio = _similarity("title")
    agency_ratio = _similarity("agency")
    return same_deadline and (title_ratio > 0.88) and (agency_ratio > 0.75)
def _merge_records(primary: Dict[str, Any], other: Dict[str, Any]) -> Dict[str, Any]:
    """Return a new record combining ``primary`` with ``other``.

    ``primary`` wins for scalar fields unless its value is empty, categories
    are unioned, the earlier deadline / posted_date is kept, sources and URLs
    are accumulated, and the id is recomputed from the merged
    title/agency/deadline fingerprint. Neither input is modified.
    """
    empty_values = (None, "", [], {})

    def _prefer(first, second):
        # Fall back to the second value only when the first carries nothing.
        if first in empty_values:
            return second
        return first

    def _as_list(value):
        value = value or []
        return value if isinstance(value, list) else [value]

    merged = dict(primary)
    for key in ("url", "title", "synopsis", "summary", "agency",
                "eligibility", "program_number", "geo"):
        merged[key] = _prefer(primary.get(key), other.get(key))

    # categories -> sorted union, dropping empty entries
    combined = _as_list(primary.get("categories")) + _as_list(other.get("categories"))
    merged["categories"] = sorted({c for c in combined if c})

    # deadline: surface the earlier date when both are known
    first_deadline, second_deadline = primary.get("deadline"), other.get("deadline")
    if first_deadline and second_deadline:
        merged["deadline"] = min(first_deadline, second_deadline)
    else:
        merged["deadline"] = first_deadline or second_deadline
    merged["deadline_text"] = _prefer(primary.get("deadline_text"), other.get("deadline_text"))
    merged["is_active"] = _compute_is_active(merged.get("deadline"))

    # posted_date: earliest when both are known
    first_posted, second_posted = primary.get("posted_date"), other.get("posted_date")
    if first_posted and second_posted:
        merged["posted_date"] = min(first_posted, second_posted)
    else:
        merged["posted_date"] = first_posted or second_posted

    # provenance: every source name seen so far (a source may already be a list)
    sources = set()
    for src in (primary.get("source"), other.get("source")):
        if isinstance(src, list):
            sources.update(src)
        elif src:
            sources.add(src)
    merged["source"] = sorted(sources) if sources else None

    # provenance: every URL seen so far
    urls = {u for u in (primary.get("url"), other.get("url")) if u}
    urls.update(primary.get("all_urls") or [])
    urls.update(other.get("all_urls") or [])
    merged["all_urls"] = sorted(urls)

    # the id follows the merged fingerprint (title/agency/deadline)
    merged["id"] = _hash_fingerprint(merged.get("title",""), merged.get("agency",""), merged.get("deadline",""))
    return merged
def _dedupe_and_merge(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Consolidate duplicate records across sources.

    Each row is first looked up by its exact fingerprint
    (normalized title / agency / deadline). On a miss it is compared against
    every record kept so far with ``_near_duplicate`` and merged into the
    first match. Rows that match nothing are kept as new records and gain an
    ``all_urls`` list (the input dict is updated in place for those rows).

    After a fuzzy merge both the incoming row's fingerprint and the merged
    record's fingerprint are indexed, so later exact repeats of either are
    resolved by dictionary lookup instead of another linear fuzzy scan.
    """
    uniques: List[Dict[str, Any]] = []
    by_fp: Dict[str, int] = {}

    for r in rows:
        fp = _hash_fingerprint(r.get("title",""), r.get("agency",""), r.get("deadline",""))
        if fp in by_fp:
            # exact dup: merge into existing
            idx = by_fp[fp]
            uniques[idx] = _merge_records(uniques[idx], r)
            continue

        # fuzzy check against current uniques (first match wins)
        found_idx = next(
            (i for i, u in enumerate(uniques) if _near_duplicate(r, u)),
            None,
        )
        if found_idx is None:
            by_fp[fp] = len(uniques)
            # initialize provenance
            r.setdefault("all_urls", [r.get("url")] if r.get("url") else [])
            uniques.append(r)
            continue

        merged = _merge_records(uniques[found_idx], r)
        uniques[found_idx] = merged
        # Index the incoming fingerprint and the merged one so later exact
        # matches of either land on this record directly.
        by_fp[fp] = found_idx
        merged_fp = _hash_fingerprint(merged.get("title",""), merged.get("agency",""), merged.get("deadline",""))
        by_fp[merged_fp] = found_idx

    return uniques
# -------------------- Write docstore & build index --------------------
def _save_docstore(recs: List[Dict[str, Any]]) -> str:
    """Write ``recs`` to ``DOCSTORE_DIR/docstore.jsonl`` and return that path.

    The directory is created if needed and the file is overwritten. Each
    record becomes one UTF-8 JSON line (non-ASCII characters are kept as-is).
    """
    DOCSTORE_DIR.mkdir(parents=True, exist_ok=True)
    path = DOCSTORE_DIR / "docstore.jsonl"
    with path.open("w", encoding="utf-8") as f:
        f.writelines(json.dumps(r, ensure_ascii=False) + "\n" for r in recs)
    return str(path)
def _build_index_from_docstore() -> int:
    """Embed the docstore records and write the FAISS index plus metadata.

    Reads ``DOCSTORE_DIR/docstore.jsonl``; for every record with any of
    title / synopsis (or summary) / agency / eligibility, the joined text is
    embedded and a metadata entry is kept in the same order. Writes
    ``INDEX_DIR/faiss.index`` and ``INDEX_DIR/meta.json`` (UTF-8).

    Returns:
        The number of embedded records. When there is nothing to embed an
        empty ``meta.json`` is written and 0 is returned.

    Raises:
        RuntimeError: the docstore file does not exist.
    """
    ds_path = DOCSTORE_DIR / "docstore.jsonl"
    if not ds_path.exists():
        raise RuntimeError("Docstore not found. Run ingest first.")

    texts: List[str] = []
    metas: List[Dict[str, Any]] = []
    with ds_path.open("r", encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue  # tolerate blank lines instead of failing in json.loads
            rec = json.loads(line)
            title = rec.get("title") or ""
            synopsis = rec.get("synopsis") or rec.get("summary") or ""
            agency = rec.get("agency") or ""
            eligibility = rec.get("eligibility") or ""
            txt = "\n".join([title, synopsis, agency, eligibility]).strip()
            if not txt:
                continue
            texts.append(txt)
            metas.append({
                "id": rec.get("id"),
                "title": title,
                "url": rec.get("url"),
                "source": rec.get("source"),
                "geo": rec.get("geo"),
                "categories": rec.get("categories"),
                "agency": agency,
                "deadline": rec.get("deadline"),  # ISO if available
                "deadline_text": rec.get("deadline_text"),
                "is_active": rec.get("is_active"),
                "program_number": rec.get("program_number"),
                "posted_date": rec.get("posted_date"),
                "all_urls": rec.get("all_urls"),
            })
    print(f"[index] Rows loaded from docstore: {len(texts)}")

    if not texts:
        INDEX_DIR.mkdir(parents=True, exist_ok=True)
        # Explicit UTF-8: the JSON is written with ensure_ascii=False, so the
        # platform default encoding must not be relied on.
        (INDEX_DIR / "meta.json").write_text(json.dumps([], ensure_ascii=False), encoding="utf-8")
        print("[index] No texts to embed. Wrote empty meta.json.")
        return 0

    # Embed (CPU default; portable)
    model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
    model.max_seq_length = 256
    batch = max(8, min(32, len(texts)))
    emb = model.encode(
        texts,
        convert_to_numpy=True,
        normalize_embeddings=True,
        show_progress_bar=True,
        batch_size=batch,
    ).astype(np.float32, copy=False)

    # FAISS index (Inner Product for cosine on normalized vectors)
    import faiss
    dim = emb.shape[1]
    index = faiss.IndexFlatIP(dim)
    index.add(emb)

    INDEX_DIR.mkdir(parents=True, exist_ok=True)
    faiss.write_index(index, str(INDEX_DIR / "faiss.index"))
    (INDEX_DIR / "meta.json").write_text(json.dumps(metas, ensure_ascii=False), encoding="utf-8")
    print(f"[index] Wrote FAISS index with {emb.shape[0]} vectors (dim={dim}).")
    return len(texts)
# -------------------- Public API: ingest --------------------
__all__ = ["ingest"]


def ingest(cfg_path: str = "config/sources.yaml", env: Optional[Dict] = None):
    """
    Run the full ingest: collect, filter, de-duplicate, store and index.

    Reads the config at ``cfg_path``, collects rows from every enabled
    source according to its ``type``, applies the capacity / PA-MD filters
    (unless the source sets ``skip_filters``), merges duplicates across
    sources, writes the docstore and builds the FAISS index.

    Args:
        cfg_path: path to the YAML sources file.
        env: accepted for compatibility; not used by this function.

    Returns:
        ``(docstore_path, n_indexed)``.

    An empty config file is treated as "no sources, default filters".
    """
    cfg = load_config(cfg_path) or {}

    # ---- Filters from config ----
    f_cfg = cfg.get("filters", {}) or {}
    capacity_only = bool(f_cfg.get("capacity_only", True))
    pa_md_only = bool(f_cfg.get("pa_md_only", False))
    print(f"[filters] capacity_only = {'TRUE' if capacity_only else 'FALSE'}")
    print(f"[filters] pa_md_only = {'TRUE' if pa_md_only else 'FALSE'}")

    all_rows: List[Dict[str, Any]] = []
    # `sources:` may be absent or explicitly null in the YAML.
    for entry in cfg.get("sources", []) or []:
        if not entry.get("enabled"):
            continue
        name = entry.get("name", "<source>")
        geo = entry.get("geo") or "US"
        cats = entry.get("categories") or []
        static = {"geo": geo, "categories": cats}
        typ = entry.get("type")
        rows: List[Dict[str, Any]] = []

        # -------- Collect from each adapter --------
        if typ == "grantsgov_api":
            raw_hits = _collect_from_grantsgov_api(entry)
            rows = [normalize("grants_gov", h, static) for h in raw_hits]
        elif typ in ("web_page", "http_html"):
            rows = _collect_from_http_html(entry, name, static)
        elif typ == "http_html_js":
            rows = _collect_from_http_html_js(entry, name, static)
        elif typ == "http_pdf":
            rows = _collect_from_http_pdf(entry, name, static)
        elif typ == "local_sample":
            p = Path(entry["path"]).expanduser()
            blob = json.loads(p.read_text(encoding="utf-8"))
            items = blob.get("opportunities") or []
            rows = [normalize("local_sample", op, static) for op in items]
        else:
            print(f"[collect] {name}: unknown type '{typ}', skipping.")
            continue
        print(f"[collect] {name}: fetched_rows={len(rows)}")

        # ---- Apply capacity / geo filters BEFORE indexing (allow per-source bypass) ----
        if rows:
            if entry.get("skip_filters"):
                print(f"[filter] {name}: skip_filters=true → keeping all {len(rows)}")
            else:
                pre = len(rows)
                filtered = []
                for r in rows:
                    t = _doc_text_from_row(r)
                    if capacity_only and not _is_capacity_building_text(t):
                        continue
                    if pa_md_only and not _is_pa_md_text(t):
                        continue
                    filtered.append(r)
                print(
                    f"[filter] {name}: kept {len(filtered)}/{pre} after filters "
                    f"(capacity_only={capacity_only}, pa_md_only={pa_md_only})"
                )
                rows = filtered
        print(f"[collect] {name} → rows_after_filters={len(rows)}")
        all_rows.extend(rows)

    # ---- Cross-source DEDUPE + MERGE ----
    unique = _dedupe_and_merge(all_rows)
    print(f"[ingest] Unique records to index: {len(unique)}")
    path = _save_docstore(unique)
    n = _build_index_from_docstore()
    return path, n
# -------------------- CLI --------------------
if __name__ == "__main__":
    # Command-line entry point: `--config` selects the sources file.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("--config", default="config/sources.yaml")
    cli_args = parser.parse_args()

    p, n = ingest(cli_args.config)
    print(f"Ingested {n} records. Docstore at {p}")