sethmcknight committed on
Commit
3916e13
·
1 Parent(s): 6d37c4a

fix: Add detailed logging and improved locking for ingestion startup

Browse files
Files changed (1) hide show
  1. src/app_factory.py +130 -43
src/app_factory.py CHANGED
@@ -27,30 +27,58 @@ class InitializationTimeoutError(Exception):
27
  def ensure_embeddings_on_startup():
28
  """
29
  Ensure embeddings exist and have the correct dimension on app startup.
30
- This is critical for Render deployments where the vector store is ephemeral.
31
  Uses a file-based lock to prevent race conditions between workers.
32
  """
 
 
 
 
33
  lock_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "data", "locks")
34
  if not os.path.exists(lock_dir):
35
- os.makedirs(lock_dir)
 
 
 
 
 
 
36
  lock_file = os.path.join(lock_dir, "ingestion.lock")
37
- lock_timeout = 180 # 3 minutes
38
-
39
- start_time = time.time()
40
- while os.path.exists(lock_file):
41
- if time.time() - start_time > lock_timeout:
42
- logging.error(f"Lock file {lock_file} has been present for over {lock_timeout} seconds. Aborting wait.")
43
- # In a real-world scenario, you might want to raise an exception
44
- # or attempt to delete a stale lock file. For now, we just stop waiting.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45
  return
46
- logging.info(f"Another process is handling ingestion. Waiting for lock file {lock_file} to be released...")
47
- time.sleep(5)
48
 
49
  try:
50
- # Acquire lock
51
- with open(lock_file, "w") as f:
52
- f.write(str(os.getpid()))
53
- logging.info(f"Acquired ingestion lock: {lock_file}")
54
 
