# app/ingest.py
from __future__ import annotations

import json
from pathlib import Path
from typing import Dict, List, Any, Tuple, Optional

import yaml
import numpy as np
from sentence_transformers import SentenceTransformer

from app.paths import DOCSTORE_DIR, INDEX_DIR
from .normalize import normalize  # <-- central normalizer

import re
import time
import hashlib
import requests
from bs4 import BeautifulSoup
from datetime import datetime, timezone
from difflib import SequenceMatcher
from urllib.parse import urljoin

from app.tags import infer_matt25_tags  # <-- NEW


# -------------------- Config --------------------

def load_config(cfg_path: str) -> Dict:
    with open(cfg_path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)

# -------------------- Capacity / Geo Filters (config-driven) --------------------
# controls live in config/sources.yaml:
#   filters:
#     capacity_only: true
#     pa_md_only: false

_INCLUDE_PATTERNS = [re.compile(p, re.I) for p in [
    r"\bcapacity(?:[-\s]?building)?\b",
    r"\btechnical\s+assistance\b",
    r"\bTA\b",
    r"\borganizational\s+(capacity|effectiveness|development|readiness|stabilization)\b",
    r"\borganization(?:al)?\s+infrastructure\b",
    r"\bback[-\s]?office\b|\bbackbone\s+organization\b",
    r"\bgovernance\b|\bboard\s+development\b|\bboard\s+training\b",
    r"\bpre[-\s]?development\b|\bpredevelopment\b|\bplanning\s+grant\b",
    r"\bdata\s+systems?\b|\bCRM\b|\bcase\s+management\b",
    r"\b(staff|workforce)\s+capacity\b|\bhire\s+(?:staff|positions?)\b",
    r"\bscal(?:e|ing)\s+capacity\b|\bexpand\s+capacity\b",
    r"\bnonprofit\b|\bfaith[-\s]?based\b|\bcommunity[-\s]?based\b",
    # broaden for transportation / human services
    r"\bprovider\s+capacity\b|\bservice\s+capacity\b|\borganizational\s+support\b",
]]

_EXCLUDE_PATTERNS = [re.compile(p, re.I) for p in [
    r"\bteaching\s+assistant\b|\bTAs\b",
    r"\bbench\s+capacity\b|\bmanufacturing\s+capacity\b(?!.*organiz)",
    r"\bclinical\s+trial\b|\blaboratory\s+capacity\b(?!.*community)",
    r"\b(postsecondary|university|college)\b(?!.*community\s+partner)",
    r"\bconstruction\b(?!.*(admin|organiz|back[-\s]?office|governance|systems))",
]]

_PA_MD_HINTS = re.compile(
    r"\b("
    r"Pennsylvania|PA\b|Harrisburg|Philadelphia|Allegheny|Montgomery County\b|Pittsburgh|Scranton|Erie|"
    r"Maryland|MD\b|Annapolis|Baltimore|Prince\s+George'?s|Howard County\b"
    r")\b",
    re.I,
)

def _doc_text_from_row(rec: Dict[str, Any]) -> str:
    title = rec.get("title") or ""
    synopsis = rec.get("synopsis") or rec.get("summary") or ""
    agency = rec.get("agency") or ""
    eligibility = rec.get("eligibility") or ""
    categories = " ".join(rec.get("categories") or []) if isinstance(rec.get("categories"), list) else (rec.get("categories") or "")
    geo = rec.get("geo") or ""
    return "\n".join([title, synopsis, agency, eligibility, categories, geo]).strip()


def _is_capacity_building_text(text: str) -> bool:
    if not text:
        return False
    if any(p.search(text) for p in _EXCLUDE_PATTERNS):
        return False
    return any(p.search(text) for p in _INCLUDE_PATTERNS)


def _is_pa_md_text(text: str) -> bool:
    if not text:
        return False
    return bool(_PA_MD_HINTS.search(text))
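
# Illustrative checks of the two filter predicates above (doctest-style comments,
# kept here for reference only; the example strings are made up):
#   >>> _is_capacity_building_text("Technical assistance and board development for community-based nonprofits")
#   True
#   >>> _is_capacity_building_text("Teaching assistant openings at the university")   # hits an exclude pattern
#   False
#   >>> _is_pa_md_text("Open to nonprofits serving Harrisburg and Baltimore")
#   True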

# -------------------- Deadline parsing helpers --------------------
# Common date shapes: "October 15, 2025", "Oct 15, 2025", "10/15/2025", "2025-10-15"

_MONTHS = {
    "january": 1, "jan": 1, "february": 2, "feb": 2, "march": 3, "mar": 3, "april": 4, "apr": 4,
    "may": 5, "june": 6, "jun": 6, "july": 7, "jul": 7, "august": 8, "aug": 8, "september": 9, "sep": 9, "sept": 9,
    "october": 10, "oct": 10, "november": 11, "nov": 11, "december": 12, "dec": 12,
}

# Capture label + date snippets
_DEADLINE_LINES = re.compile(
    r"(deadline|applications?\s+due|closing\s+date|due\s+date|submission\s+deadline)\s*[:\-]?\s*(.+)",
    re.I
)

# Capture absolute dates
_DATE_ISO = re.compile(r"\b(20\d{2})[-/\.](\d{1,2})[-/\.](\d{1,2})\b")   # 2025-10-15 or 2025/10/15
_DATE_US = re.compile(r"\b(\d{1,2})[-/\.](\d{1,2})[-/\.](20\d{2})\b")    # 10/15/2025
_DATE_LONG = re.compile(
    r"\b([A-Za-z]{3,9})\s+(\d{1,2})(?:st|nd|rd|th)?\,?\s+(20\d{2})\b", re.I)  # Oct 15, 2025 / October 15 2025


def _to_iso(y: int, m: int, d: int) -> Optional[str]:
    try:
        return datetime(y, m, d, tzinfo=timezone.utc).date().isoformat()
    except Exception:
        return None


def _parse_date_fragment(s: str) -> Optional[str]:
    s = (s or "").strip()
    # ISO first
    m = _DATE_ISO.search(s)
    if m:
        y, mm, dd = int(m.group(1)), int(m.group(2)), int(m.group(3))
        return _to_iso(y, mm, dd)
    # US 10/15/2025
    m = _DATE_US.search(s)
    if m:
        mm, dd, y = int(m.group(1)), int(m.group(2)), int(m.group(3))
        return _to_iso(y, mm, dd)
    # Long months
    m = _DATE_LONG.search(s)
    if m:
        mon = m.group(1).lower()
        dd = int(m.group(2))
        y = int(m.group(3))
        mm = _MONTHS.get(mon)
        if mm:
            return _to_iso(y, mm, dd)
    return None
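
# Examples of the date shapes _parse_date_fragment handles (reference only):
#   >>> _parse_date_fragment("2025-10-15")
#   '2025-10-15'
#   >>> _parse_date_fragment("10/15/2025")
#   '2025-10-15'
#   >>> _parse_date_fragment("Oct 3rd, 2025")
#   '2025-10-03'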

def _extract_deadline(text: str) -> Tuple[Optional[str], Optional[str]]:
    """
    Try to locate a deadline-like line, then parse a date.
    Returns (deadline_iso, raw_snippet) or (None, None).
    """
    if not text:
        return None, None
    # Look for labeled lines first (strong signal)
    for line in text.splitlines():
        m = _DEADLINE_LINES.search(line)
        if m:
            iso = _parse_date_fragment(m.group(2))
            if iso:
                return iso, line.strip()
    # Fallback: scan whole text for any date (first match)
    iso = _parse_date_fragment(text)
    if iso:
        return iso, None
    return None, None


def _compute_is_active(deadline_iso: Optional[str]) -> bool:
    if not deadline_iso:
        return True  # treat unknown/TBD as active
    try:
        return datetime.fromisoformat(deadline_iso).date() >= datetime.utcnow().date()
    except Exception:
        return True
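
# Illustrative behaviour of the two helpers above (reference only):
#   >>> _extract_deadline("Applications due: October 15, 2025")
#   ('2025-10-15', 'Applications due: October 15, 2025')
#   >>> _compute_is_active(None)   # unknown/TBD deadlines stay active
#   True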

def _attach_matt25_tags(rec: Dict[str, Any]) -> None:
    """Add Matthew 25 tags by inspecting core text fields; preserve any manual tags."""
    blob = " ".join(filter(None, [
        rec.get("title", ""),
        rec.get("synopsis") or rec.get("summary") or "",
        rec.get("eligibility", ""),
        rec.get("agency", ""),
    ]))
    manual = rec.get("tags") or []
    if not isinstance(manual, list):
        manual = [manual]
    auto = infer_matt25_tags(blob)
    rec["tags"] = sorted(set(manual) | set(auto))

# -------------------- Grants.gov collector --------------------

def _collect_from_grantsgov_api(src: Dict) -> List[Dict[str, Any]]:
    """
    Calls the Grants.gov Search2 client and returns a list of RAW dicts
    (adapter may already be close to unified; we'll still run normalize()).
    """
    from app.sources.grantsgov_api import search_grants  # local import to avoid cycles

    api = src.get("api", {})
    page_size = int(api.get("page_size", src.get("page_size", 100)))
    max_pages = int(api.get("max_pages", src.get("max_pages", 5)))
    payload = api.get("payload", src.get("payload", {}))
    url = src.get("url", "")

    out = search_grants(url, payload, page_size=page_size, max_pages=max_pages)
    hits = out.get("hits", []) if isinstance(out, dict) else (out or [])
    return [h for h in hits if isinstance(h, dict)]
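
# Sketch of a grantsgov_api source entry as this collector reads it. Field names come
# from the code above; the url and payload values are placeholders, not a real config:
#
#   {
#       "name": "grants_gov_search2",
#       "type": "grantsgov_api",
#       "enabled": True,
#       "url": "https://api.grants.gov/v1/api/search2",   # placeholder; whatever endpoint search_grants expects
#       "api": {"page_size": 100, "max_pages": 5, "payload": {"keyword": "capacity building"}},
#   }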

# -------------------- HTTP helpers --------------------

_HTTP_HEADERS = {
    "User-Agent": "grants-rag/1.0 (+https://example.local) requests",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
}


def _http_get(url: str, timeout: int = 20) -> Optional[requests.Response]:
    try:
        r = requests.get(url, headers=_HTTP_HEADERS, timeout=timeout)
        if r.status_code == 200 and r.content:
            return r
    except requests.RequestException:
        return None
    return None

def _soup(html: str) -> BeautifulSoup:
    # prefer lxml for robustness; fall back to the stdlib parser if lxml is not installed
    try:
        return BeautifulSoup(html, "lxml")
    except Exception:
        return BeautifulSoup(html, "html.parser")

def _text_from_soup(s: BeautifulSoup, selectors: Optional[List[str]] = None) -> Tuple[str, str]:
    """
    Returns (title, text). Uses selectors if provided;
    falls back to common content containers.
    """
    title = s.title.string.strip() if s.title and s.title.string else ""
    nodes = []
    if selectors:
        for css in selectors:
            nodes.extend(s.select(css) or [])
    if not nodes:
        for css in ("main", "article", "#content", ".content", "[role='main']"):
            nodes.extend(s.select(css) or [])
    if not nodes:
        nodes = [s.body] if s.body else []
    parts: List[str] = []
    for n in nodes:
        if not n:
            continue
        txt = n.get_text(separator="\n", strip=True)
        if txt:
            parts.append(txt)
    body = "\n\n".join(parts).strip()
    return title, body
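
# Minimal usage sketch for _text_from_soup (illustrative HTML only):
#   >>> s = _soup("<html><head><title>Grants</title></head>"
#   ...           "<body><main><p>Apply by fall.</p></main></body></html>")
#   >>> _text_from_soup(s)
#   ('Grants', 'Apply by fall.')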

def _make_id(*fields: str) -> str:
    h = hashlib.sha1()
    for f in fields:
        if f:
            h.update(f.encode("utf-8", "ignore"))
        h.update(b"|")
    return h.hexdigest()

def _normalize_web_record(
    source_name: str,
    url: str,
    title: str,
    body: str,
    static: Dict[str, Any],
    extra: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """
    Produce a record shaped like normalize() output so downstream stays unchanged.
    Adds parsed deadline + is_active when available.
    """
    extra = extra or {}

    # If caller didn't pass a deadline, try to parse from body/title here.
    deadline_iso = extra.get("deadline")
    deadline_text = extra.get("deadline_text")
    if not deadline_iso:
        # Try parsing from body first, then from title.
        deadline_iso, deadline_text = _extract_deadline(body) or (None, None)
        if not deadline_iso and title:
            deadline_iso, _ = _extract_deadline(title) or (None, None)

    rec = {
        "id": extra.get("id") or _make_id(url, title or body[:160]),
        "title": title or extra.get("title") or url,
        "synopsis": (body or "")[:2000],  # clip; embeddings use title+synopsis later
        "summary": None,
        "url": url,
        "source": source_name,
        "geo": static.get("geo"),
        "categories": static.get("categories"),
        "agency": extra.get("agency", ""),
        "eligibility": extra.get("eligibility", ""),
        # Store ISO (YYYY-MM-DD) in 'deadline' for consistency
        "deadline": deadline_iso,
        "deadline_text": deadline_text,  # keep the raw line we matched (if any)
        "program_number": extra.get("program_number"),
        "posted_date": extra.get("posted_date"),
    }
    rec["is_active"] = _compute_is_active(rec["deadline"])

    # NEW: add Matthew 25 tags based on title/synopsis/eligibility text
    _attach_matt25_tags(rec)
    return rec

# -------------------- Collectors: http_html / web_page --------------------

def _collect_from_http_html(entry: Dict, source_name: str, static: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Supports types: 'web_page' and 'http_html'
    Config keys supported:
      - url (str)
      - parse: { follow_links: bool, link_selectors: [..], content_selectors: [..] }
      - crawl: { schedule: "...", max_depth: int }   # max_depth 0/None = only landing
    """
    url = entry.get("url")
    if not url:
        return []
    r = _http_get(url)
    if not r:
        return []
    s = _soup(r.text)

    parse = entry.get("parse", {}) or entry.get("extract", {}) or {}
    content_selectors = parse.get("content_selectors") or []
    title, body = _text_from_soup(s, content_selectors)

    rows = []
    rows.append(_normalize_web_record(source_name, url, title, body, static, extra={"posted_date": None}))

    # follow links?
    follow = bool(parse.get("follow_links"))
    link_selectors = parse.get("link_selectors") or []
    crawl = entry.get("crawl", {}) or {}
    max_depth = int(crawl.get("max_depth", 0) or 0)
    visited = set([url])

    def _enq_links(soup: BeautifulSoup) -> List[str]:
        if link_selectors:
            links = []
            for sel in link_selectors:
                for a in soup.select(sel) or []:
                    href = a.get("href")
                    if href and href.startswith("http"):
                        links.append(href)
            out, seen = [], set()
            for h in links:
                if h not in seen:
                    out.append(h)
                    seen.add(h)
            return out[:40]  # polite cap
        return []

    if follow and max_depth > 0:
        frontier = _enq_links(s)
        depth = 1
        while frontier and depth <= max_depth and len(rows) < 200:
            next_frontier = []
            for link in frontier:
                if link in visited:
                    continue
                visited.add(link)
                rr = _http_get(link)
                if not rr:
                    continue
                ss = _soup(rr.text)
                t2, b2 = _text_from_soup(ss, content_selectors)
                if b2:
                    rows.append(_normalize_web_record(source_name, link, t2, b2, static, extra={"posted_date": None}))
                if depth < max_depth:
                    next_frontier.extend(_enq_links(ss))
                time.sleep(0.1)  # gentle
            frontier = next_frontier
            depth += 1

    return rows
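
# Hypothetical http_html source entry exercising the keys documented in the docstring above.
# URL and CSS selectors are placeholders for whatever the target site actually uses:
#
#   {
#       "name": "example_foundation_page",
#       "type": "http_html",
#       "enabled": True,
#       "url": "https://example.org/grants",
#       "geo": "PA",
#       "categories": ["capacity_building"],
#       "parse": {
#           "follow_links": True,
#           "link_selectors": ["a.grant-link"],
#           "content_selectors": ["main", "article"],
#       },
#       "crawl": {"max_depth": 1},
#   }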

# -------------------- Collector: http_html_js (Playwright) --------------------

def _collect_from_http_html_js(entry: Dict, source_name: str, static: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    JS-rendered pages using Playwright, with per-card extraction and robust scrolling.
    entry.options:
      - wait_for (css or ms int)
      - scroll (bool)
      - scroll_selector (css)    # NEW: scroll a container div, not the window
      - scroll_times (int)       # NEW: default 20
      - scroll_wait_ms (int)     # NEW: default 400
      - min_cards (int)          # NEW: wait until at least N cards exist
      - click_selector (css)
      - max_pages (int)
      - timeout_ms (int)
      - network_idle (bool)
      - debug (bool)
    entry.selectors: card, title, link, description, meta
    """
    try:
        from playwright.sync_api import sync_playwright  # type: ignore
    except Exception:
        print(f"[collect][skip] {source_name}: Playwright not installed.")
        return []

    url = entry.get("url")
    if not url:
        return []

    options = entry.get("options", {}) or {}
    parse = entry.get("parse", {}) or entry.get("extract", {}) or {}
    selectors = entry.get("selectors", {}) or {}
    content_selectors = parse.get("content_selectors") or []

    timeout_ms = int(options.get("timeout_ms", 6000))
    network_idle = bool(options.get("network_idle", True))
    debug = bool(options.get("debug", False))
    max_pages = int(options.get("max_pages", 1))
    click_sel = options.get("click_selector") or entry.get("next_selector")
    wait_for = options.get("wait_for")

    rows: List[Dict[str, Any]] = []

    def _text_first(soup: BeautifulSoup, css_list: str) -> str:
        if not css_list:
            return ""
        for css in [c.strip() for c in css_list.split(",")]:
            el = soup.select_one(css)
            if el:
                txt = el.get_text(separator=" ", strip=True)
                if txt:
                    return txt
        return ""

    def _attr_first(soup: BeautifulSoup, css_list: str, attr: str) -> Optional[str]:
        if not css_list:
            return None
        for css in [c.strip() for c in css_list.split(",")]:
            el = soup.select_one(css)
            if el:
                val = el.get(attr)
                if val:
                    return val
        return None

    def _parse_cards(page_html: str, base_url: str) -> List[Dict[str, Any]]:
        s = _soup(page_html)
        card_css = selectors.get("card", "")
        if not card_css:
            title, body = _text_from_soup(s, content_selectors)
            return [_normalize_web_record(source_name, base_url, title, body, static, extra={"posted_date": None})]
        out: List[Dict[str, Any]] = []
        for card in s.select(card_css) or []:
            csoup = BeautifulSoup(str(card), "lxml")
            title = _text_first(csoup, selectors.get("title", "h1, h2, h3"))
            href = _attr_first(csoup, selectors.get("link", "a"), "href")
            link = urljoin(base_url, href) if href else base_url
            desc = _text_first(csoup, selectors.get("description", "p, .summary, .excerpt, .card-text"))
            meta = _text_first(csoup, selectors.get("meta", ".meta, .tags, .badge, .location"))
            body = "\n".join([p for p in (desc, meta) if p]).strip()
            if not (title or body):
                continue
            out.append(_normalize_web_record(source_name, link, title or link, body, static, extra={"posted_date": None}))
        return out

    def _wait_page_ready(page, *, wait_for, timeout_ms, options, selectors):
        # wait_for can be CSS or milliseconds
        if isinstance(wait_for, int):
            page.wait_for_timeout(wait_for)
        elif isinstance(wait_for, str) and wait_for:
            page.wait_for_selector(wait_for, timeout=min(timeout_ms, 15000))

        # Scroll window or a container div
        if options.get("scroll"):
            scroll_sel = options.get("scroll_selector")
            scroll_times = int(options.get("scroll_times", 20))
            scroll_wait = int(options.get("scroll_wait_ms", 400))
            # page.evaluate() accepts a single argument, so bundle the values into one
            # list and destructure it inside the JS callback.
            if scroll_sel:
                page.evaluate(
                    """([sel, times, wait]) => new Promise(res => {
                        const el = document.querySelector(sel);
                        if (!el) { res(); return; }
                        let i = 0;
                        const t = setInterval(() => {
                            const y = el.scrollTop;
                            el.scrollTop = el.scrollHeight;
                            i++;
                            if (el.scrollTop === y || i >= times) { clearInterval(t); res(); }
                        }, wait);
                    })""",
                    [scroll_sel, scroll_times, scroll_wait],
                )
            else:
                page.evaluate(
                    """([times, wait]) => new Promise(res => {
                        let i = 0;
                        const t = setInterval(() => {
                            const y = window.scrollY;
                            window.scrollBy(0, document.body.scrollHeight);
                            i++;
                            if (window.scrollY === y || i >= times) { clearInterval(t); res(); }
                        }, wait);
                    })""",
                    [scroll_times, scroll_wait],
                )

        # Optionally wait for a minimum number of cards (virtualized lists)
        min_cards = int(options.get("min_cards", 0))
        card_css = (selectors or {}).get("card", "")
        if min_cards and card_css:
            try:
                page.wait_for_function(
                    """([sel, target]) => {
                        const els = document.querySelectorAll(sel);
                        return els && els.length >= target;
                    }""",
                    arg=[card_css, min_cards],
                    timeout=min(timeout_ms, 15000),
                )
            except Exception:
                pass  # best-effort

    def _try_once(p):
        def _route(route):
            r = route.request
            if r.resource_type in {"image", "media", "font"}:
                return route.abort()
            return route.continue_()

        browser = p.chromium.launch(headless=not debug)
        context = browser.new_context(user_agent=(
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
            "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0 Safari/537.36"
        ))
        context.route("**/*", _route)
        page = context.new_page()
        try:
            page.set_default_timeout(timeout_ms)
            page.goto(url, wait_until="domcontentloaded", timeout=timeout_ms)
            if network_idle:
                page.wait_for_load_state("networkidle", timeout=min(timeout_ms, 15000))
            _wait_page_ready(page, wait_for=wait_for, timeout_ms=timeout_ms, options=options, selectors=selectors)

            htmls = [page.content()]

            # pagination
            for _ in range(max_pages - 1):
                sel = click_sel
                if not sel or page.locator(sel).count() == 0:
                    break
                page.click(sel)
                page.wait_for_load_state("domcontentloaded")
                if network_idle:
                    page.wait_for_load_state("networkidle", timeout=min(timeout_ms, 15000))
                _wait_page_ready(page, wait_for=wait_for, timeout_ms=timeout_ms, options=options, selectors=selectors)
                htmls.append(page.content())

            for html in htmls:
                found = _parse_cards(html, url)
                rows.extend(found)
                print(f"[collect][cards] {source_name}: found {len(found)} cards on this page")

            browser.close()
            return True
        except Exception as e:
            if debug:
                try:
                    snap = DOCSTORE_DIR / f"playwright_error_{hashlib.sha1(url.encode()).hexdigest()}.png"
                    page.screenshot(path=str(snap))
                    print(f"[collect][debug] Saved screenshot: {snap}")
                except Exception:
                    pass
            print(f"[collect][warn] {source_name}: {e.__class__.__name__}: {e}")
            try:
                browser.close()
            except Exception:
                pass
            return False

    from playwright.sync_api import sync_playwright  # late import for clarity
    with sync_playwright() as p:
        ok = _try_once(p)
        if not ok:
            time.sleep(1.5)
            _try_once(p)

    if not rows:
        print(f"[collect][skip] {source_name}: no content after retries.")
    return rows
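
# Hypothetical http_html_js entry showing the options/selectors this collector reads.
# The CSS selectors and URL are placeholders; tune them to the real page:
#
#   {
#       "name": "example_js_portal",
#       "type": "http_html_js",
#       "enabled": True,
#       "url": "https://example.org/funding",
#       "options": {"wait_for": ".card", "scroll": True, "min_cards": 10, "max_pages": 2},
#       "selectors": {"card": ".card", "title": "h3", "link": "a", "description": ".card-text"},
#   }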

# -------------------- PDF collector --------------------

def _collect_from_http_pdf(entry: Dict, source_name: str, static: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    type: 'http_pdf'
    keys:
      - url (single PDF fetch)
    """
    url = entry.get("url")
    if not url:
        return []
    try:
        from pdfminer.high_level import extract_text  # lazy import
    except Exception:
        return []

    rows = []
    r = _http_get(url, timeout=40)
    if not r:
        return rows

    tmp = DOCSTORE_DIR / (hashlib.sha1(url.encode("utf-8")).hexdigest() + ".pdf")
    try:
        DOCSTORE_DIR.mkdir(parents=True, exist_ok=True)
        tmp.write_bytes(r.content)
        body = extract_text(str(tmp)) or ""
    finally:
        try:
            tmp.unlink(missing_ok=True)
        except Exception:
            pass

    title = entry.get("name") or "PDF Document"
    rows.append(_normalize_web_record(source_name, url, title, body, static, extra={"posted_date": None}))
    return rows
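
# Minimal http_pdf entry (illustrative; the url is a placeholder and 'name' becomes the record title):
#
#   {
#       "name": "Example RFP (PDF)",
#       "type": "http_pdf",
#       "enabled": True,
#       "url": "https://example.org/rfp/capacity-building.pdf",
#   }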

# -------------------- De-dup helpers (5.2) --------------------

def _norm(text: str) -> str:
    t = (text or "").lower()
    t = re.sub(r'[^a-z0-9 ]+', ' ', t)
    return re.sub(r'\s+', ' ', t).strip()


def _hash_fingerprint(title: str, agency: str, deadline: str) -> str:
    base = f"{_norm(title)}|{_norm(agency)}|{deadline or ''}"
    return hashlib.sha1(base.encode()).hexdigest()


def _near_duplicate(a: Dict[str, Any], b: Dict[str, Any]) -> bool:
    # Deadlines equal or both missing
    dates_close = (a.get("deadline") == b.get("deadline")) or (not a.get("deadline") and not b.get("deadline"))
    t_sim = SequenceMatcher(None, _norm(a.get("title", "")), _norm(b.get("title", ""))).ratio()
    ag_sim = SequenceMatcher(None, _norm(a.get("agency", "")), _norm(b.get("agency", ""))).ratio()
    return dates_close and (t_sim > 0.88) and (ag_sim > 0.75)
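
# How the near-duplicate test behaves on a typical cross-source pair (illustrative records):
#   >>> a = {"title": "Community Capacity Building Grant", "agency": "Example Foundation", "deadline": "2025-10-15"}
#   >>> b = {"title": "Community Capacity-Building Grants", "agency": "Example Foundation", "deadline": "2025-10-15"}
#   >>> _near_duplicate(a, b)   # same deadline; title/agency similarity clears the 0.88 / 0.75 thresholds
#   True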

def _merge_records(primary: Dict[str, Any], other: Dict[str, Any]) -> Dict[str, Any]:
    """Merge fields while preserving best data and provenance."""
    merged = dict(primary)

    # Prefer non-empty fields; combine categories; keep earliest posted_date; keep earliest deadline if different.
    def choose(a, b):
        return a if (a not in (None, "", [], {})) else b

    merged["url"] = choose(primary.get("url"), other.get("url"))
    merged["title"] = choose(primary.get("title"), other.get("title"))
    merged["synopsis"] = choose(primary.get("synopsis"), other.get("synopsis"))
    merged["summary"] = choose(primary.get("summary"), other.get("summary"))
    merged["agency"] = choose(primary.get("agency"), other.get("agency"))
    merged["eligibility"] = choose(primary.get("eligibility"), other.get("eligibility"))
    merged["program_number"] = choose(primary.get("program_number"), other.get("program_number"))
    merged["geo"] = choose(primary.get("geo"), other.get("geo"))

    # categories → union (list)
    cats_a = primary.get("categories") or []
    cats_b = other.get("categories") or []
    if not isinstance(cats_a, list):
        cats_a = [cats_a]
    if not isinstance(cats_b, list):
        cats_b = [cats_b]
    merged["categories"] = sorted(set([c for c in cats_a + cats_b if c]))

    # deadline: choose earlier known date (safer to surface sooner one)
    da, db = primary.get("deadline"), other.get("deadline")
    if da and db:
        merged["deadline"] = min(da, db)
    else:
        merged["deadline"] = da or db
    # carry a deadline_text if any
    merged["deadline_text"] = choose(primary.get("deadline_text"), other.get("deadline_text"))
    merged["is_active"] = _compute_is_active(merged.get("deadline"))

    # posted_date: keep earliest if both
    pa, pb = primary.get("posted_date"), other.get("posted_date")
    merged["posted_date"] = min(pa, pb) if (pa and pb) else (pa or pb)

    # provenance: combine sources + urls
    prov_sources = set()
    for s in (primary.get("source"), other.get("source")):
        if not s:
            continue
        if isinstance(s, list):
            prov_sources.update(s)
        else:
            prov_sources.add(s)
    merged["source"] = sorted(prov_sources) if prov_sources else None

    prov_urls = set()
    for u in (primary.get("url"), other.get("url")):
        if u:
            prov_urls.add(u)
    # keep a list of all discovered urls
    merged["all_urls"] = sorted(prov_urls.union(set(primary.get("all_urls") or []), set(other.get("all_urls") or [])))

    # recompute ID based on merged fingerprint (title/agency/deadline)
    merged["id"] = _hash_fingerprint(merged.get("title", ""), merged.get("agency", ""), merged.get("deadline", ""))
    return merged

def _dedupe_and_merge(rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Exact fingerprint + fuzzy near-dup consolidation across sources."""
    uniques: List[Dict[str, Any]] = []
    by_fp: Dict[str, int] = {}

    for r in rows:
        fp = _hash_fingerprint(r.get("title", ""), r.get("agency", ""), r.get("deadline", ""))
        if fp in by_fp:
            # exact dup: merge into existing
            idx = by_fp[fp]
            uniques[idx] = _merge_records(uniques[idx], r)
            continue

        # fuzzy check against current uniques
        found_idx = None
        for i, u in enumerate(uniques):
            if _near_duplicate(r, u):
                found_idx = i
                break

        if found_idx is not None:
            uniques[found_idx] = _merge_records(uniques[found_idx], r)
            # also index its fingerprint (so later exact matches land here)
            new_fp = _hash_fingerprint(uniques[found_idx].get("title", ""),
                                       uniques[found_idx].get("agency", ""),
                                       uniques[found_idx].get("deadline", ""))
            by_fp[new_fp] = found_idx
        else:
            by_fp[fp] = len(uniques)
            # initialize provenance
            r.setdefault("all_urls", [r.get("url")] if r.get("url") else [])
            uniques.append(r)

    return uniques
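
# Consolidation sketch (illustrative, using a/b from the _near_duplicate example above):
# two rows describing the same opportunity from different sources collapse into one record
# whose 'source' becomes a sorted list of both adapters.
#   >>> rows = [dict(a, url="https://example.org/a", source="site_a"),
#   ...         dict(b, url="https://example.org/b", source="site_b")]
#   >>> out = _dedupe_and_merge(rows)
#   >>> len(out), out[0]["source"]
#   (1, ['site_a', 'site_b'])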

# -------------------- Write docstore & build index --------------------

def _save_docstore(recs: List[Dict[str, Any]]) -> str:
    DOCSTORE_DIR.mkdir(parents=True, exist_ok=True)
    path = DOCSTORE_DIR / "docstore.jsonl"
    with path.open("w", encoding="utf-8") as f:
        for r in recs:
            f.write(json.dumps(r, ensure_ascii=False) + "\n")
    return str(path)

def _build_index_from_docstore() -> int:
    ds_path = DOCSTORE_DIR / "docstore.jsonl"
    if not ds_path.exists():
        raise RuntimeError("Docstore not found. Run ingest first.")

    texts: List[str] = []
    metas: List[Dict[str, Any]] = []
    with ds_path.open("r", encoding="utf-8") as f:
        for line in f:
            rec = json.loads(line)
            title = rec.get("title") or ""
            synopsis = rec.get("synopsis") or rec.get("summary") or ""
            agency = rec.get("agency") or ""
            eligibility = rec.get("eligibility") or ""
            txt = "\n".join([title, synopsis, agency, eligibility]).strip()
            if not txt:
                continue
            texts.append(txt)
            metas.append({
                "id": rec.get("id"),
                "title": title,
                "url": rec.get("url"),
                "source": rec.get("source"),
                "tags": rec.get("tags"),
                "geo": rec.get("geo"),
                "categories": rec.get("categories"),
                "agency": agency,
                "deadline": rec.get("deadline"),  # ISO if available
                "deadline_text": rec.get("deadline_text"),
                "is_active": rec.get("is_active"),
                "program_number": rec.get("program_number"),
                "posted_date": rec.get("posted_date"),
                "all_urls": rec.get("all_urls"),
            })

    print(f"[index] Rows loaded from docstore: {len(texts)}")
    if not texts:
        INDEX_DIR.mkdir(parents=True, exist_ok=True)
        (INDEX_DIR / "meta.json").write_text(json.dumps([], ensure_ascii=False))
        print("[index] No texts to embed. Wrote empty meta.json.")
        return 0

    # Embed (CPU default; portable)
    model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
    model.max_seq_length = 256
    batch = max(8, min(32, len(texts)))
    emb = model.encode(
        texts,
        convert_to_numpy=True,
        normalize_embeddings=True,
        show_progress_bar=True,
        batch_size=batch,
    ).astype(np.float32, copy=False)

    # FAISS index (Inner Product for cosine on normalized vectors)
    import faiss
    dim = emb.shape[1]
    index = faiss.IndexFlatIP(dim)
    index.add(emb)

    INDEX_DIR.mkdir(parents=True, exist_ok=True)
    faiss.write_index(index, str(INDEX_DIR / "faiss.index"))
    (INDEX_DIR / "meta.json").write_text(json.dumps(metas, ensure_ascii=False))
    print(f"[index] Wrote FAISS index with {emb.shape[0]} vectors (dim={dim}).")
    return len(texts)
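
# Query-side sketch (not used in this module): how a retriever could search the artifacts
# written above, assuming the same embedding model and the faiss.index / meta.json files
# produced by this function.
#
#   import json, faiss, numpy as np
#   from sentence_transformers import SentenceTransformer
#   from app.paths import INDEX_DIR
#
#   def search(query: str, k: int = 5):
#       index = faiss.read_index(str(INDEX_DIR / "faiss.index"))
#       metas = json.loads((INDEX_DIR / "meta.json").read_text())
#       model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
#       q = model.encode([query], convert_to_numpy=True, normalize_embeddings=True).astype(np.float32)
#       scores, idxs = index.search(q, k)   # inner product == cosine on normalized vectors
#       return [(float(s), metas[i]) for s, i in zip(scores[0], idxs[0]) if i != -1]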

# -------------------- Public API: ingest --------------------

__all__ = ["ingest"]


def ingest(cfg_path: str = "config/sources.yaml", env: Dict | None = None):
    """
    Reads config, fetches from enabled sources via adapters, normalizes to a single schema,
    applies filters (capacity / PA-MD), de-dupes, writes docstore, and builds the FAISS index.
    Returns (docstore_path, n_indexed).
    """
    cfg = load_config(cfg_path)

    # ---- Filters from config ----
    f_cfg = (cfg or {}).get("filters", {}) or {}
    capacity_only = bool(f_cfg.get("capacity_only", True))
    pa_md_only = bool(f_cfg.get("pa_md_only", False))
    print(f"[filters] capacity_only = {'TRUE' if capacity_only else 'FALSE'}")
    print(f"[filters] pa_md_only = {'TRUE' if pa_md_only else 'FALSE'}")

    all_rows: List[Dict[str, Any]] = []
    for entry in cfg.get("sources", []):
        if not entry.get("enabled"):
            continue
        name = entry.get("name", "<source>")
        geo = entry.get("geo") or "US"
        cats = entry.get("categories") or []
        static = {"geo": geo, "categories": cats}
        typ = entry.get("type")

        rows: List[Dict[str, Any]] = []

        # -------- Collect from each adapter --------
        if typ == "grantsgov_api":
            raw_hits = _collect_from_grantsgov_api(entry)
            rows = [normalize("grants_gov", h, static) for h in raw_hits]
        elif typ in ("web_page", "http_html"):
            rows = _collect_from_http_html(entry, name, static)
        elif typ == "http_html_js":
            rows = _collect_from_http_html_js(entry, name, static)
        elif typ == "http_pdf":
            rows = _collect_from_http_pdf(entry, name, static)
        elif typ == "local_sample":
            p = Path(entry["path"]).expanduser()
            blob = json.loads(p.read_text(encoding="utf-8"))
            items = blob.get("opportunities") or []
            rows = [normalize("local_sample", op, static) for op in items]
        else:
            print(f"[collect] {name}: unknown type '{typ}', skipping.")
            continue

        print(f"[collect] {name}: fetched_rows={len(rows)}")

        # ---- Apply capacity / geo filters BEFORE indexing (allow per-source bypass) ----
        if rows:
            if entry.get("skip_filters"):
                print(f"[filter] {name}: skip_filters=true → keeping all {len(rows)}")
            else:
                pre = len(rows)
                filtered = []
                for r in rows:
                    t = _doc_text_from_row(r)
                    if capacity_only and not _is_capacity_building_text(t):
                        continue
                    if pa_md_only and not _is_pa_md_text(t):
                        continue
                    filtered.append(r)
                print(
                    f"[filter] {name}: kept {len(filtered)}/{pre} after filters "
                    f"(capacity_only={capacity_only}, pa_md_only={pa_md_only})"
                )
                rows = filtered

        # >>> GLOBAL MATT 25 TAGGING (safer, one place) <<<
        for r in rows:
            # If you truly want to leave Grants.gov untouched, keep this guard:
            if r.get("source") == "grants_gov":
                continue
            _attach_matt25_tags(r)

        print(f"[collect] {name} → rows_after_filters={len(rows)}")
        all_rows.extend(rows)

    # ---- Cross-source DEDUPE + MERGE ----
    unique = _dedupe_and_merge(all_rows)
    print(f"[ingest] Unique records to index: {len(unique)}")

    path = _save_docstore(unique)
    n = _build_index_from_docstore()
    return path, n

# -------------------- CLI --------------------

if __name__ == "__main__":
    import argparse

    ap = argparse.ArgumentParser()
    ap.add_argument("--config", default="config/sources.yaml")
    args = ap.parse_args()
    p, n = ingest(args.config)
    print(f"Ingested {n} records. Docstore at {p}")