""" Memory monitoring and management utilities for production deployment. """ import gc import logging import os import tracemalloc from functools import wraps from typing import Optional logger = logging.getLogger(__name__) def get_memory_usage() -> float: """ Get current memory usage in MB. Falls back to basic approach if psutil is not available. """ try: import psutil return psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024 except ImportError: # Fallback: use tracemalloc if available try: current, peak = tracemalloc.get_traced_memory() return current / 1024 / 1024 except Exception: return 0.0 def log_memory_usage(context: str = "") -> float: """Log current memory usage with context and return the memory value.""" memory_mb = get_memory_usage() if context: logger.info(f"Memory usage ({context}): {memory_mb:.1f}MB") else: logger.info(f"Memory usage: {memory_mb:.1f}MB") return memory_mb def memory_monitor(func): """Decorator to monitor memory usage of functions.""" @wraps(func) def wrapper(*args, **kwargs): memory_before = get_memory_usage() result = func(*args, **kwargs) memory_after = get_memory_usage() memory_diff = memory_after - memory_before logger.info( f"Memory change in {func.__name__}: " f"{memory_before:.1f}MB -> {memory_after:.1f}MB " f"(+{memory_diff:.1f}MB)" ) return result return wrapper def force_garbage_collection(): """Force garbage collection and log memory freed.""" memory_before = get_memory_usage() # Force garbage collection collected = gc.collect() memory_after = get_memory_usage() memory_freed = memory_before - memory_after logger.info( f"Garbage collection: freed {memory_freed:.1f}MB, " f"collected {collected} objects" ) def check_memory_threshold(threshold_mb: float = 400) -> bool: """ Check if memory usage exceeds threshold. Args: threshold_mb: Memory threshold in MB (default 400MB for 512MB limit) Returns: True if memory usage is above threshold """ current_memory = get_memory_usage() if current_memory > threshold_mb: logger.warning( f"Memory usage {current_memory:.1f}MB exceeds threshold {threshold_mb}MB" ) return True return False def clean_memory(context: str = ""): """ Clean memory and force garbage collection with context logging. Args: context: Description of when/why cleanup is happening """ memory_before = get_memory_usage() # Force garbage collection collected = gc.collect() memory_after = get_memory_usage() memory_freed = memory_before - memory_after if context: logger.info( f"Memory cleanup ({context}): " f"{memory_before:.1f}MB -> {memory_after:.1f}MB " f"(freed {memory_freed:.1f}MB, collected {collected} objects)" ) else: logger.info( f"Memory cleanup: freed {memory_freed:.1f}MB, collected {collected} objects" ) def optimize_memory(): """ Perform memory optimization operations. Called when memory usage gets high. """ logger.info("Performing memory optimization...") # Force garbage collection force_garbage_collection() # Clear any model caches if they exist try: from src.embedding.embedding_service import EmbeddingService if hasattr(EmbeddingService, "_model_cache"): cache_size = len(EmbeddingService._model_cache) if cache_size > 1: # Keep at least one model cached # Clear all but one cached model (no usage tracking) keys = list(EmbeddingService._model_cache.keys()) for key in keys[:-1]: del EmbeddingService._model_cache[key] logger.info(f"Cleared {cache_size - 1} cached models, kept 1") except Exception as e: logger.debug(f"Could not clear model cache: {e}") class MemoryManager: """Context manager for memory-intensive operations.""" def __init__(self, operation_name: str = "operation", threshold_mb: float = 400): self.operation_name = operation_name self.threshold_mb = threshold_mb self.start_memory: Optional[float] = None def __enter__(self): self.start_memory = get_memory_usage() logger.info( f"Starting {self.operation_name} (Memory: {self.start_memory:.1f}MB)" ) # Check if we're already near the threshold if self.start_memory > self.threshold_mb: logger.warning("Starting operation with high memory usage") optimize_memory() return self def __exit__(self, exc_type, exc_val, exc_tb): end_memory = get_memory_usage() memory_diff = end_memory - (self.start_memory or 0) logger.info( f"Completed {self.operation_name} " f"(Memory: {self.start_memory:.1f}MB -> {end_memory:.1f}MB, " f"Change: {memory_diff:+.1f}MB)" ) # If memory usage increased significantly, trigger cleanup if memory_diff > 50: # More than 50MB increase logger.info("Large memory increase detected, running cleanup") force_garbage_collection()