55
  from src.config import (
56
  COLLECTION_NAME,
@@ -65,54 +93,112 @@ def ensure_embeddings_on_startup():
65
  from src.ingestion.ingestion_pipeline import IngestionPipeline
66
  from src.vector_store.vector_db import VectorDatabase
67
 
68
- logging.info("Checking vector store on startup...")
 
 
 
 
69
 
70
  # Initialize vector database to check its state
71
- vector_db = VectorDatabase(VECTOR_DB_PERSIST_PATH, COLLECTION_NAME)
 
 
 
 
 
72
 
73
  # Check if embeddings exist and have correct dimension
74
- if not vector_db.has_valid_embeddings(EMBEDDING_DIMENSION):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75
  logging.warning(
76
- f"Vector store is empty or has wrong dimension. "
77
- f"Expected: {EMBEDDING_DIMENSION}, "
78
- f"Current: {vector_db.get_embedding_dimension()}"
79
  )
80
- logging.info(f"Running ingestion pipeline with model: {EMBEDDING_MODEL_NAME}")
 
 
 
 
 
 
 
 
81
 
82
  # Run ingestion pipeline to rebuild embeddings
83
- ingestion_pipeline = IngestionPipeline(
84
- chunk_size=DEFAULT_CHUNK_SIZE,
85
- overlap=DEFAULT_OVERLAP,
86
- seed=RANDOM_SEED,
87
- store_embeddings=True,
88
- )
 
 
 
 
 
89
 
90
  # Process the corpus directory
91
- results = ingestion_pipeline.process_directory(CORPUS_DIRECTORY)
 
 
 
 
 
 
92
 
93
  if not results or len(results) == 0:
94
  logging.error(
95
- "Ingestion failed or processed 0 chunks. "
96
- "Please check the corpus directory and "
97
- "ingestion pipeline for errors."
98
  )
99
  else:
100
- logging.info(f"Ingestion completed: {len(results)} chunks processed")
 
 
 
 
 
 
 
 
 
 
 
101
  else:
102
  logging.info(
103
- f"Vector store is valid with {vector_db.get_count()} embeddings "
104
- f"of dimension {vector_db.get_embedding_dimension()}"
105
  )
106
 
107
  except Exception as e:
108
- logging.error(f"Failed to ensure embeddings on startup: {e}")
109
  # Don't crash the app, but log the error
110
  # The app will still start but searches may fail
111
  finally:
112
  # Release lock
113
- if os.path.exists(lock_file):
114
- os.remove(lock_file)
115
- logging.info(f"Released ingestion lock: {lock_file}")
 
 
 
116
 
117
 
118
  def create_app(
@@ -1119,13 +1205,14 @@ def create_app(
1119
  except Exception as e:
1120
  logging.warning(f"Failed to register document management blueprint: {e}")
1121
 
1122
- # Conditionally run ingestion pipeline on startup based on environment variable
 
1123
  if os.getenv("REBUILD_EMBEDDINGS_ON_START", "false").lower() == "true":
1124
  with app.app_context():
1125
- logging.info("REBUILD_EMBEDDINGS_ON_START is true, ensuring embeddings exist.")
1126
  ensure_embeddings_on_startup()
1127
  else:
1128
- logging.info("REBUILD_EMBEDDINGS_ON_START is not set to true, skipping initial embedding.")
1129
 
1130
  # Add Render-specific memory middleware if running on Render and
1131
  # memory monitoring is enabled
 
27
def _acquire_ingestion_lock(lock_file, lock_timeout):
    """Acquire an exclusive flock on *lock_file*, waiting up to *lock_timeout* seconds.

    Returns the open, locked file object on success, or None on timeout.
    The caller is responsible for unlocking and closing the returned file.
    """
    import fcntl

    pid = os.getpid()
    start_time = time.time()
    waited = False
    while True:
        lock_fd = None
        try:
            # Open in append mode so a *waiting* worker does not truncate the
            # PID the current lock holder wrote (mode "w" would clobber it).
            lock_fd = open(lock_file, "a")
            fcntl.flock(lock_fd.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
        except (IOError, OSError):
            # Either open() failed or another process holds the lock.
            # Guard against lock_fd being unbound when open() itself raised.
            if lock_fd is not None:
                lock_fd.close()
            if not waited:
                logging.info(f"[PID {pid}] Lock is held by another process, waiting...")
                waited = True
            if time.time() - start_time >= lock_timeout:
                logging.error(f"[PID {pid}] Timeout waiting for lock after {lock_timeout}s")
                return None
            time.sleep(2)
            continue

        if waited:
            logging.info(f"[PID {pid}] Lock acquired after waiting {time.time() - start_time:.1f}s")
        else:
            logging.info(f"[PID {pid}] Successfully acquired exclusive lock")

        # Write PID to lock file for debugging; truncate only after the lock
        # is held so we never erase another process's record.
        lock_fd.seek(0)
        lock_fd.truncate()
        lock_fd.write(f"{pid}\n")
        lock_fd.flush()
        return lock_fd


def _run_ingestion(pid):
    """Check the vector store state and rebuild embeddings from the corpus if needed.

    Must be called while holding the ingestion lock. Raises on unrecoverable
    errors; the caller logs the failure and releases the lock.
    """
    # Import list reconstructed from the names this function uses — confirm
    # against src/config's public names.
    from src.config import (
        COLLECTION_NAME,
        CORPUS_DIRECTORY,
        DEFAULT_CHUNK_SIZE,
        DEFAULT_OVERLAP,
        EMBEDDING_DIMENSION,
        EMBEDDING_MODEL_NAME,
        RANDOM_SEED,
        VECTOR_DB_PERSIST_PATH,
    )
    from src.ingestion.ingestion_pipeline import IngestionPipeline
    from src.vector_store.vector_db import VectorDatabase

    logging.info(f"[PID {pid}] Imported modules successfully")
    logging.info(f"[PID {pid}] Checking vector store at: {VECTOR_DB_PERSIST_PATH}")
    logging.info(f"[PID {pid}] Collection name: {COLLECTION_NAME}")
    logging.info(f"[PID {pid}] Corpus directory: {CORPUS_DIRECTORY}")
    logging.info(f"[PID {pid}] Expected embedding dimension: {EMBEDDING_DIMENSION}")

    # Initialize vector database to check its state
    try:
        vector_db = VectorDatabase(VECTOR_DB_PERSIST_PATH, COLLECTION_NAME)
        logging.info(f"[PID {pid}] Vector database initialized successfully")
    except Exception as e:
        logging.error(f"[PID {pid}] Failed to initialize vector database: {e}")
        raise

    # Check if embeddings exist and have correct dimension
    try:
        current_count = vector_db.get_count()
        current_dimension = vector_db.get_embedding_dimension()
        logging.info(
            f"[PID {pid}] Current database state: {current_count} embeddings, dimension {current_dimension}"
        )

        has_valid = vector_db.has_valid_embeddings(EMBEDDING_DIMENSION)
        logging.info(f"[PID {pid}] Has valid embeddings: {has_valid}")

    except Exception as e:
        logging.error(f"[PID {pid}] Failed to check vector database state: {e}")
        # Assume we need to rebuild
        has_valid = False
        current_count = 0
        current_dimension = 0

    if has_valid:
        logging.info(
            f"[PID {pid}] Vector store is valid with {current_count} embeddings "
            f"of dimension {current_dimension}"
        )
        return

    logging.warning(
        f"[PID {pid}] Vector store is empty or has wrong dimension. "
        f"Expected: {EMBEDDING_DIMENSION}, Current: {current_dimension}, "
        f"Count: {current_count}"
    )
    logging.info(f"[PID {pid}] Starting ingestion pipeline with model: {EMBEDDING_MODEL_NAME}")

    # Check if corpus directory exists
    if not os.path.exists(CORPUS_DIRECTORY):
        logging.error(f"[PID {pid}] Corpus directory does not exist: {CORPUS_DIRECTORY}")
        return

    corpus_files = os.listdir(CORPUS_DIRECTORY)
    logging.info(f"[PID {pid}] Found {len(corpus_files)} files in corpus directory")

    # Run ingestion pipeline to rebuild embeddings
    try:
        ingestion_pipeline = IngestionPipeline(
            chunk_size=DEFAULT_CHUNK_SIZE,
            overlap=DEFAULT_OVERLAP,
            seed=RANDOM_SEED,
            store_embeddings=True,
        )
        logging.info(f"[PID {pid}] Ingestion pipeline created successfully")
    except Exception as e:
        logging.error(f"[PID {pid}] Failed to create ingestion pipeline: {e}")
        raise

    # Process the corpus directory
    try:
        logging.info(f"[PID {pid}] Starting to process corpus directory...")
        results = ingestion_pipeline.process_directory(CORPUS_DIRECTORY)
        logging.info(f"[PID {pid}] Process directory completed, got results: {type(results)}")
    except Exception as e:
        logging.error(f"[PID {pid}] Failed during directory processing: {e}", exc_info=True)
        raise

    if not results or len(results) == 0:
        logging.error(
            f"[PID {pid}] Ingestion failed or processed 0 chunks. "
            "Please check the corpus directory and ingestion pipeline for errors."
        )
    else:
        logging.info(f"[PID {pid}] Ingestion completed successfully: {len(results)} chunks processed")

        # Verify the embeddings were actually stored
        try:
            final_count = vector_db.get_count()
            final_dimension = vector_db.get_embedding_dimension()
            logging.info(
                f"[PID {pid}] Final database state: {final_count} embeddings, "
                f"dimension {final_dimension}"
            )
        except Exception as e:
            logging.error(f"[PID {pid}] Failed to verify final database state: {e}")


def ensure_embeddings_on_startup():
    """
    Ensure embeddings exist and have the correct dimension on app startup.
    This is critical for Hugging Face deployments where the vector store needs to be built on startup.
    Uses a file-based lock to prevent race conditions between workers.
    """
    import fcntl

    pid = os.getpid()
    logging.info(f"[PID {pid}] Starting ensure_embeddings_on_startup function")

    lock_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "data", "locks")
    try:
        # exist_ok avoids the check-then-create race between workers that
        # both observe the directory as missing and both call makedirs.
        os.makedirs(lock_dir, exist_ok=True)
    except Exception as e:
        logging.error(f"[PID {pid}] Failed to create lock directory: {e}")
        return

    lock_file = os.path.join(lock_dir, "ingestion.lock")
    lock_timeout = 300  # 5 minutes for Hugging Face with more resources

    logging.info(f"[PID {pid}] Attempting to acquire lock: {lock_file}")

    # Use proper file locking with fcntl for better reliability
    lock_fd = _acquire_ingestion_lock(lock_file, lock_timeout)
    if lock_fd is None:
        return

    try:
        logging.info(f"[PID {pid}] Lock acquired, starting ingestion process")
        _run_ingestion(pid)
    except Exception as e:
        logging.error(f"[PID {pid}] Failed to ensure embeddings on startup: {e}", exc_info=True)
        # Don't crash the app, but log the error
        # The app will still start but searches may fail
    finally:
        # Release lock
        try:
            fcntl.flock(lock_fd.fileno(), fcntl.LOCK_UN)
            lock_fd.close()
            logging.info(f"[PID {pid}] Released ingestion lock")
        except Exception as e:
            logging.error(f"[PID {pid}] Failed to release lock: {e}")
 
203
 
204
  def create_app(
 
1205
  except Exception as e:
1206
  logging.warning(f"Failed to register document management blueprint: {e}")
1207
 
1208
+ # Use pre-built embeddings by default for reliable deployment
1209
+ # Only rebuild embeddings if explicitly requested via environment variable
1210
  if os.getenv("REBUILD_EMBEDDINGS_ON_START", "false").lower() == "true":
1211
  with app.app_context():
1212
+ logging.info("REBUILD_EMBEDDINGS_ON_START is true, rebuilding embeddings on startup.")
1213
  ensure_embeddings_on_startup()
1214
  else:
1215
+ logging.info("Using pre-built embeddings. Set REBUILD_EMBEDDINGS_ON_START=true to rebuild.")
1216
 
1217
  # Add Render-specific memory middleware if running on Render and
1218
  # memory monitoring is enabled