# EdSummariser / memo/core.py
# Commit d063204 (LiamKhoaLe): "Add session-spec within project"
# ────────────────────────────── memo/core.py ──────────────────────────────
"""
Core Memory System
Main memory system that provides both legacy and enhanced functionality.
"""
import os
import asyncio
from typing import List, Dict, Any, Optional, Tuple
from utils.logger import get_logger
from utils.rag.embeddings import EmbeddingClient
from memo.legacy import MemoryLRU
from memo.persistent import PersistentMemory
# Module logger; every message emitted from this file also carries a "[CORE_MEMORY]" prefix.
logger = get_logger("CORE_MEMORY", __name__)
class MemorySystem:
    """
    Main memory system that provides both legacy and enhanced functionality.

    The in-process legacy store (``MemoryLRU``) is always used. When the
    embedding client, the MongoDB-backed ``PersistentMemory`` and the session
    memory manager can all be created, the enhanced features are switched on
    too (``enhanced_available``). Otherwise every enhanced entry point degrades
    to an empty or legacy-only result.
    """

    def __init__(self, mongo_uri: Optional[str] = None, db_name: str = "studybuddy"):
        """
        Args:
            mongo_uri: MongoDB connection string. Falls back to the MONGO_URI
                environment variable, then to ``mongodb://localhost:27017``.
            db_name: Database used by the persistent and session stores.
        """
        self.mongo_uri = mongo_uri or os.getenv("MONGO_URI", "mongodb://localhost:27017")
        self.db_name = db_name
        # Initialize legacy memory system (always available)
        self.legacy_memory = MemoryLRU()
        # Strong references to fire-and-forget tasks started by add(). The event
        # loop only keeps weak references, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._background_tasks: set = set()
        # Initialize enhanced memory system if MongoDB is available
        self.enhanced_available = False
        self.enhanced_memory = None
        self.embedder = None
        self.session_memory = None
        try:
            self.embedder = EmbeddingClient()
            self.enhanced_memory = PersistentMemory(self.mongo_uri, self.db_name, self.embedder)
            from memo.session import get_session_memory_manager
            self.session_memory = get_session_memory_manager(self.mongo_uri, self.db_name)
            self.enhanced_available = True
            logger.info("[CORE_MEMORY] Enhanced memory system and session memory initialized")
        except Exception as e:
            # Any failure here, including the session manager, disables all
            # enhanced features. Attributes assigned before the failure are kept.
            logger.warning(f"[CORE_MEMORY] Enhanced memory system unavailable: {e}")
            self.enhanced_available = False
        logger.info(f"[CORE_MEMORY] Initialized with enhanced_available={self.enhanced_available}")

    # ────────────────────────────── Core Memory Operations ──────────────────────────────
    def add(self, user_id: str, qa_summary: str) -> None:
        """Add a Q&A summary to memory (backward compatibility).

        The raw summary always goes into legacy memory. When enhanced memory is
        available, the last "Q:" and "A:" lines are extracted. If both are
        non-empty they are persisted as a "conversation" memory. This runs as a
        background task when an event loop is running, and synchronously
        otherwise. Errors are logged, never raised.
        """
        try:
            self.legacy_memory.add(user_id, qa_summary)
            if self.enhanced_available:
                question, answer = self._parse_qa_summary(qa_summary)
                if question and answer:
                    self._schedule_enhanced_add(user_id, question, answer)
            logger.debug(f"[CORE_MEMORY] Added memory for user {user_id}")
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to add memory: {e}")

    @staticmethod
    def _parse_qa_summary(qa_summary: str) -> Tuple[str, str]:
        """Return (question, answer) taken from "Q:"/"A:" prefixed lines.

        The prefixes are case-insensitive and surrounding whitespace is ignored.
        If a prefix appears several times, the last occurrence wins. Missing
        parts come back as "".
        """
        question = ""
        answer = ""
        for raw_line in qa_summary.split('\n'):
            line = raw_line.strip()
            lowered = line.lower()
            if lowered.startswith('q:'):
                question = line[2:].strip()
            elif lowered.startswith('a:'):
                answer = line[2:].strip()
        return question, answer

    def _schedule_enhanced_add(self, user_id: str, question: str, answer: str) -> None:
        """Persist a Q&A pair to enhanced memory without requiring an event loop."""
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # Called from synchronous code with no running loop. asyncio.create_task
            # would raise here and leave an un-awaited coroutine behind, so write
            # directly instead.
            self._add_enhanced_memory_sync(user_id, question, answer)
            return
        task = loop.create_task(self._add_enhanced_memory(user_id, question, answer))
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    def recent(self, user_id: str, n: int = 3) -> List[str]:
        """Get recent memories (backward compatibility)"""
        return self.legacy_memory.recent(user_id, n)

    def rest(self, user_id: str, skip_n: int = 3) -> List[str]:
        """Get remaining memories excluding recent ones (backward compatibility)"""
        return self.legacy_memory.rest(user_id, skip_n)

    def all(self, user_id: str) -> List[str]:
        """Get all memories for a user (backward compatibility)"""
        return self.legacy_memory.all(user_id)

    def clear(self, user_id: str) -> None:
        """Clear all memories for a user (backward compatibility).

        Legacy errors propagate. Enhanced-memory errors are logged and ignored.
        """
        self.legacy_memory.clear(user_id)
        # Also clear enhanced memory if available
        if self.enhanced_available:
            try:
                self.enhanced_memory.clear_user_memories(user_id)
                logger.info(f"[CORE_MEMORY] Cleared enhanced memory for user {user_id}")
            except Exception as e:
                logger.warning(f"[CORE_MEMORY] Failed to clear enhanced memory: {e}")

    def clear_all_memory(self, user_id: str, project_id: Optional[str] = None) -> Dict[str, Any]:
        """Clear all memory components for a user including sessions and planning state.

        Args:
            user_id: Owner of the memories.
            project_id: If given, only this project's enhanced memories are
                deleted. Otherwise all of the user's enhanced memories are.

        Returns:
            A dict with boolean flags "legacy_cleared", "enhanced_cleared",
            "session_cleared", "planning_reset", and an "errors" list of
            messages for the steps that failed. Nothing is raised.
        """
        try:
            results: Dict[str, Any] = {
                "legacy_cleared": False,
                "enhanced_cleared": False,
                "session_cleared": False,
                "planning_reset": False,
                "errors": []
            }

            def record_failure(what: str, exc: Exception) -> None:
                # Each step is best-effort: remember the error and move on.
                error_msg = f"Failed to {what}: {exc}"
                results["errors"].append(error_msg)
                logger.warning(f"[CORE_MEMORY] {error_msg}")

            # Clear legacy memory
            try:
                self.legacy_memory.clear(user_id)
                results["legacy_cleared"] = True
                logger.info(f"[CORE_MEMORY] Cleared legacy memory for user {user_id}")
            except Exception as e:
                record_failure("clear legacy memory", e)

            # Clear enhanced memory if available
            if self.enhanced_available:
                try:
                    if project_id:
                        # Clear project-specific memories
                        self.enhanced_memory.memories.delete_many({
                            "user_id": user_id,
                            "project_id": project_id
                        })
                    else:
                        # Clear all user memories
                        self.enhanced_memory.clear_user_memories(user_id)
                    results["enhanced_cleared"] = True
                    logger.info(f"[CORE_MEMORY] Cleared enhanced memory for user {user_id}, project {project_id}")
                except Exception as e:
                    record_failure("clear enhanced memory", e)

            # Clear conversation sessions
            try:
                from memo.sessions import get_session_manager
                get_session_manager().clear_session(user_id)
                results["session_cleared"] = True
                logger.info(f"[CORE_MEMORY] Cleared session for user {user_id}")
            except Exception as e:
                record_failure("clear session", e)

            # Planning state is stateless, so "resetting" it only records the fact.
            results["planning_reset"] = True
            logger.info(f"[CORE_MEMORY] Reset planning state for user {user_id}")

            # Cached contexts: the retrieval manager is obtained but not used, so
            # nothing is invalidated here yet. Only failures to obtain it are reported.
            try:
                from memo.retrieval import get_retrieval_manager
                get_retrieval_manager(self, self.embedder)
                logger.info(f"[CORE_MEMORY] Cleared cached contexts for user {user_id}")
            except Exception as e:
                record_failure("clear cached contexts", e)

            success = results["legacy_cleared"] and results["session_cleared"]
            if self.enhanced_available:
                success = success and results["enhanced_cleared"]
            if success:
                logger.info(f"[CORE_MEMORY] Successfully cleared all memory for user {user_id}, project {project_id}")
            else:
                logger.warning(f"[CORE_MEMORY] Partial memory clear for user {user_id}, project {project_id}: {results}")
            return results
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to clear all memory for user {user_id}: {e}")
            return {
                "legacy_cleared": False,
                "enhanced_cleared": False,
                "session_cleared": False,
                "planning_reset": False,
                "errors": [f"Critical error: {e}"]
            }

    def is_enhanced_available(self) -> bool:
        """Check if enhanced memory features are available"""
        return self.enhanced_available

    # ────────────────────────────── Enhanced Features ──────────────────────────────
    async def add_conversation_memory(self, user_id: str, question: str, answer: str,
                                      project_id: Optional[str] = None,
                                      context: Optional[Dict[str, Any]] = None) -> str:
        """Add conversation memory with enhanced context.

        Returns:
            The id returned by the persistent store, or "" when enhanced memory
            is unavailable or the write fails.
        """
        if not self.enhanced_available:
            logger.warning("[CORE_MEMORY] Enhanced features not available")
            return ""
        try:
            memory_id = self.enhanced_memory.add_memory(
                user_id=user_id,
                content=f"Q: {question}\nA: {answer}",
                memory_type="conversation",
                project_id=project_id,
                importance="medium",
                tags=["conversation", "qa"],
                metadata=context or {}
            )
            return memory_id
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to add conversation memory: {e}")
            return ""

    async def get_conversation_context(self, user_id: str, question: str,
                                       project_id: Optional[str] = None) -> Tuple[str, str]:
        """Get conversation context for chat continuity with enhanced memory ability.

        Returns (recent_context, semantic_context), or ("", "") on error.

        NOTE(review): ``project_id`` is accepted for API compatibility but is
        not currently used to filter either the enhanced or the legacy lookup.
        """
        try:
            if self.enhanced_available:
                # Use enhanced context retrieval with better integration
                recent_context, semantic_context = await self._get_enhanced_context(user_id, question)
                return recent_context, semantic_context
            # Use legacy context with enhanced semantic selection
            from memo.context import get_legacy_context
            return await get_legacy_context(user_id, question, self, self.embedder, 3)
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to get conversation context: {e}")
            return "", ""

    async def get_enhanced_context(self, user_id: str, question: str,
                                   project_id: Optional[str] = None) -> Tuple[str, str, Dict[str, Any]]:
        """Get enhanced context using the new memory planning system.

        Delegates to get_smart_context in "chat" mode without an NVIDIA rotator.
        """
        try:
            return await self.get_smart_context(user_id, question, None, project_id, "chat")
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to get enhanced context: {e}")
            return "", "", {"error": str(e)}

    async def search_memories(self, user_id: str, query: str,
                              project_id: Optional[str] = None,
                              limit: int = 10) -> List[Tuple[str, float]]:
        """Search memories using semantic similarity.

        Returns (content, score) pairs. The list is empty when enhanced memory
        is unavailable or the search fails.
        """
        if not self.enhanced_available:
            return []
        try:
            results = self.enhanced_memory.search_memories(
                user_id=user_id,
                query=query,
                project_id=project_id,
                limit=limit
            )
            return [(m["content"], score) for m, score in results]
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to search memories: {e}")
            return []

    def get_memory_stats(self, user_id: str) -> Dict[str, Any]:
        """Get memory statistics for a user.

        Uses the persistent store when available. Otherwise it reports only the
        legacy memory count.
        """
        if self.enhanced_available:
            return self.enhanced_memory.get_memory_stats(user_id)
        # Legacy memory stats
        all_memories = self.legacy_memory.all(user_id)
        return {
            "total_memories": len(all_memories),
            "system_type": "legacy",
            "enhanced_available": False
        }

    async def consolidate_memories(self, user_id: str, nvidia_rotator=None) -> Dict[str, Any]:
        """Consolidate and prune memories to prevent information overload"""
        try:
            from memo.conversation import get_conversation_manager
            conversation_manager = get_conversation_manager(self, self.embedder)
            return await conversation_manager.consolidate_memories(user_id, nvidia_rotator)
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Memory consolidation failed: {e}")
            return {"consolidated": 0, "pruned": 0, "error": str(e)}

    async def handle_context_switch(self, user_id: str, new_question: str,
                                    nvidia_rotator=None) -> Dict[str, Any]:
        """Handle context switching when user changes topics"""
        try:
            from memo.conversation import get_conversation_manager
            conversation_manager = get_conversation_manager(self, self.embedder)
            return await conversation_manager.handle_context_switch(user_id, new_question, nvidia_rotator)
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Context switch handling failed: {e}")
            return {"is_context_switch": False, "confidence": 0.0, "error": str(e)}

    def get_conversation_insights(self, user_id: str) -> Dict[str, Any]:
        """Get insights about the user's conversation patterns"""
        try:
            from memo.conversation import get_conversation_manager
            conversation_manager = get_conversation_manager(self, self.embedder)
            return conversation_manager.get_conversation_insights(user_id)
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to get conversation insights: {e}")
            return {"error": str(e)}

    async def get_smart_context(self, user_id: str, question: str,
                                nvidia_rotator=None, project_id: Optional[str] = None,
                                conversation_mode: str = "chat") -> Tuple[str, str, Dict[str, Any]]:
        """Get smart context using advanced memory planning strategy.

        The memory planner picks and executes a retrieval plan. If that raises,
        the conversation manager's get_smart_context is used as a fallback. If
        both fail, the result is ("", "", {"error": <planner error>}).
        """
        try:
            from memo.planning import get_memory_planner
            memory_planner = get_memory_planner(self, self.embedder)
            # Plan memory strategy based on user intent
            execution_plan = await memory_planner.plan_memory_strategy(
                user_id, question, nvidia_rotator, project_id
            )
            # Execute the planned strategy
            recent_context, semantic_context, metadata = await memory_planner.execute_memory_plan(
                user_id, question, execution_plan, nvidia_rotator, project_id
            )
            # Add planning metadata to response
            metadata.update({
                "memory_planning": True,
                "intent": execution_plan["intent"].value,
                "strategy": execution_plan["strategy"].value,
                "enhancement_focus": execution_plan["enhancement_focus"],
                "qa_focus": execution_plan["qa_focus"]
            })
            return recent_context, semantic_context, metadata
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to get smart context: {e}")
            # Fallback to original conversation manager
            try:
                from memo.conversation import get_conversation_manager
                conversation_manager = get_conversation_manager(self, self.embedder)
                return await conversation_manager.get_smart_context(
                    user_id, question, nvidia_rotator, project_id, conversation_mode
                )
            except Exception as fallback_error:
                logger.error(f"[CORE_MEMORY] Fallback also failed: {fallback_error}")
                return "", "", {"error": str(e)}

    async def get_enhancement_context(self, user_id: str, question: str,
                                      nvidia_rotator=None, project_id: Optional[str] = None) -> Tuple[str, str, Dict[str, Any]]:
        """Get context specifically optimized for enhancement requests.

        This skips intent planning. It runs a fixed ENHANCEMENT / FOCUSED_QA
        plan with a very low similarity threshold to maximise recall.
        """
        try:
            from memo.planning import get_memory_planner, QueryIntent, MemoryStrategy
            memory_planner = get_memory_planner(self, self.embedder)
            # Force enhancement intent and focused Q&A strategy
            execution_plan = {
                "intent": QueryIntent.ENHANCEMENT,
                "strategy": MemoryStrategy.FOCUSED_QA,
                "retrieval_params": {
                    "recent_limit": 5,
                    "semantic_limit": 10,
                    "qa_focus": True,
                    "enhancement_mode": True,
                    "priority_types": ["conversation", "qa"],
                    "similarity_threshold": 0.05,  # Very low threshold for maximum recall
                    "use_ai_selection": True
                },
                "conversation_context": {},
                "enhancement_focus": True,
                "qa_focus": True
            }
            # Execute the enhancement-focused strategy
            recent_context, semantic_context, metadata = await memory_planner.execute_memory_plan(
                user_id, question, execution_plan, nvidia_rotator, project_id
            )
            # Add enhancement-specific metadata
            metadata.update({
                "enhancement_mode": True,
                "qa_focused": True,
                "memory_planning": True,
                "intent": "enhancement",
                "strategy": "focused_qa"
            })
            logger.info(f"[CORE_MEMORY] Enhancement context retrieved: {len(recent_context)} recent, {len(semantic_context)} semantic")
            return recent_context, semantic_context, metadata
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to get enhancement context: {e}")
            return "", "", {"error": str(e)}

    # ────────────────────────────── Session-Specific Memory Operations ──────────────────────────────
    def add_session_memory(self, user_id: str, project_id: str, session_id: str,
                           question: str, answer: str, context: Optional[Dict[str, Any]] = None) -> str:
        """Add memory to a specific session.

        Returns the new memory id, or "" if session memory is unavailable or
        the write fails.
        """
        try:
            if not self.session_memory:
                logger.warning("[CORE_MEMORY] Session memory not available")
                return ""
            # Create session-specific memory content
            content = f"Q: {question}\nA: {answer}"
            memory_id = self.session_memory.add_session_memory(
                user_id=user_id,
                project_id=project_id,
                session_id=session_id,
                content=content,
                memory_type="conversation",
                importance="medium",
                tags=["conversation", "qa"],
                metadata=context or {}
            )
            logger.debug(f"[CORE_MEMORY] Added session memory for session {session_id}")
            return memory_id
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to add session memory: {e}")
            return ""

    def get_session_memory_context(self, user_id: str, project_id: str, session_id: str,
                                   question: str, limit: int = 5) -> Tuple[str, str]:
        """Get memory context for a specific session.

        Returns (recent_context, semantic_context). The first holds up to
        ``limit`` recent conversation memories. The second holds up to 3
        semantically matching ones. Entries are joined with blank lines.
        Returns ("", "") when unavailable or on error.
        """
        try:
            if not self.session_memory:
                return "", ""
            # Get recent session memories
            recent_memories = self.session_memory.get_session_memories(
                user_id, project_id, session_id, memory_type="conversation", limit=limit
            )
            recent_context = ""
            if recent_memories:
                recent_context = "\n\n".join(mem["content"] for mem in recent_memories)
            # Get semantic context from session memories
            semantic_memories = self.session_memory.search_session_memories(
                user_id, project_id, session_id, question, self.embedder, limit=3
            )
            semantic_context = ""
            if semantic_memories:
                semantic_context = "\n\n".join(mem["content"] for mem, _score in semantic_memories)
            return recent_context, semantic_context
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to get session memory context: {e}")
            return "", ""

    def clear_session_memories(self, user_id: str, project_id: str, session_id: str) -> int:
        """Clear all memories for a specific session; returns the number deleted (0 on failure)."""
        try:
            if not self.session_memory:
                return 0
            deleted_count = self.session_memory.clear_session_memories(user_id, project_id, session_id)
            logger.info(f"[CORE_MEMORY] Cleared {deleted_count} session memories for session {session_id}")
            return deleted_count
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to clear session memories: {e}")
            return 0

    # ────────────────────────────── Private Helper Methods ──────────────────────────────
    def _add_enhanced_memory_sync(self, user_id: str, question: str, answer: str) -> None:
        """Write a Q&A pair to the persistent store; failures are logged, not raised."""
        try:
            self.enhanced_memory.add_memory(
                user_id=user_id,
                content=f"Q: {question}\nA: {answer}",
                memory_type="conversation",
                importance="medium",
                tags=["conversation", "qa"]
            )
        except Exception as e:
            logger.warning(f"[CORE_MEMORY] Failed to add enhanced memory: {e}")

    async def _add_enhanced_memory(self, user_id: str, question: str, answer: str):
        """Add memory to enhanced system (coroutine wrapper kept for task scheduling)."""
        self._add_enhanced_memory_sync(user_id, question, answer)

    async def _select_relevant(self, question: str, summaries: List[str],
                               top_k: int, label: str) -> str:
        """Pick the summaries most relevant to ``question``.

        Uses memo.context.semantic_context when an embedder is configured. If
        there is no embedder, or the selection fails, all summaries are joined
        with blank lines instead. ``label`` names the step in the warning log.
        """
        if self.embedder:
            try:
                # Aliased so the imported function never shares a name with the
                # caller's context strings.
                from memo.context import semantic_context as select_semantic
                return await select_semantic(question, summaries, self.embedder, top_k)
            except Exception as e:
                logger.warning(f"[CORE_MEMORY] {label} failed, using all: {e}")
        return "\n\n".join(summaries)

    async def _get_enhanced_context(self, user_id: str, question: str) -> Tuple[str, str]:
        """Get context from enhanced memory system with semantic selection.

        Returns two strings:
        - recent: up to 3 of the 5 latest "conversation" memories.
        - semantic: up to 5 of the non-conversation memories among the latest 10.

        Each part is "" when there is nothing to include. Both are "" on error.
        """
        try:
            # Get recent conversation memories
            recent_memories = self.enhanced_memory.get_memories(
                user_id=user_id,
                memory_type="conversation",
                limit=5
            )
            recent_context = ""
            if recent_memories:
                recent_summaries = [m["summary"] for m in recent_memories]
                recent_context = await self._select_relevant(
                    question, recent_summaries, 3, "Semantic recent context"
                )

            # Get semantic context from other memory types
            latest_memories = self.enhanced_memory.get_memories(
                user_id=user_id,
                limit=10
            )
            other_summaries = [
                m["summary"] for m in (latest_memories or [])
                if m.get("memory_type") != "conversation"
            ]
            semantic_ctx = ""
            if other_summaries:
                semantic_ctx = await self._select_relevant(
                    question, other_summaries, 5, "Semantic context"
                )
            return recent_context, semantic_ctx
        except Exception as e:
            logger.error(f"[CORE_MEMORY] Failed to get enhanced context: {e}")
            return "", ""
# ────────────────────────────── Global Instance ──────────────────────────────
# Lazily-created process-wide instance returned by get_memory_system().
_memory_system: Optional[MemorySystem] = None


def get_memory_system(mongo_uri: str = None, db_name: str = None) -> MemorySystem:
    """Return the shared MemorySystem, building it on the first call.

    The arguments only matter on that first call; afterwards the cached
    instance is returned as-is. A missing ``mongo_uri`` falls back to the
    MONGO_URI environment variable (default ``mongodb://localhost:27017``).
    A missing ``db_name`` falls back to MONGO_DB (default ``studybuddy``).
    """
    global _memory_system
    if _memory_system is not None:
        return _memory_system

    uri = mongo_uri if mongo_uri is not None else os.getenv("MONGO_URI", "mongodb://localhost:27017")
    database = db_name if db_name is not None else os.getenv("MONGO_DB", "studybuddy")
    _memory_system = MemorySystem(uri, database)
    logger.info("[CORE_MEMORY] Global memory system initialized")
    return _memory_system
# def reset_memory_system():
# """Reset the global memory system (for testing)"""
# global _memory_system
# _memory_system = None