# app/ingest.py from __future__ import annotations import json from pathlib import Path from typing import Dict, List, Any, Tuple, Optional import yaml import numpy as np from sentence_transformers import SentenceTransformer from app.paths import DOCSTORE_DIR, INDEX_DIR from .normalize import normalize # ← central normalizer import re import time import hashlib import requests from bs4 import BeautifulSoup from datetime import datetime, timezone from difflib import SequenceMatcher from urllib.parse import urljoin from app.tags import infer_matt25_tags # ← NEW # -------------------- Config -------------------- def load_config(cfg_path: str) -> Dict: with open(cfg_path, "r", encoding="utf-8") as f: return yaml.safe_load(f) # -------------------- Capacity / Geo Filters (config-driven) -------------------- # controls live in config/sources.yaml: # filters: # capacity_only: true # pa_md_only: false _INCLUDE_PATTERNS = [re.compile(p, re.I) for p in [ r"\bcapacity(?:[-\s]?building)?\b", r"\btechnical\s+assistance\b", r"\bTA\b", r"\borganizational\s+(capacity|effectiveness|development|readiness|stabilization)\b", r"\borganization(?:al)?\s+infrastructure\b", r"\bback[-\s]?office\b|\bbackbone\s+organization\b", r"\bgovernance\b|\bboard\s+development\b|\bboard\s+training\b", r"\bpre[-\s]?development\b|\bpredevelopment\b|\bplanning\s+grant\b", r"\bdata\s+systems?\b|\bCRM\b|\bcase\s+management\b", r"\b(staff|workforce)\s+capacity\b|\bhire\s+(?:staff|positions?)\b", r"\bscal(?:e|ing)\s+capacity\b|\bexpand\s+capacity\b", r"\bnonprofit\b|\bfaith[-\s]?based\b|\bcommunity[-\s]?based\b", # broaden for transportation / human services r"\bprovider\s+capacity\b|\bservice\s+capacity\b|\borganizational\s+support\b", ]] _EXCLUDE_PATTERNS = [re.compile(p, re.I) for p in [ r"\bteaching\s+assistant\b|\bTAs\b", r"\bbench\s+capacity\b|\bmanufacturing\s+capacity\b(?!.*organiz)", r"\bclinical\s+trial\b|\blaboratory\s+capacity\b(?!.*community)", r"\b(postsecondary|university|college)\b(?!.*community\s+partner)", r"\bconstruction\b(?!.*(admin|organiz|back[-\s]?office|governance|systems))", ]] _PA_MD_HINTS = re.compile( r"\b(" r"Pennsylvania|PA\b|Harrisburg|Philadelphia|Allegheny|Montgomery County\b|Pittsburgh|Scranton|Erie|" r"Maryland|MD\b|Annapolis|Baltimore|Prince\s+George'?s|Howard County\b" r")\b", re.I, ) def _doc_text_from_row(rec: Dict[str, Any]) -> str: title = rec.get("title") or "" synopsis = rec.get("synopsis") or rec.get("summary") or "" agency = rec.get("agency") or "" eligibility = rec.get("eligibility") or "" categories = " ".join(rec.get("categories") or []) if isinstance(rec.get("categories"), list) else (rec.get("categories") or "") geo = rec.get("geo") or "" return "\n".join([title, synopsis, agency, eligibility, categories, geo]).strip() def _is_capacity_building_text(text: str) -> bool: if not text: return False if any(p.search(text) for p in _EXCLUDE_PATTERNS): return False return any(p.search(text) for p in _INCLUDE_PATTERNS) def _is_pa_md_text(text: str) -> bool: if not text: return False return bool(_PA_MD_HINTS.search(text)) # -------------------- Deadline parsing helpers -------------------- # Common date shapes: "October 15, 2025", "Oct 15, 2025", "10/15/2025", "2025-10-15" _MONTHS = { "january":1, "jan":1, "february":2, "feb":2, "march":3, "mar":3, "april":4, "apr":4, "may":5, "june":6, "jun":6, "july":7, "jul":7, "august":8, "aug":8, "september":9, "sep":9, "sept":9, "october":10, "oct":10, "november":11, "nov":11, "december":12, "dec":12 } # Capture label + date snippets _DEADLINE_LINES = re.compile( r"(deadline|applications?\s+due|closing\s+date|due\s+date|submission\s+deadline)\s*[:\-]?\s*(.+)", re.I ) # Capture absolute dates _DATE_ISO = re.compile(r"\b(20\d{2})[-/\.](\d{1,2})[-/\.](\d{1,2})\b") # 2025-10-15 or 2025/10/15 _DATE_US = re.compile(r"\b(\d{1,2})[-/\.](\d{1,2})[-/\.](20\d{2})\b") # 10/15/2025 _DATE_LONG = re.compile( r"\b([A-Za-z]{3,9})\s+(\d{1,2})(?:st|nd|rd|th)?\,?\s+(20\d{2})\b", re.I) # Oct 15, 2025 / October 15 2025 def _to_iso(y:int, m:int, d:int) -> Optional[str]: try: return datetime(y, m, d, tzinfo=timezone.utc).date().isoformat() except Exception: return None def _parse_date_fragment(s: str) -> Optional[str]: s = (s or "").strip() # ISO first m = _DATE_ISO.search(s) if m: y, mm, dd = int(m.group(1)), int(m.group(2)), int(m.group(3)) return _to_iso(y, mm, dd) # US 10/15/2025 m = _DATE_US.search(s) if m: mm, dd, y = int(m.group(1)), int(m.group(2)), int(m.group(3)) return _to_iso(y, mm, dd) # Long months m = _DATE_LONG.search(s) if m: mon = m.group(1).lower() dd = int(m.group(2)) y = int(m.group(3)) mm = _MONTHS.get(mon) if mm: return _to_iso(y, mm, dd) return None def _extract_deadline(text: str) -> Tuple[Optional[str], Optional[str]]: """ Try to locate a deadline-like line, then parse a date. Returns (deadline_iso, raw_snippet) or (None, None). """ if not text: return None, None # Look for labeled lines first (strong signal) for line in text.splitlines(): m = _DEADLINE_LINES.search(line) if m: iso = _parse_date_fragment(m.group(2)) if iso: return iso, line.strip() # Fallback: scan whole text for any date (first match) iso = _parse_date_fragment(text) if iso: return iso, None return None, None def _compute_is_active(deadline_iso: Optional[str]) -> bool: if not deadline_iso: return True # treat unknown/TBD as active try: return datetime.fromisoformat(deadline_iso).date() >= datetime.utcnow().date() except Exception: return True def _attach_matt25_tags(rec: Dict[str, Any]) -> None: """Add Matthew 25 tags by inspecting core text fields; preserve any manual tags.""" blob = " ".join(filter(None, [ rec.get("title", ""), rec.get("synopsis") or rec.get("summary") or "", rec.get("eligibility", ""), rec.get("agency", ""), ])) manual = rec.get("tags") or [] if not isinstance(manual, list): manual = [manual] auto = infer_matt25_tags(blob) rec["tags"] = sorted(set(manual) | set(auto)) # -------------------- Grants.gov collector -------------------- def _collect_from_grantsgov_api(src: Dict) -> List[Dict[str, Any]]: """ Calls the Grants.gov Search2 client and returns a list of RAW dicts (adapter may already be close to unified; we'll still run normalize()). """ from app.sources.grantsgov_api import search_grants # local import to avoid cycles api = src.get("api", {}) page_size = int(api.get("page_size", src.get("page_size", 100))) max_pages = int(api.get("max_pages", src.get("max_pages", 5))) payload = api.get("payload", src.get("payload", {})) url = src.get("url", "") out = search_grants(url, payload, page_size=page_size, max_pages=max_pages) hits = out.get("hits", []) if isinstance(out, dict) else (out or []) return [h for h in hits if isinstance(h, dict)] # -------------------- HTTP helpers -------------------- _HTTP_HEADERS = { "User-Agent": "grants-rag/1.0 (+https://example.local) requests", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", } def _http_get(url: str, timeout: int = 20) -> Optional[requests.Response]: try: r = requests.get(url, headers=_HTTP_HEADERS, timeout=timeout) if r.status_code == 200 and r.content: return r except requests.RequestException: return None return None def _soup(html: str) -> BeautifulSoup: # use lxml or html5lib if available for robustness return BeautifulSoup(html, "lxml") def _text_from_soup(s: BeautifulSoup, selectors: Optional[List[str]] = None) -> Tuple[str, str]: """ Returns (title, text). Uses selectors if provided; falls back to common content containers. """ title = s.title.string.strip() if s.title and s.title.string else "" nodes = [] if selectors: for css in selectors: nodes.extend(s.select(css) or []) if not nodes: for css in ("main", "article", "#content", ".content", "[role='main']"): nodes.extend(s.select(css) or []) if not nodes: nodes = [s.body] if s.body else [] parts: List[str] = [] for n in nodes: if not n: continue txt = n.get_text(separator="\n", strip=True) if txt: parts.append(txt) body = "\n\n".join(parts).strip() return title, body def _make_id(*fields: str) -> str: h = hashlib.sha1() for f in fields: if f: h.update(f.encode("utf-8", "ignore")) h.update(b"|") return h.hexdigest() def _normalize_web_record( source_name: str, url: str, title: str, body: str, static: Dict[str, Any], extra: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """ Produce a record shaped like normalize() output so downstream stays unchanged. Adds parsed deadline + is_active when available. """ extra = extra or {} # If caller didn't pass a deadline, try to parse from body/title here. deadline_iso = extra.get("deadline") deadline_text = extra.get("deadline_text") if not deadline_iso: # Try parsing from body first, then from title. deadline_iso, deadline_text = _extract_deadline(body) or (None, None) if not deadline_iso and title: deadline_iso, _ = _extract_deadline(title) or (None, None) rec = { "id": extra.get("id") or _make_id(url, title or body[:160]), "title": title or extra.get("title") or url, "synopsis": (body or "")[:2000], # clip; embeddings use title+synopsis later "summary": None, "url": url, "source": source_name, "geo": static.get("geo"), "categories": static.get("categories"), "agency": extra.get("agency", ""), "eligibility": extra.get("eligibility", ""), # Store ISO (YYYY-MM-DD) in 'deadline' for consistency "deadline": deadline_iso, "deadline_text": deadline_text, # keep the raw line we matched (if any) "program_number": extra.get("program_number"), "posted_date": extra.get("posted_date"), } rec["is_active"] = _compute_is_active(rec["deadline"]) # NEW: add Matthew 25 tags based on title/synopsis/eligibility text _attach_matt25_tags(rec) return rec # -------------------- Collectors: http_html / web_page -------------------- def _collect_from_http_html(entry: Dict, source_name: str, static: Dict[str, Any]) -> List[Dict[str, Any]]: """ Supports types: 'web_page' and 'http_html' Config keys supported: - url (str) - parse: { follow_links: bool, link_selectors: [..], content_selectors: [..] } - crawl: { schedule: "...", max_depth: int } # max_depth 0/None = only landing """ url = entry.get("url") if not url: return [] r = _http_get(url) if not r: return [] s = _soup(r.text) parse = entry.get("parse", {}) or entry.get("extract", {}) or {} content_selectors = parse.get("content_selectors") or [] title, body = _text_from_soup(s, content_selectors) rows = [] rows.append(_normalize_web_record(source_name, url, title, body, static, extra={"posted_date": None})) # follow links? follow = bool(parse.get("follow_links")) link_selectors = parse.get("link_selectors") or [] crawl = entry.get("crawl", {}) or {} max_depth = int(crawl.get("max_depth", 0) or 0) visited = set([url]) def _enq_links(soup: BeautifulSoup) -> List[str]: if link_selectors: links = [] for sel in link_selectors: for a in soup.select(sel) or []: href = a.get("href") if href and href.startswith("http"): links.append(href) out, seen = [], set() for h in links: if h not in seen: out.append(h) seen.add(h) return out[:40] # polite cap return [] if follow and max_depth > 0: frontier = _enq_links(s) depth = 1 while frontier and depth <= max_depth and len(rows) < 200: next_frontier = [] for link in frontier: if link in visited: continue visited.add(link) rr = _http_get(link) if not rr: continue ss = _soup(rr.text) t2, b2 = _text_from_soup(ss, content_selectors) if b2: rows.append(_normalize_web_record(source_name, link, t2, b2, static, extra={"posted_date": None})) if depth < max_depth: next_frontier.extend(_enq_links(ss)) time.sleep(0.1) # gentle frontier = next_frontier depth += 1 return rows # -------------------- Collector: http_html_js (Playwright) -------------------- def _collect_from_http_html_js(entry: Dict, source_name: str, static: Dict[str, Any]) -> List[Dict[str, Any]]: """ JS-rendered pages using Playwright, with per-card extraction and robust scrolling. entry.options: - wait_for (css or ms int) - scroll (bool) - scroll_selector (css) # NEW: scroll a container div, not the window - scroll_times (int) # NEW: default 20 - scroll_wait_ms (int) # NEW: default 400 - min_cards (int) # NEW: wait until at least N cards exist - click_selector (css) - max_pages (int) - timeout_ms (int) - network_idle (bool) - debug (bool) entry.selectors: card, title, link, description, meta """ try: from playwright.sync_api import sync_playwright # type: ignore except Exception: print(f"[collect][skip] {source_name}: Playwright not installed.") return [] url = entry.get("url") if not url: return [] options = entry.get("options", {}) or {} parse = entry.get("parse", {}) or entry.get("extract", {}) or {} selectors = entry.get("selectors", {}) or {} content_selectors = parse.get("content_selectors") or [] timeout_ms = int(options.get("timeout_ms", 6000)) network_idle = bool(options.get("network_idle", True)) debug = bool(options.get("debug", False)) max_pages = int(options.get("max_pages", 1)) click_sel = options.get("click_selector") or entry.get("next_selector") wait_for = options.get("wait_for") rows: List[Dict[str, Any]] = [] def _text_first(soup: BeautifulSoup, css_list: str) -> str: if not css_list: return "" for css in [c.strip() for c in css_list.split(",")]: el = soup.select_one(css) if el: txt = el.get_text(separator=" ", strip=True) if txt: return txt return "" def _attr_first(soup: BeautifulSoup, css_list: str, attr: str) -> Optional[str]: if not css_list: return None for css in [c.strip() for c in css_list.split(",")]: el = soup.select_one(css) if el: val = el.get(attr) if val: return val return None def _parse_cards(page_html: str, base_url: str) -> List[Dict[str, Any]]: s = _soup(page_html) card_css = selectors.get("card", "") if not card_css: title, body = _text_from_soup(s, content_selectors) return [_normalize_web_record(source_name, base_url, title, body, static, extra={"posted_date": None})] out: List[Dict[str, Any]] = [] for card in s.select(card_css) or []: csoup = BeautifulSoup(str(card), "lxml") title = _text_first(csoup, selectors.get("title", "h1, h2, h3")) href = _attr_first(csoup, selectors.get("link", "a"), "href") link = urljoin(base_url, href) if href else base_url desc = _text_first(csoup, selectors.get("description", "p, .summary, .excerpt, .card-text")) meta = _text_first(csoup, selectors.get("meta", ".meta, .tags, .badge, .location")) body = "\n".join([p for p in (desc, meta) if p]).strip() if not (title or body): continue out.append(_normalize_web_record(source_name, link, title or link, body, static, extra={"posted_date": None})) return out def _wait_page_ready(page, *, wait_for, timeout_ms, options, selectors): # wait_for can be CSS or milliseconds if isinstance(wait_for, int): page.wait_for_timeout(wait_for) elif isinstance(wait_for, str) and wait_for: page.wait_for_selector(wait_for, timeout=min(timeout_ms, 15000)) # Scroll window or a container div if options.get("scroll"): scroll_sel = options.get("scroll_selector") scroll_times = int(options.get("scroll_times", 20)) scroll_wait = int(options.get("scroll_wait_ms", 400)) if scroll_sel: page.evaluate( """(sel, times, wait) => new Promise(res => { const el = document.querySelector(sel); if (!el) { res(); return; } let i = 0; const t = setInterval(() => { const y = el.scrollTop; el.scrollTop = el.scrollHeight; i++; if (el.scrollTop === y || i >= times) { clearInterval(t); res(); } }, wait); })""", scroll_sel, scroll_times, scroll_wait ) else: page.evaluate( """(times, wait) => new Promise(res => { let i = 0; const t = setInterval(() => { const y = window.scrollY; window.scrollBy(0, document.body.scrollHeight); i++; if (window.scrollY === y || i >= times) { clearInterval(t); res(); } }, wait); })""", scroll_times, scroll_wait ) # Optionally wait for a minimum number of cards (virtualized lists) min_cards = int(options.get("min_cards", 0)) card_css = (selectors or {}).get("card", "") if min_cards and card_css: try: page.wait_for_function( """([sel, target]) => { const els = document.querySelectorAll(sel); return els && els.length >= target; }""", arg=[card_css, min_cards], timeout=min(timeout_ms, 15000), ) except Exception: pass # best-effort def _try_once(p): def _route(route): r = route.request if r.resource_type in {"image", "media", "font"}: return route.abort() return route.continue_() browser = p.chromium.launch(headless=not debug) context = browser.new_context(user_agent=( "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0 Safari/537.36" )) context.route("**/*", _route) page = context.new_page() try: page.set_default_timeout(timeout_ms) page.goto(url, wait_until="domcontentloaded", timeout=timeout_ms) if network_idle: page.wait_for_load_state("networkidle", timeout=min(timeout_ms, 15000)) _wait_page_ready(page, wait_for=wait_for, timeout_ms=timeout_ms, options=options, selectors=selectors) htmls = [page.content()] # pagination for _ in range(max_pages - 1): sel = click_sel if not sel or page.locator(sel).count() == 0: break page.click(sel) page.wait_for_load_state("domcontentloaded") if network_idle: page.wait_for_load_state("networkidle", timeout=min(timeout_ms, 15000)) _wait_page_ready(page, wait_for=wait_for, timeout_ms=timeout_ms, options=options, selectors=selectors) htmls.append(page.content()) for html in htmls: found = _parse_cards(html, url) rows.extend(found) print(f"[collect][cards] {source_name}: found {len(found)} cards on this page") browser.close() return True except Exception as e: if debug: try: snap = DOCSTORE_DIR / f"playwright_error_{hashlib.sha1(url.encode()).hexdigest()}.png" page.screenshot(path=str(snap)) print(f"[collect][debug] Saved screenshot: {snap}") except Exception: pass print(f"[collect][warn] {source_name}: {e.__class__.__name__}: {e}") try: browser.close() except Exception: pass return False from playwright.sync_api import sync_playwright # late import for clarity with sync_playwright() as p: ok = _try_once(p) if not ok: time.sleep(1.5) _try_once(p) if not rows: print(f"[collect][skip] {source_name}: no content after retries.") return rows # -------------------- PDF collector -------------------- def _collect_from_http_pdf(entry: Dict, source_name: str, static: Dict[str, Any]) -> List[Dict[str, Any]]: """ type: 'http_pdf' keys: - url (single PDF fetch) """ url = entry.get("url") if not url: return [] try: from pdfminer.high_level import extract_text # lazy import except Exception: return [] rows = [] r = _http_get(url, timeout=40) if not r: return rows tmp = DOCSTORE_DIR / (hashlib.sha1(url.encode("utf-8")).hexdigest() + ".pdf") try: DOCSTORE_DIR.mkdir(parents=True, exist_ok=True) tmp.write_bytes(r.content) body = extract_text(str(tmp)) or "" finally: try: tmp.unlink(missing_ok=True) except Exception: pass title = entry.get("name") or "PDF Document" rows.append(_normalize_web_record(source_name, url, title, body, static, extra={"posted_date": None})) return rows # -------------------- De-dup helpers (5.2) -------------------- def _norm(text: str) -> str: t = (text or "").lower() t = re.sub(r'[^a-z0-9 ]+', ' ', t) return re.sub(r'\s+', ' ', t).strip() def _hash_fingerprint(title: str, agency: str, deadline: str) -> str: base = f"{_norm(title)}|{_norm(agency)}|{deadline or ''}" return hashlib.sha1(base.encode()).hexdigest() def _near_duplicate(a: Dict[str, Any], b: Dict[str, Any]) -> bool: # Deadlines equal or both missing dates_close = (a.get("deadline") == b.get("deadline")) or (not a.get("deadline") and not b.get("deadline")) t_sim = SequenceMatcher(None, _norm(a.get("title","")), _norm(b.get("title",""))).ratio() ag_sim = SequenceMatcher(None, _norm(a.get("agency","")), _norm(b.get("agency",""))).ratio() return dates_close and (t_sim > 0.88) and (ag_sim > 0.75) def _merge_records(primary: Dict[str, Any], other: Dict[str, Any]) -> Dict[str, Any]: """Merge fields while preserving best data and provenance.""" merged = dict(primary) # Prefer non-empty fields; combine categories; keep earliest posted_date; keep earliest deadline if different. def choose(a, b): return a if (a not in (None, "", [], {})) else b merged["url"] = choose(primary.get("url"), other.get("url")) merged["title"] = choose(primary.get("title"), other.get("title")) merged["synopsis"] = choose(primary.get("synopsis"), other.get("synopsis")) merged["summary"] = choose(primary.get("summary"), other.get("summary")) merged["agency"] = choose(primary.get("agency"), other.get("agency")) merged["eligibility"] = choose(primary.get("eligibility"), other.get("eligibility")) merged["program_number"] = choose(primary.get("program_number"), other.get("program_number")) merged["geo"] = choose(primary.get("geo"), other.get("geo")) # categories → union (list) cats_a = primary.get("categories") or [] cats_b = other.get("categories") or [] if not isinstance(cats_a, list): cats_a = [cats_a] if not isinstance(cats_b, list): cats_b = [cats_b] merged["categories"] = sorted(set([c for c in cats_a + cats_b if c])) # deadline: choose earlier known date (safer to surface sooner one) da, db = primary.get("deadline"), other.get("deadline") if da and db: merged["deadline"] = min(da, db) else: merged["deadline"] = da or db # carry a deadline_text if any merged["deadline_text"] = choose(primary.get("deadline_text"), other.get("deadline_text")) merged["is_active"] = _compute_is_active(merged.get("deadline")) # posted_date: keep earliest if both pa, pb = primary.get("posted_date"), other.get("posted_date") merged["posted_date"] = min(pa, pb) if (pa and pb) else (pa or pb) # provenance: combine sources + urls prov_sources = set() for s in (primary.get("source"), other.get("source")): if not s: continue if isinstance(s, list): prov_sources.update(s) else: prov_sources.add(s) merged["source"] = sorted(prov_sources) if prov_sources else None prov_urls = set() for u in (primary.get("url"), other.get("url")): if u: prov_urls.add(u) # keep a list of all discovered urls merged["all_urls"] = sorted(prov_urls.union(set(primary.get("all_urls") or []), set(other.get("all_urls") or []))) # recompute ID based on merged fingerprint (title/agency/deadline) merged["id"] = _hash_fingerprint(merged.get("title",""), merged.get("agency",""), merged.get("deadline","")) return merged def _dedupe_and_merge(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Exact fingerprint + fuzzy near-dup consolidation across sources.""" uniques: List[Dict[str, Any]] = [] by_fp: Dict[str, int] = {} for r in rows: fp = _hash_fingerprint(r.get("title",""), r.get("agency",""), r.get("deadline","")) if fp in by_fp: # exact dup: merge into existing idx = by_fp[fp] uniques[idx] = _merge_records(uniques[idx], r) continue # fuzzy check against current uniques found_idx = None for i, u in enumerate(uniques): if _near_duplicate(r, u): found_idx = i break if found_idx is not None: uniques[found_idx] = _merge_records(uniques[found_idx], r) # also index its fingerprint (so later exact matches land here) new_fp = _hash_fingerprint(uniques[found_idx].get("title",""), uniques[found_idx].get("agency",""), uniques[found_idx].get("deadline","")) by_fp[new_fp] = found_idx else: by_fp[fp] = len(uniques) # initialize provenance r.setdefault("all_urls", [r.get("url")] if r.get("url") else []) uniques.append(r) return uniques # -------------------- Write docstore & build index -------------------- def _save_docstore(recs: List[Dict, Any]) -> str: DOCSTORE_DIR.mkdir(parents=True, exist_ok=True) path = DOCSTORE_DIR / "docstore.jsonl" with path.open("w", encoding="utf-8") as f: for r in recs: f.write(json.dumps(r, ensure_ascii=False) + "\n") return str(path) def _build_index_from_docstore() -> int: ds_path = DOCSTORE_DIR / "docstore.jsonl" if not ds_path.exists(): raise RuntimeError("Docstore not found. Run ingest first.") texts: List[str] = [] metas: List[Dict[str, Any]] = [] with ds_path.open("r", encoding="utf-8") as f: for line in f: rec = json.loads(line) title = rec.get("title") or "" synopsis = rec.get("synopsis") or rec.get("summary") or "" agency = rec.get("agency") or "" eligibility = rec.get("eligibility") or "" txt = "\n".join([title, synopsis, agency, eligibility]).strip() if not txt: continue texts.append(txt) metas.append({ "id": rec.get("id"), "title": title, "url": rec.get("url"), "source": rec.get("source"), "tags": rec.get("tags"), "geo": rec.get("geo"), "categories": rec.get("categories"), "agency": agency, "deadline": rec.get("deadline"), # ISO if available "deadline_text": rec.get("deadline_text"), "is_active": rec.get("is_active"), "program_number": rec.get("program_number"), "posted_date": rec.get("posted_date"), "all_urls": rec.get("all_urls"), }) print(f"[index] Rows loaded from docstore: {len(texts)}") if not texts: INDEX_DIR.mkdir(parents=True, exist_ok=True) (INDEX_DIR / "meta.json").write_text(json.dumps([], ensure_ascii=False)) print("[index] No texts to embed. Wrote empty meta.json.") return 0 # Embed (CPU default; portable) model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") model.max_seq_length = 256 batch = max(8, min(32, len(texts))) emb = model.encode( texts, convert_to_numpy=True, normalize_embeddings=True, show_progress_bar=True, batch_size=batch, ).astype(np.float32, copy=False) # FAISS index (Inner Product for cosine on normalized vectors) import faiss dim = emb.shape[1] index = faiss.IndexFlatIP(dim) index.add(emb) INDEX_DIR.mkdir(parents=True, exist_ok=True) faiss.write_index(index, str(INDEX_DIR / "faiss.index")) (INDEX_DIR / "meta.json").write_text(json.dumps(metas, ensure_ascii=False)) print(f"[index] Wrote FAISS index with {emb.shape[0]} vectors (dim={dim}).") return len(texts) # -------------------- Public API: ingest -------------------- __all__ = ["ingest"] def ingest(cfg_path: str = "config/sources.yaml", env: Dict | None = None): """ Reads config, fetches from enabled sources via adapters, normalizes to a single schema, applies filters (capacity / PA-MD), de-dupes, writes docstore, and builds the FAISS index. Returns (docstore_path, n_indexed). """ cfg = load_config(cfg_path) # ---- Filters from config ---- f_cfg = (cfg or {}).get("filters", {}) or {} capacity_only = bool(f_cfg.get("capacity_only", True)) pa_md_only = bool(f_cfg.get("pa_md_only", False)) print(f"[filters] capacity_only = {'TRUE' if capacity_only else 'FALSE'}") print(f"[filters] pa_md_only = {'TRUE' if pa_md_only else 'FALSE'}") all_rows: List[Dict[str, Any]] = [] for entry in cfg.get("sources", []): if not entry.get("enabled"): continue name = entry.get("name", "") geo = entry.get("geo") or "US" cats = entry.get("categories") or [] static = {"geo": geo, "categories": cats} typ = entry.get("type") rows: List[Dict[str, Any]] = [] # -------- Collect from each adapter -------- if typ == "grantsgov_api": raw_hits = _collect_from_grantsgov_api(entry) rows = [normalize("grants_gov", h, static) for h in raw_hits] elif typ in ("web_page", "http_html"): rows = _collect_from_http_html(entry, name, static) elif typ == "http_html_js": rows = _collect_from_http_html_js(entry, name, static) elif typ == "http_pdf": rows = _collect_from_http_pdf(entry, name, static) elif typ == "local_sample": p = Path(entry["path"]).expanduser() blob = json.loads(p.read_text(encoding="utf-8")) items = blob.get("opportunities") or [] rows = [normalize("local_sample", op, static) for op in items] else: print(f"[collect] {name}: unknown type '{typ}', skipping.") continue print(f"[collect] {name}: fetched_rows={len(rows)}") # ---- Apply capacity / geo filters BEFORE indexing (allow per-source bypass) ---- if rows: if entry.get("skip_filters"): print(f"[filter] {name}: skip_filters=true → keeping all {len(rows)}") else: pre = len(rows) filtered = [] for r in rows: t = _doc_text_from_row(r) if capacity_only and not _is_capacity_building_text(t): continue if pa_md_only and not _is_pa_md_text(t): continue filtered.append(r) print( f"[filter] {name}: kept {len(filtered)}/{pre} after filters " f"(capacity_only={capacity_only}, pa_md_only={pa_md_only})" ) rows = filtered # ---- Apply capacity / geo filters BEFORE indexing (allow per-source bypass) ---- if rows: if entry.get("skip_filters"): print(f"[filter] {name}: skip_filters=true → keeping all {len(rows)}") else: pre = len(rows) filtered = [] for r in rows: t = _doc_text_from_row(r) if capacity_only and not _is_capacity_building_text(t): continue if pa_md_only and not _is_pa_md_text(t): continue filtered.append(r) print( f"[filter] {name}: kept {len(filtered)}/{pre} after filters " f"(capacity_only={capacity_only}, pa_md_only={pa_md_only})" ) rows = filtered # >>> GLOBAL MATT 25 TAGGING (safer, one place) <<< for r in rows: # If you truly want to leave Grants.gov untouched, keep this guard: if r.get("source") == "grants_gov": continue _attach_matt25_tags(r) print(f"[collect] {name} → rows_after_filters={len(rows)}") all_rows.extend(rows) # ---- Cross-source DEDUPE + MERGE ---- unique = _dedupe_and_merge(all_rows) print(f"[ingest] Unique records to index: {len(unique)}") path = _save_docstore(unique) n = _build_index_from_docstore() return path, n # -------------------- CLI -------------------- if __name__ == "__main__": import argparse ap = argparse.ArgumentParser() ap.add_argument("--config", default="config/sources.yaml") args = ap.parse_args() p, n = ingest(args.config) print(f"Ingested {n} records. Docstore at {p}")