import itertools, wikipediaapi, requests, re, json
from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
# from langchain_community.tools.wikidata.tool import WikidataAPIWrapper, WikidataQueryRun
import cProfile
import pstats


class WikipediaLinks():

    def __init__(self, tool_wikipedia, json_file_path_wiki) -> None:
        self.is_enabled = tool_wikipedia
        self.json_file_path_wiki = json_file_path_wiki
        self.wiki_wiki = wikipediaapi.Wikipedia(
            user_agent='VoucherVision ([email protected])',
            language='en'
        )
        # Start with empty infobox caches so gather_geo() can safely update from
        # them even when get_infobox_data() returns early (request error, missing page).
        self.infobox_data = {}
        self.infobox_data_locality = {}
        self.property_to_rank = {
            'P225': 'Species',
            'P171': 'Family',
            'P105': 'Taxon rank',
            'P70': 'Genus',
            'P75': 'Clade',
            'P67': 'Subfamily',
            'P66': 'Tribe',
            'P71': 'Subtribe',
            'P61': 'Order',
            'P72': 'Suborder',
            'P73': 'Infraorder',
            'P74': 'Superfamily',
            'P142': 'Phylum',
            'P76': 'Subclass',
            'P77': 'Infraclass',
            'P78': 'Superorder',
            'P81': 'Class',
            'P82': 'Superclass',
            'P84': 'Kingdom',
            'P85': 'Superkingdom',
            'P86': 'Subkingdom',
            'P87': 'Infrakingdom',
            'P88': 'Parvkingdom',
            'P89': 'Domain',
            'P1421': 'GRIN',
            'P1070': 'KEW',
            'P5037': 'POWOID',
        }
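
    # Quick sketch of how property_to_rank is read (the property ids below come
    # straight from the mapping; the lookup itself happens in get_taxonbar_data):
    #   self.property_to_rank.get('P225')   # -> 'Species'
    #   self.property_to_rank.get('P9999')  # -> None, so that claim is skipped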

    def get_label_for_entity_id(self, entity_id):
        url = "https://www.wikidata.org/w/api.php"
        params = {
            "action": "wbgetentities",
            "format": "json",
            "ids": entity_id,
            "props": "labels",
            "languages": "en"  # Assuming you want the label in English
        }
        response = requests.get(url, params=params)
        data = response.json()
        return data['entities'][entity_id]['labels']['en']['value'] if 'en' in data['entities'][entity_id]['labels'] else None
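
    # Usage sketch (the entity id here is illustrative; real ids come from the
    # Taxonbar claims resolved in get_taxonbar_data):
    #   label = self.get_label_for_entity_id('Q146')
    #   # returns the English label string if Wikidata has one, otherwise None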

    def is_valid_url(self, url):
        try:
            response = requests.head(url, allow_redirects=True, timeout=5)
            # If the response status code is 200, the URL is reachable
            return response.status_code == 200
        except requests.RequestException as e:
            # If there was some issue with the request, such as the domain does not exist
            # print(f"URL {url} is not reachable. Error: {e}")
            return False

    def convert_to_decimal(self, coord_parts):
        lat_deg, lat_min, lat_dir, lon_deg, lon_min, lon_dir = coord_parts[:6]
        lat = float(lat_deg) + float(lat_min) / 60
        lon = float(lon_deg) + float(lon_min) / 60
        if lat_dir == 'S':
            lat = -lat
        if lon_dir == 'W':
            lon = -lon
        return f"{lat},{lon}"
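
    # Worked example, assuming the six leading parts parsed out of a {{coord}}
    # template (the values are illustrative):
    #   convert_to_decimal(['41', '37', 'N', '87', '05', 'W'])
    #   returns a "lat,lon" string of roughly "41.6167,-87.0833"
    #   (41 + 37/60 north, 87 + 5/60 west; 'S' and 'W' flip the sign)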

    def extract_coordinates_and_region(self, coord_string):
        # Extract the coordinate parts and region info
        coord_parts = re.findall(r'(\d+|\w+)', coord_string)
        region_info = re.search(r'region:([^|]+)\|display', coord_string)

        if coord_parts and len(coord_parts) >= 6:
            # Convert to decimal coordinates
            decimal_coords = self.convert_to_decimal(coord_parts)
        else:
            decimal_coords = "Invalid coordinates format"

        region = region_info.group(1) if region_info else "Region not found"
        return decimal_coords, region
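
    # Illustrative input, shaped like the value parse_infobox() pulls out of a
    # {{coord ...}} template (the region code is made up for the example):
    #   extract_coordinates_and_region('|41|37|N|87|05|W|region:US-IN|display=inline')
    #   -> ('41.6166...,-87.0833...', 'US-IN')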

    def parse_infobox(self, infobox_string):
        # Split the string into lines
        lines = infobox_string.split('\n')

        # Dictionary to store the extracted data
        infobox_data = {}

        # Iterate over each line
        for line in lines:
            # Split the line into key and value
            parts = line.split('=', 1)

            # If the line is properly formatted with a key and value
            if len(parts) == 2:
                key = parts[0].strip()
                # Drop the leading '|' of the infobox parameter, e.g. '| name' -> 'name'.
                # Guarded so a parameter written without a space ('|name') does not raise IndexError.
                key_parts = key.split(' ')
                key = key_parts[1] if len(key_parts) > 1 else key_parts[0].lstrip('|')
                value = parts[1].strip()

                # Handling special cases like links or coordinates
                if value.startswith('[[') and value.endswith(']]'):
                    # Extracting linked article titles
                    value = value[2:-2].split('|')[0]
                elif value.startswith('{{coord') and value.endswith('}}'):
                    # Extracting coordinates
                    value = value[7:-2]
                elif value.startswith('[') and value.endswith(']') and ('http' in value):
                    value = value[1:-1]
                    url_parts = value.split(" ")
                    infobox_data['url_location'] = next((part for part in url_parts if 'http' in part), None)

                if key == 'coordinates':
                    decimal_coordinates, region = self.extract_coordinates_and_region(value)
                    infobox_data['region'] = region
                    infobox_data['decimal_coordinates'] = decimal_coordinates

                key = self.sanitize(key)
                value = self.sanitize(value)
                value = self.remove_html_and_wiki_markup(value)
                # Add to dictionary
                infobox_data[key] = value
        return infobox_data
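
    # Sketch of the expected transformation on a tiny, made-up infobox fragment:
    #   parse_infobox('{{Infobox settlement\n| name = Porter\n| state = [[Indiana]]\n}}')
    #   would yield roughly {'name': 'Porter', 'state': 'Indiana'}
    #   (lines without '=', such as '{{Infobox settlement' and '}}', are skipped).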

    def get_infobox_data(self, wiki_page_title, opt=None):
        wiki_api_url = "https://en.wikipedia.org/w/api.php"
        wiki_params = {
            "action": "query",
            "format": "json",
            "titles": wiki_page_title,
            "prop": "revisions",
            "rvprop": "content",
            "rvslots": "*"
        }

        try:
            wiki_response = requests.get(wiki_api_url, params=wiki_params)
            wiki_response.raise_for_status()  # Check for HTTP errors
        except requests.RequestException as e:
            return f"Error fetching data: {e}"

        wiki_data = wiki_response.json()
        page_key = next(iter(wiki_data['query']['pages']), None)
        if page_key is None or "missing" in wiki_data['query']['pages'][page_key]:
            return "Page not found"

        content = wiki_data['query']['pages'][page_key]['revisions'][0]['slots']['main']['*']
        infobox_pattern = re.compile(r'\{\{Infobox.*?\}\}', re.DOTALL)
        match = infobox_pattern.search(content)
        if match:
            infobox_content = match.group()
        else:
            self.infobox_data = {}
            self.infobox_data_locality = {}
            return "Infobox not found"

        if opt is None:
            self.infobox_data = self.parse_infobox(infobox_content)
        else:
            self.infobox_data_locality = self.parse_infobox(infobox_content)
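
    # Usage note: a successful call stores the parsed infobox on self.infobox_data
    # (or on self.infobox_data_locality when opt is given), while failures return a
    # short status string. The titles below are illustrative only:
    #   self.get_infobox_data('Porter County, Indiana')               # fills self.infobox_data
    #   self.get_infobox_data('Some locality page', opt='locality')   # fills self.infobox_data_locality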

    def get_taxonbar_data(self, wiki_page_title):
        # Step 1: Extract the Wikidata Item ID from the Wikipedia page
        wiki_api_url = "https://en.wikipedia.org/w/api.php"
        wiki_params = {
            "action": "query",
            "format": "json",
            "titles": wiki_page_title,
            "prop": "revisions",
            "rvprop": "content",
            "rvslots": "*"
        }
        wiki_response = requests.get(wiki_api_url, params=wiki_params)
        wiki_data = wiki_response.json()
        page_key = next(iter(wiki_data['query']['pages']))
        content = wiki_data['query']['pages'][page_key]['revisions'][0]['slots']['main']['*']

        taxonbar_match = re.search(r'\{\{Taxonbar\|from=(Q\d+)\}\}', content)
        if not taxonbar_match:
            return "Taxonbar not found"
        wikidata_id = taxonbar_match.group(1)

        # Step 2: Fetch Data from Wikidata Using the Extracted ID
        wikidata_api_url = "https://www.wikidata.org/w/api.php"
        wikidata_params = {
            "action": "wbgetentities",
            "format": "json",
            "ids": wikidata_id,
            "props": "claims"  # Adjust as needed to fetch the desired data
        }
        wikidata_response = requests.get(wikidata_api_url, params=wikidata_params)
        wikidata_content = wikidata_response.json()

        classification_full = {}
        classification = {}
        label_cache = {}  # Cache for labels

        # Turn this on to see the available properties to decode
        # for prop_id, claims in wikidata_content['entities'][wikidata_id]['claims'].items():
        #     # Assuming the main snak value is what we want
        #     value = claims[0]['mainsnak']['datavalue']['value']
        #     if isinstance(value, dict):  # If the value is an entity ID
        #         if prop_id not in label_cache:
        #             label_cache[prop_id] = self.get_label_for_entity_id(prop_id)
        #         classification_full[prop_id] = label_cache[prop_id]
        #     else:
        #         classification_full[prop_id] = value
        # print(classification_full)

        # Map Wikidata properties to the corresponding taxonomic ranks
        for prop_id, claims in wikidata_content['entities'][wikidata_id]['claims'].items():
            # Get the taxonomic rank from the mapping
            rank = self.property_to_rank.get(prop_id)
            if rank:
                value = claims[0]['mainsnak']['datavalue']['value']
                if isinstance(value, dict):  # If the value is an entity ID
                    entity_id = value['id']
                    if entity_id not in label_cache:
                        label_cache[entity_id] = self.get_label_for_entity_id(entity_id)
                    classification[rank] = label_cache[entity_id]
                else:
                    classification[rank] = value

        # Expand the POWO id into full links when the taxon page is reachable
        try:
            unknown_link = "https://powo.science.kew.org/taxon/" + classification['POWOID']
            if self.is_valid_url(unknown_link):
                classification['POWOID'] = unknown_link
                classification['POWOID_syn'] = unknown_link + '#synonyms'
        except Exception:
            pass
        return classification
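
    # Illustrative shape of the return value (which keys appear depends on the
    # mapped properties the Wikidata item actually carries; values are placeholders):
    #   {'Species': '...', 'Family': '...', 'Taxon rank': '...',
    #    'POWOID': 'https://powo.science.kew.org/taxon/<id>',
    #    'POWOID_syn': 'https://powo.science.kew.org/taxon/<id>#synonyms'}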

    def extract_page_title(self, result_string):
        first_line = result_string.split('\n')[0]
        page_title = first_line.replace('Page: ', '').strip()
        return page_title
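
    # The WikipediaQueryRun result is expected to begin with a "Page: <title>" line
    # followed by "Summary: ..." text, so (illustrative string):
    #   extract_page_title('Page: Persicaria\nSummary: ...')  # -> 'Persicaria'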

    def get_wikipedia_url(self, page_title):
        page = self.wiki_wiki.page(page_title)
        if page.exists():
            return page.fullurl
        else:
            return None

    def extract_info_taxa(self, page):
        links = []
        self.info_packet['WIKI_TAXA']['LINKS'] = {}
        self.info_packet['WIKI_TAXA']['DATA'] = {}

        self.info_packet['WIKI_TAXA']['DATA'].update(self.get_taxonbar_data(page.title))

        # for back in page.backlinks:
        #     back = self.sanitize(back)
        #     if ':' not in back:
        #         link = self.sanitize(self.get_wikipedia_url(back))
        #         if link not in links:
        #             links.append(link)
        #             self.info_packet['WIKI_TAXA']['LINKS'][back] = link

    def extract_info_geo(self, page, opt=None):
        links = []
        self.info_packet['WIKI_GEO']['LINKS'] = {}

        if opt is None:
            self.get_infobox_data(page.title)
        else:
            self.get_infobox_data(page.title, opt=opt)

        for back in itertools.islice(page.backlinks, 10):
            back = self.sanitize(back)
            if ':' not in back:
                link = self.sanitize(self.get_wikipedia_url(back))
                if link not in links:
                    links.append(link)
                    self.info_packet['WIKI_GEO']['LINKS'][back] = link

    def gather_geo(self, query, opt=None):
        if opt is None:
            self.info_packet['WIKI_GEO']['DATA'] = {}
        else:
            self.info_packet['WIKI_LOCALITY']['DATA'] = {}

        wikipedia = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
        result = wikipedia.run(query)
        summary = result.split('Summary:')[1]
        summary = self.sanitize(summary)
        # print(result)
        page_title = self.extract_page_title(result)
        page = self.wiki_wiki.page(page_title)

        # Do these first, they are less likely to fail
        if opt is None:
            self.info_packet['WIKI_GEO']['PAGE_LINK'] = self.get_wikipedia_url(page_title)
            self.info_packet['WIKI_GEO']['PAGE_TITLE'] = page_title
            self.info_packet['WIKI_GEO']['SUMMARY'] = summary
        else:
            self.info_packet['WIKI_LOCALITY']['PAGE_TITLE'] = page_title
            self.info_packet['WIKI_LOCALITY']['PAGE_LINK'] = self.get_wikipedia_url(page_title)
            self.info_packet['WIKI_LOCALITY']['SUMMARY'] = summary

        # Check if the page exists, get the more complex data. Do it last in case of failure.
        # This might not be useful enough to justify the time:
        # if page.exists():
        #     if opt is None:
        #         self.extract_info_geo(page)
        #     else:
        #         self.extract_info_geo(page, opt=opt)

        if opt is None:
            self.info_packet['WIKI_GEO']['DATA'].update(self.infobox_data)
        else:
            self.info_packet['WIKI_LOCALITY']['DATA'].update(self.infobox_data_locality)

    def gather_taxonomy(self, query):
        wikipedia = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
        # query = "Tracaulon sagittatum Tracaulon sagittatum"
        result = wikipedia.run(query)
        summary = result.split('Summary:')[1]
        summary = self.sanitize(summary)
        # print(result)
        page_title = self.extract_page_title(result)
        page = self.wiki_wiki.page(page_title)

        # Check if the page exists
        if page.exists():
            self.extract_info_taxa(page)
            self.info_packet['WIKI_TAXA']['PAGE_TITLE'] = page_title
            self.info_packet['WIKI_TAXA']['PAGE_LINK'] = self.get_wikipedia_url(page_title)
            self.info_packet['WIKI_TAXA']['SUMMARY'] = summary
        return self.info_packet

    def gather_wikipedia_results(self, output):
        self.info_packet = {}
        self.info_packet['WIKI_TAXA'] = {}
        self.info_packet['WIKI_GEO'] = {}
        self.info_packet['WIKI_LOCALITY'] = {}

        if self.is_enabled:
            municipality = output.get('municipality', '')
            county = output.get('county', '')
            stateProvince = output.get('stateProvince', '')
            country = output.get('country', '')
            locality = output.get('locality', '')

            order = output.get('order', '')
            family = output.get('family', '')
            scientificName = output.get('scientificName', '')
            genus = output.get('genus', '')
            specificEpithet = output.get('specificEpithet', '')

            query_geo = ' '.join([municipality, county, stateProvince, country]).strip()
            query_locality = locality.strip()
            query_taxa_primary = scientificName.strip()
            query_taxa_secondary = ' '.join([genus, specificEpithet]).strip()
            query_taxa_tertiary = ' '.join([order, family, genus, specificEpithet]).strip()

            # query_taxa = "Tracaulon sagittatum Tracaulon sagittatum"
            # query_geo = "Indiana Porter Co."
            # query_locality = "Mical Springs edge"

            if query_geo:
                try:
                    self.gather_geo(query_geo)
                except Exception:
                    pass

            if query_locality:
                try:
                    self.gather_geo(query_locality, 'locality')
                except Exception:
                    pass

            queries_taxa = [query_taxa_primary, query_taxa_secondary, query_taxa_tertiary]
            for q in queries_taxa:
                if q:
                    try:
                        self.gather_taxonomy(q)
                        break
                    except Exception:
                        pass

            # print(self.info_packet)
            # return self.info_packet
            # self.gather_geo(query_geo)

        try:
            with open(self.json_file_path_wiki, 'w', encoding='utf-8') as file:
                json.dump(self.info_packet, file, indent=4)
        except Exception:
            sanitized_data = self.sanitize(self.info_packet)
            with open(self.json_file_path_wiki, 'w', encoding='utf-8') as file:
                json.dump(sanitized_data, file, indent=4)

    def sanitize(self, data):
        if isinstance(data, dict):
            return {self.sanitize(key): self.sanitize(value) for key, value in data.items()}
        elif isinstance(data, list):
            return [self.sanitize(element) for element in data]
        elif isinstance(data, str):
            return data.encode('utf-8', 'ignore').decode('utf-8')
        else:
            return data
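
    # sanitize() walks nested dicts/lists and drops characters that cannot be
    # encoded as UTF-8 (e.g., lone surrogates). Illustrative call:
    #   sanitize({'locality': 'Mical Springs edge'})  # -> same dict, strings unchanged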

    def remove_html_and_wiki_markup(self, text):
        # Remove HTML tags
        clean_text = re.sub(r'<.*?>', '', text)

        # Remove Wiki links but keep the text inside
        # For example, '[[Greg Abbott]]' becomes 'Greg Abbott'
        clean_text = re.sub(r'\[\[(?:[^\]|]*\|)?([^\]|]*)\]\]', r'\1', clean_text)

        # Remove Wiki template markup, e.g., '{{nowrap|text}}' becomes 'text'
        clean_text = re.sub(r'\{\{(?:[^\}|]*\|)?([^\}|]*)\}\}', r'\1', clean_text)
        return clean_text
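
    # Illustrative call (input is made up): HTML tags are dropped, wiki links keep
    # their display text, and two-part templates keep the part after the pipe:
    #   remove_html_and_wiki_markup('<i>[[Persicaria]]</i> {{nowrap|agg.}}')
    #   -> 'Persicaria agg.'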


def validate_wikipedia(tool_wikipedia, json_file_path_wiki, output):
    Wiki = WikipediaLinks(tool_wikipedia, json_file_path_wiki)
    Wiki.gather_wikipedia_results(output)


if __name__ == '__main__':
    test_output = {
        "filename": "MICH_7375774_Polygonaceae_Persicaria_",
        "catalogNumber": "1439649",
        "order": "",
        "family": "",
        "scientificName": "Tracaulon sagittatum",
        "scientificNameAuthorship": "",
        "genus": "Tracaulon",
        "subgenus": "",
        "specificEpithet": "sagittatum",
        "infraspecificEpithet": "",
        "identifiedBy": "",
        "recordedBy": "Marcus W. Lyon, Jr.",
        "recordNumber": "TX 11",
        "verbatimEventDate": "1927",
        "eventDate": "1927-00-00",
        "habitat": "wet subdunal woods",
        "occurrenceRemarks": "Flowers pink",
        "country": "Indiana",
        "stateProvince": "Porter Co.",
        "county": "",
        "municipality": "",
        "locality": "Mical Springs edge",
        "degreeOfEstablishment": "",
        "decimalLatitude": "",
        "decimalLongitude": "",
        "verbatimCoordinates": "",
        "minimumElevationInMeters": "",
        "maximumElevationInMeters": ""
    }

    do_print_profiler = True
    if do_print_profiler:
        profiler = cProfile.Profile()
        profiler.enable()

    # WikipediaLinks() expects (tool_wikipedia, json_file_path_wiki); pass True to
    # enable the Wikipedia tool for this test run.
    Wiki = WikipediaLinks(True, 'D:/D_Desktop/usda_pdf/test.json')
    info_packet = Wiki.gather_wikipedia_results(test_output)

    if do_print_profiler:
        profiler.disable()
        stats = pstats.Stats(profiler).sort_stats('cumulative')
        stats.print_stats(50)