# This file contains the logic for uploading data to BigQuery.
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
import uuid
from datetime import datetime, timezone

# --- Configuration: Set your project, dataset, and table details here ---
PROJECT_ID = "gem-creation"
DATASET_ID = "aura_mind_glow_data"
TABLE_ID = "farm_analysis"


def get_bigquery_client():
    """Returns an authenticated BigQuery client."""
    try:
        client = bigquery.Client(project=PROJECT_ID)
        print("✅ Successfully authenticated with BigQuery.")
        return client
    except Exception as e:
        print(f"❌ Error authenticating with BigQuery: {e}")
        return None


def create_dataset_if_not_exists(client):
    """Creates the BigQuery dataset if it doesn't exist."""
    dataset_id = f"{PROJECT_ID}.{DATASET_ID}"
    try:
        client.get_dataset(dataset_id)  # Make an API request.
        print(f"ℹ️ Dataset {dataset_id} already exists.")
    except NotFound:
        print(f"🟡 Dataset {dataset_id} not found. Creating dataset...")
        dataset = bigquery.Dataset(dataset_id)
        dataset.location = "US"  # You can change the location if needed
        dataset = client.create_dataset(dataset, timeout=30)  # Make an API request.
        print(f"✅ Created dataset {client.project}.{dataset.dataset_id}")


def create_table_if_not_exists(client):
    """Creates the BigQuery table if it doesn't exist."""
    table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"
    try:
        client.get_table(table_id)  # Make an API request.
        print(f"ℹ️ Table {table_id} already exists.")
    except NotFound:
        print(f"🟡 Table {table_id} not found. Creating table...")
        schema = [
            bigquery.SchemaField("analysis_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("timestamp", "TIMESTAMP", mode="REQUIRED"),
            bigquery.SchemaField("farmer_id", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("gps_latitude", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("gps_longitude", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("crop_type", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("crop_variety", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("ai_diagnosis", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("confidence_score", "FLOAT", mode="NULLABLE"),
            bigquery.SchemaField("recommended_action", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("farmer_feedback", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("treatment_applied", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("outcome_image_id", "STRING", mode="NULLABLE"),
        ]
        table = bigquery.Table(table_id, schema=schema)
        table = client.create_table(table)  # Make an API request.
        print(f"✅ Created table {table.project}.{table.dataset_id}.{table.table_id}")


def upload_diagnosis_to_bigquery(diagnosis_data: dict):
    """Uploads a single diagnosis record (from a dictionary) to BigQuery."""
    client = get_bigquery_client()
    if client is None:
        print("❌ BigQuery client not available. Cannot upload diagnosis.")
        return "BigQuery client not available."

    # Ensure dataset and table are ready
    create_dataset_if_not_exists(client)
    create_table_if_not_exists(client)

    table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"

    # Add required fields if not present
    if "analysis_id" not in diagnosis_data:
        diagnosis_data["analysis_id"] = str(uuid.uuid4())
    if "timestamp" not in diagnosis_data:
        # Use a timezone-aware UTC timestamp so BigQuery stores the value unambiguously.
        diagnosis_data["timestamp"] = datetime.now(timezone.utc).isoformat()

    rows_to_insert = [diagnosis_data]
    errors = client.insert_rows_json(table_id, rows_to_insert)

    if not errors:
        print(f"✅ Diagnosis record {diagnosis_data.get('analysis_id')} uploaded successfully.")
        return "Diagnosis uploaded successfully."
    else:
        print(f"❌ Encountered errors while inserting diagnosis record: {errors}")
        return f"Error uploading diagnosis: {errors}"


# ==============================================================================
# ✨ NEW FUNCTION TO UPLOAD A CSV FILE ✨
# ==============================================================================
def upload_csv_to_bigquery(csv_file_path: str):
    """
    Uploads the contents of a CSV file to the specified BigQuery table.

    Args:
        csv_file_path (str): The local path to the CSV file.
    """
    client = get_bigquery_client()
    if client is None:
        print("❌ BigQuery client not available. Cannot upload CSV.")
        return "BigQuery client not available."

    # Ensure the destination dataset and table exist before uploading
    create_dataset_if_not_exists(client)
    create_table_if_not_exists(client)

    table_id = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"

    # Configure the load job
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,  # Skip the header row
        autodetect=True,  # Let BigQuery infer column types from the CSV data
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,  # Append data to the table
    )

    print(f"🚀 Starting CSV upload from '{csv_file_path}' to table '{table_id}'...")

    try:
        with open(csv_file_path, "rb") as source_file:
            load_job = client.load_table_from_file(source_file, table_id, job_config=job_config)

        load_job.result()  # Wait for the job to complete

        destination_table = client.get_table(table_id)
        print(f"✅ Job finished. Table {table_id} now has {destination_table.num_rows} rows.")
        return "CSV upload successful."
    except Exception as e:
        print(f"❌ An error occurred during the CSV upload: {e}")
        return f"Error during CSV upload: {e}"


# ==============================================================================
# --- Example Usage ---
# To run this file directly and test the new upload function:
#   1. Save the CSV data from the previous step into a file named 'farm_analysis_data.csv'.
#   2. Make sure that file is in the same directory as this script.
#   3. Run 'python your_script_name.py' in your terminal.
# ==============================================================================
if __name__ == "__main__":
    # The name of the CSV file you created
    csv_file_to_upload = "farm_analysis_data.csv"

    print("--- Running BigQuery CSV Uploader Test ---")
    upload_csv_to_bigquery(csv_file_to_upload)
    print("--- Test complete ---")
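
    # --------------------------------------------------------------------------
    # Optional: single-record upload sketch.
    # An illustrative example of calling upload_diagnosis_to_bigquery() with a
    # dictionary; the field values below are hypothetical placeholders, not real
    # data. Uncomment to insert one test row matching the table schema above.
    # --------------------------------------------------------------------------
    # sample_diagnosis = {
    #     "farmer_id": "farmer-0001",            # hypothetical ID
    #     "gps_latitude": -13.2543,              # hypothetical coordinates
    #     "gps_longitude": 34.3015,
    #     "crop_type": "maize",
    #     "crop_variety": "SC627",               # hypothetical variety
    #     "ai_diagnosis": "maize leaf blight",   # hypothetical diagnosis label
    #     "confidence_score": 0.87,
    #     "recommended_action": "Apply a recommended fungicide and re-scan in 7 days.",
    # }
    # # analysis_id and timestamp are auto-filled by upload_diagnosis_to_bigquery().
    # upload_diagnosis_to_bigquery(sample_diagnosis)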