# ────────────────────────────── memo/core.py ────────────────────────────── """ Core Memory System Main memory system that provides both legacy and enhanced functionality. """ import os import asyncio from typing import List, Dict, Any, Optional, Tuple from utils.logger import get_logger from utils.rag.embeddings import EmbeddingClient from memo.legacy import MemoryLRU from memo.persistent import PersistentMemory logger = get_logger("CORE_MEMORY", __name__) class MemorySystem: """ Main memory system that provides both legacy and enhanced functionality. Automatically uses enhanced features when MongoDB is available. """ def __init__(self, mongo_uri: str = None, db_name: str = "studybuddy"): self.mongo_uri = mongo_uri or os.getenv("MONGO_URI", "mongodb://localhost:27017") self.db_name = db_name # Initialize legacy memory system (always available) self.legacy_memory = MemoryLRU() # Initialize enhanced memory system if MongoDB is available self.enhanced_available = False self.enhanced_memory = None self.embedder = None self.session_memory = None try: self.embedder = EmbeddingClient() self.enhanced_memory = PersistentMemory(self.mongo_uri, self.db_name, self.embedder) from memo.session import get_session_memory_manager self.session_memory = get_session_memory_manager(self.mongo_uri, self.db_name) self.enhanced_available = True logger.info("[CORE_MEMORY] Enhanced memory system and session memory initialized") except Exception as e: logger.warning(f"[CORE_MEMORY] Enhanced memory system unavailable: {e}") self.enhanced_available = False logger.info(f"[CORE_MEMORY] Initialized with enhanced_available={self.enhanced_available}") # ────────────────────────────── Core Memory Operations ────────────────────────────── def add(self, user_id: str, qa_summary: str): """Add a Q&A summary to memory (backward compatibility)""" try: # Add to legacy memory self.legacy_memory.add(user_id, qa_summary) # Also add to enhanced memory if available if self.enhanced_available: # Extract question and answer from summary lines = qa_summary.split('\n') question = "" answer = "" for line in lines: if line.strip().lower().startswith('q:'): question = line.strip()[2:].strip() elif line.strip().lower().startswith('a:'): answer = line.strip()[2:].strip() if question and answer: asyncio.create_task(self._add_enhanced_memory(user_id, question, answer)) logger.debug(f"[CORE_MEMORY] Added memory for user {user_id}") except Exception as e: logger.error(f"[CORE_MEMORY] Failed to add memory: {e}") def recent(self, user_id: str, n: int = 3) -> List[str]: """Get recent memories (backward compatibility)""" return self.legacy_memory.recent(user_id, n) def rest(self, user_id: str, skip_n: int = 3) -> List[str]: """Get remaining memories excluding recent ones (backward compatibility)""" return self.legacy_memory.rest(user_id, skip_n) def all(self, user_id: str) -> List[str]: """Get all memories for a user (backward compatibility)""" return self.legacy_memory.all(user_id) def clear(self, user_id: str) -> None: """Clear all memories for a user (backward compatibility)""" self.legacy_memory.clear(user_id) # Also clear enhanced memory if available if self.enhanced_available: try: self.enhanced_memory.clear_user_memories(user_id) logger.info(f"[CORE_MEMORY] Cleared enhanced memory for user {user_id}") except Exception as e: logger.warning(f"[CORE_MEMORY] Failed to clear enhanced memory: {e}") def clear_all_memory(self, user_id: str, project_id: str = None) -> Dict[str, Any]: """Clear all memory components for a user including sessions and planning state""" try: results = { "legacy_cleared": False, "enhanced_cleared": False, "session_cleared": False, "planning_reset": False, "errors": [] } # Clear legacy memory try: self.legacy_memory.clear(user_id) results["legacy_cleared"] = True logger.info(f"[CORE_MEMORY] Cleared legacy memory for user {user_id}") except Exception as e: error_msg = f"Failed to clear legacy memory: {e}" results["errors"].append(error_msg) logger.warning(f"[CORE_MEMORY] {error_msg}") # Clear enhanced memory if available if self.enhanced_available: try: if project_id: # Clear project-specific memories self.enhanced_memory.memories.delete_many({ "user_id": user_id, "project_id": project_id }) else: # Clear all user memories self.enhanced_memory.clear_user_memories(user_id) results["enhanced_cleared"] = True logger.info(f"[CORE_MEMORY] Cleared enhanced memory for user {user_id}, project {project_id}") except Exception as e: error_msg = f"Failed to clear enhanced memory: {e}" results["errors"].append(error_msg) logger.warning(f"[CORE_MEMORY] {error_msg}") # Clear conversation sessions try: from memo.sessions import get_session_manager session_manager = get_session_manager() session_manager.clear_session(user_id) results["session_cleared"] = True logger.info(f"[CORE_MEMORY] Cleared session for user {user_id}") except Exception as e: error_msg = f"Failed to clear session: {e}" results["errors"].append(error_msg) logger.warning(f"[CORE_MEMORY] {error_msg}") # Reset planning state (if needed) try: # Planning state is stateless, but we can log the reset results["planning_reset"] = True logger.info(f"[CORE_MEMORY] Reset planning state for user {user_id}") except Exception as e: error_msg = f"Failed to reset planning state: {e}" results["errors"].append(error_msg) logger.warning(f"[CORE_MEMORY] {error_msg}") # Clear any cached contexts try: from memo.retrieval import get_retrieval_manager retrieval_manager = get_retrieval_manager(self, self.embedder) # Reset any cached state if needed logger.info(f"[CORE_MEMORY] Cleared cached contexts for user {user_id}") except Exception as e: error_msg = f"Failed to clear cached contexts: {e}" results["errors"].append(error_msg) logger.warning(f"[CORE_MEMORY] {error_msg}") success = all([results["legacy_cleared"], results["session_cleared"]]) if self.enhanced_available: success = success and results["enhanced_cleared"] if success: logger.info(f"[CORE_MEMORY] Successfully cleared all memory for user {user_id}, project {project_id}") else: logger.warning(f"[CORE_MEMORY] Partial memory clear for user {user_id}, project {project_id}: {results}") return results except Exception as e: logger.error(f"[CORE_MEMORY] Failed to clear all memory for user {user_id}: {e}") return { "legacy_cleared": False, "enhanced_cleared": False, "session_cleared": False, "planning_reset": False, "errors": [f"Critical error: {e}"] } def is_enhanced_available(self) -> bool: """Check if enhanced memory features are available""" return self.enhanced_available # ────────────────────────────── Enhanced Features ────────────────────────────── async def add_conversation_memory(self, user_id: str, question: str, answer: str, project_id: Optional[str] = None, context: Dict[str, Any] = None) -> str: """Add conversation memory with enhanced context""" if not self.enhanced_available: logger.warning("[CORE_MEMORY] Enhanced features not available") return "" try: memory_id = self.enhanced_memory.add_memory( user_id=user_id, content=f"Q: {question}\nA: {answer}", memory_type="conversation", project_id=project_id, importance="medium", tags=["conversation", "qa"], metadata=context or {} ) return memory_id except Exception as e: logger.error(f"[CORE_MEMORY] Failed to add conversation memory: {e}") return "" async def get_conversation_context(self, user_id: str, question: str, project_id: Optional[str] = None) -> Tuple[str, str]: """Get conversation context for chat continuity with enhanced memory ability""" try: if self.enhanced_available: # Use enhanced context retrieval with better integration recent_context, semantic_context = await self._get_enhanced_context(user_id, question) return recent_context, semantic_context else: # Use legacy context with enhanced semantic selection from memo.context import get_legacy_context return await get_legacy_context(user_id, question, self, self.embedder, 3) except Exception as e: logger.error(f"[CORE_MEMORY] Failed to get conversation context: {e}") return "", "" async def get_enhanced_context(self, user_id: str, question: str, project_id: Optional[str] = None) -> Tuple[str, str, Dict[str, Any]]: """Get enhanced context using the new memory planning system""" try: return await self.get_smart_context(user_id, question, None, project_id, "chat") except Exception as e: logger.error(f"[CORE_MEMORY] Failed to get enhanced context: {e}") return "", "", {"error": str(e)} async def search_memories(self, user_id: str, query: str, project_id: Optional[str] = None, limit: int = 10) -> List[Tuple[str, float]]: """Search memories using semantic similarity""" if not self.enhanced_available: return [] try: results = self.enhanced_memory.search_memories( user_id=user_id, query=query, project_id=project_id, limit=limit ) return [(m["content"], score) for m, score in results] except Exception as e: logger.error(f"[CORE_MEMORY] Failed to search memories: {e}") return [] def get_memory_stats(self, user_id: str) -> Dict[str, Any]: """Get memory statistics for a user""" if self.enhanced_available: return self.enhanced_memory.get_memory_stats(user_id) else: # Legacy memory stats all_memories = self.legacy_memory.all(user_id) return { "total_memories": len(all_memories), "system_type": "legacy", "enhanced_available": False } async def consolidate_memories(self, user_id: str, nvidia_rotator=None) -> Dict[str, Any]: """Consolidate and prune memories to prevent information overload""" try: from memo.conversation import get_conversation_manager conversation_manager = get_conversation_manager(self, self.embedder) return await conversation_manager.consolidate_memories(user_id, nvidia_rotator) except Exception as e: logger.error(f"[CORE_MEMORY] Memory consolidation failed: {e}") return {"consolidated": 0, "pruned": 0, "error": str(e)} async def handle_context_switch(self, user_id: str, new_question: str, nvidia_rotator=None) -> Dict[str, Any]: """Handle context switching when user changes topics""" try: from memo.conversation import get_conversation_manager conversation_manager = get_conversation_manager(self, self.embedder) return await conversation_manager.handle_context_switch(user_id, new_question, nvidia_rotator) except Exception as e: logger.error(f"[CORE_MEMORY] Context switch handling failed: {e}") return {"is_context_switch": False, "confidence": 0.0, "error": str(e)} def get_conversation_insights(self, user_id: str) -> Dict[str, Any]: """Get insights about the user's conversation patterns""" try: from memo.conversation import get_conversation_manager conversation_manager = get_conversation_manager(self, self.embedder) return conversation_manager.get_conversation_insights(user_id) except Exception as e: logger.error(f"[CORE_MEMORY] Failed to get conversation insights: {e}") return {"error": str(e)} async def get_smart_context(self, user_id: str, question: str, nvidia_rotator=None, project_id: Optional[str] = None, conversation_mode: str = "chat") -> Tuple[str, str, Dict[str, Any]]: """Get smart context using advanced memory planning strategy""" try: from memo.planning import get_memory_planner memory_planner = get_memory_planner(self, self.embedder) # Plan memory strategy based on user intent execution_plan = await memory_planner.plan_memory_strategy( user_id, question, nvidia_rotator, project_id ) # Execute the planned strategy recent_context, semantic_context, metadata = await memory_planner.execute_memory_plan( user_id, question, execution_plan, nvidia_rotator, project_id ) # Add planning metadata to response metadata.update({ "memory_planning": True, "intent": execution_plan["intent"].value, "strategy": execution_plan["strategy"].value, "enhancement_focus": execution_plan["enhancement_focus"], "qa_focus": execution_plan["qa_focus"] }) return recent_context, semantic_context, metadata except Exception as e: logger.error(f"[CORE_MEMORY] Failed to get smart context: {e}") # Fallback to original conversation manager try: from memo.conversation import get_conversation_manager conversation_manager = get_conversation_manager(self, self.embedder) return await conversation_manager.get_smart_context( user_id, question, nvidia_rotator, project_id, conversation_mode ) except Exception as fallback_error: logger.error(f"[CORE_MEMORY] Fallback also failed: {fallback_error}") return "", "", {"error": str(e)} async def get_enhancement_context(self, user_id: str, question: str, nvidia_rotator=None, project_id: Optional[str] = None) -> Tuple[str, str, Dict[str, Any]]: """Get context specifically optimized for enhancement requests""" try: from memo.planning import get_memory_planner, QueryIntent, MemoryStrategy memory_planner = get_memory_planner(self, self.embedder) # Force enhancement intent and focused Q&A strategy execution_plan = { "intent": QueryIntent.ENHANCEMENT, "strategy": MemoryStrategy.FOCUSED_QA, "retrieval_params": { "recent_limit": 5, "semantic_limit": 10, "qa_focus": True, "enhancement_mode": True, "priority_types": ["conversation", "qa"], "similarity_threshold": 0.05, # Very low threshold for maximum recall "use_ai_selection": True }, "conversation_context": {}, "enhancement_focus": True, "qa_focus": True } # Execute the enhancement-focused strategy recent_context, semantic_context, metadata = await memory_planner.execute_memory_plan( user_id, question, execution_plan, nvidia_rotator, project_id ) # Add enhancement-specific metadata metadata.update({ "enhancement_mode": True, "qa_focused": True, "memory_planning": True, "intent": "enhancement", "strategy": "focused_qa" }) logger.info(f"[CORE_MEMORY] Enhancement context retrieved: {len(recent_context)} recent, {len(semantic_context)} semantic") return recent_context, semantic_context, metadata except Exception as e: logger.error(f"[CORE_MEMORY] Failed to get enhancement context: {e}") return "", "", {"error": str(e)} # ────────────────────────────── Session-Specific Memory Operations ────────────────────────────── def add_session_memory(self, user_id: str, project_id: str, session_id: str, question: str, answer: str, context: Dict[str, Any] = None) -> str: """Add memory to a specific session""" try: if not self.session_memory: logger.warning("[CORE_MEMORY] Session memory not available") return "" # Create session-specific memory content content = f"Q: {question}\nA: {answer}" memory_id = self.session_memory.add_session_memory( user_id=user_id, project_id=project_id, session_id=session_id, content=content, memory_type="conversation", importance="medium", tags=["conversation", "qa"], metadata=context or {} ) logger.debug(f"[CORE_MEMORY] Added session memory for session {session_id}") return memory_id except Exception as e: logger.error(f"[CORE_MEMORY] Failed to add session memory: {e}") return "" def get_session_memory_context(self, user_id: str, project_id: str, session_id: str, question: str, limit: int = 5) -> Tuple[str, str]: """Get memory context for a specific session""" try: if not self.session_memory: return "", "" # Get recent session memories recent_memories = self.session_memory.get_session_memories( user_id, project_id, session_id, memory_type="conversation", limit=limit ) recent_context = "" if recent_memories: recent_context = "\n\n".join([mem["content"] for mem in recent_memories]) # Get semantic context from session memories semantic_memories = self.session_memory.search_session_memories( user_id, project_id, session_id, question, self.embedder, limit=3 ) semantic_context = "" if semantic_memories: semantic_context = "\n\n".join([mem["content"] for mem, score in semantic_memories]) return recent_context, semantic_context except Exception as e: logger.error(f"[CORE_MEMORY] Failed to get session memory context: {e}") return "", "" def clear_session_memories(self, user_id: str, project_id: str, session_id: str): """Clear all memories for a specific session""" try: if not self.session_memory: return 0 deleted_count = self.session_memory.clear_session_memories(user_id, project_id, session_id) logger.info(f"[CORE_MEMORY] Cleared {deleted_count} session memories for session {session_id}") return deleted_count except Exception as e: logger.error(f"[CORE_MEMORY] Failed to clear session memories: {e}") return 0 # ────────────────────────────── Private Helper Methods ────────────────────────────── async def _add_enhanced_memory(self, user_id: str, question: str, answer: str): """Add memory to enhanced system""" try: self.enhanced_memory.add_memory( user_id=user_id, content=f"Q: {question}\nA: {answer}", memory_type="conversation", importance="medium", tags=["conversation", "qa"] ) except Exception as e: logger.warning(f"[CORE_MEMORY] Failed to add enhanced memory: {e}") async def _get_enhanced_context(self, user_id: str, question: str) -> Tuple[str, str]: """Get context from enhanced memory system with semantic selection""" try: # Get recent conversation memories recent_memories = self.enhanced_memory.get_memories( user_id=user_id, memory_type="conversation", limit=5 ) recent_context = "" if recent_memories and self.embedder: # Use semantic similarity to select most relevant recent memories try: from memo.context import semantic_context recent_summaries = [m["summary"] for m in recent_memories] recent_context = await semantic_context(question, recent_summaries, self.embedder, 3) except Exception as e: logger.warning(f"[CORE_MEMORY] Semantic recent context failed, using all: {e}") recent_context = "\n\n".join([m["summary"] for m in recent_memories]) elif recent_memories: recent_context = "\n\n".join([m["summary"] for m in recent_memories]) # Get semantic context from other memory types semantic_memories = self.enhanced_memory.get_memories( user_id=user_id, limit=10 ) semantic_context = "" if semantic_memories and self.embedder: try: from memo.context import semantic_context other_memories = [m for m in semantic_memories if m.get("memory_type") != "conversation"] if other_memories: other_summaries = [m["summary"] for m in other_memories] semantic_context = await semantic_context(question, other_summaries, self.embedder, 5) except Exception as e: logger.warning(f"[CORE_MEMORY] Semantic context failed, using all: {e}") other_memories = [m for m in semantic_memories if m.get("memory_type") != "conversation"] if other_memories: semantic_context = "\n\n".join([m["summary"] for m in other_memories]) elif semantic_memories: other_memories = [m for m in semantic_memories if m.get("memory_type") != "conversation"] if other_memories: semantic_context = "\n\n".join([m["summary"] for m in other_memories]) return recent_context, semantic_context except Exception as e: logger.error(f"[CORE_MEMORY] Failed to get enhanced context: {e}") return "", "" # ────────────────────────────── Global Instance ────────────────────────────── _memory_system: Optional[MemorySystem] = None def get_memory_system(mongo_uri: str = None, db_name: str = None) -> MemorySystem: """Get the global memory system instance""" global _memory_system if _memory_system is None: if mongo_uri is None: mongo_uri = os.getenv("MONGO_URI", "mongodb://localhost:27017") if db_name is None: db_name = os.getenv("MONGO_DB", "studybuddy") _memory_system = MemorySystem(mongo_uri, db_name) logger.info("[CORE_MEMORY] Global memory system initialized") return _memory_system # def reset_memory_system(): # """Reset the global memory system (for testing)""" # global _memory_system # _memory_system = None