Spaces:
Sleeping
Sleeping
| # ────────────────────────────── memo/conversation.py ────────────────────────────── | |
| """ | |
| Conversation Management Orchestrator | |
| Main conversation manager that coordinates session management, | |
| context retrieval, and memory consolidation for natural conversation flow. | |
| """ | |
| from typing import List, Dict, Any, Tuple, Optional | |
| import os | |
| from utils.logger import get_logger | |
| from utils.rag.embeddings import EmbeddingClient | |
| logger = get_logger("CONVERSATION_MANAGER", __name__) | |
| class ConversationManager: | |
| """ | |
| Main conversation manager that orchestrates all conversation-related functionality. | |
| """ | |
| def __init__(self, memory_system, embedder: EmbeddingClient): | |
| self.memory_system = memory_system | |
| self.embedder = embedder | |
| # Initialize sub-managers | |
| from memo.sessions import get_session_manager | |
| from memo.retrieval import get_retrieval_manager | |
| from memo.consolidation import get_consolidation_manager | |
| self.session_manager = get_session_manager() | |
| self.retrieval_manager = get_retrieval_manager(memory_system, embedder) | |
| self.consolidation_manager = get_consolidation_manager(memory_system, embedder) | |
| async def get_smart_context(self, user_id: str, question: str, | |
| nvidia_rotator=None, project_id: Optional[str] = None, | |
| conversation_mode: str = "chat") -> Tuple[str, str, Dict[str, Any]]: | |
| """ | |
| Get intelligent context for conversation with enhanced memory planning. | |
| Args: | |
| user_id: User identifier | |
| question: Current question/instruction | |
| nvidia_rotator: NVIDIA API rotator for AI enhancement | |
| project_id: Project context | |
| conversation_mode: "chat" or "report" | |
| Returns: | |
| Tuple of (recent_context, semantic_context, metadata) | |
| """ | |
| try: | |
| return await self.retrieval_manager.get_smart_context( | |
| user_id, question, nvidia_rotator, project_id, conversation_mode | |
| ) | |
| except Exception as e: | |
| logger.error(f"[CONVERSATION_MANAGER] Smart context failed: {e}") | |
| return "", "", {"error": str(e)} | |
| async def get_enhancement_context(self, user_id: str, question: str, | |
| nvidia_rotator=None, project_id: Optional[str] = None) -> Tuple[str, str, Dict[str, Any]]: | |
| """Get context specifically optimized for enhancement requests""" | |
| try: | |
| return await self.retrieval_manager.get_enhancement_context( | |
| user_id, question, nvidia_rotator, project_id | |
| ) | |
| except Exception as e: | |
| logger.error(f"[CONVERSATION_MANAGER] Enhancement context failed: {e}") | |
| return "", "", {"error": str(e)} | |
| async def consolidate_memories(self, user_id: str, nvidia_rotator=None) -> Dict[str, Any]: | |
| """Consolidate and prune memories to prevent information overload""" | |
| try: | |
| return await self.consolidation_manager.consolidate_memories(user_id, nvidia_rotator) | |
| except Exception as e: | |
| logger.error(f"[CONVERSATION_MANAGER] Memory consolidation failed: {e}") | |
| return {"consolidated": 0, "pruned": 0, "error": str(e)} | |
| async def handle_context_switch(self, user_id: str, new_question: str, | |
| nvidia_rotator=None) -> Dict[str, Any]: | |
| """Handle context switching when user changes topics""" | |
| try: | |
| return await self.session_manager.detect_context_switch(user_id, new_question, nvidia_rotator) | |
| except Exception as e: | |
| logger.error(f"[CONVERSATION_MANAGER] Context switch handling failed: {e}") | |
| return {"is_context_switch": False, "confidence": 0.0, "error": str(e)} | |
| def get_conversation_insights(self, user_id: str) -> Dict[str, Any]: | |
| """Get insights about the user's conversation patterns""" | |
| try: | |
| return self.session_manager.get_conversation_insights(user_id) | |
| except Exception as e: | |
| logger.error(f"[CONVERSATION_MANAGER] Failed to get conversation insights: {e}") | |
| return {"error": str(e)} | |
| def clear_session(self, user_id: str): | |
| """Clear conversation session for user""" | |
| try: | |
| self.session_manager.clear_session(user_id) | |
| logger.info(f"[CONVERSATION_MANAGER] Cleared session for user {user_id}") | |
| except Exception as e: | |
| logger.error(f"[CONVERSATION_MANAGER] Failed to clear session: {e}") | |
| def reset_all(self, user_id: str, project_id: str = None) -> Dict[str, Any]: | |
| """Reset all conversation-related components for a user""" | |
| try: | |
| results = { | |
| "session_cleared": False, | |
| "memory_cleared": False, | |
| "errors": [] | |
| } | |
| # Clear session | |
| try: | |
| self.session_manager.clear_session(user_id) | |
| results["session_cleared"] = True | |
| logger.info(f"[CONVERSATION_MANAGER] Cleared session for user {user_id}") | |
| except Exception as e: | |
| error_msg = f"Failed to clear session: {e}" | |
| results["errors"].append(error_msg) | |
| logger.warning(f"[CONVERSATION_MANAGER] {error_msg}") | |
| # Clear memory using core memory system | |
| try: | |
| clear_results = self.memory_system.clear_all_memory(user_id, project_id) | |
| results["memory_cleared"] = clear_results.get("legacy_cleared", False) and clear_results.get("session_cleared", False) | |
| if clear_results.get("errors"): | |
| results["errors"].extend(clear_results["errors"]) | |
| logger.info(f"[CONVERSATION_MANAGER] Cleared memory for user {user_id}, project {project_id}") | |
| except Exception as e: | |
| error_msg = f"Failed to clear memory: {e}" | |
| results["errors"].append(error_msg) | |
| logger.warning(f"[CONVERSATION_MANAGER] {error_msg}") | |
| return results | |
| except Exception as e: | |
| logger.error(f"[CONVERSATION_MANAGER] Failed to reset all for user {user_id}: {e}") | |
| return { | |
| "session_cleared": False, | |
| "memory_cleared": False, | |
| "errors": [f"Critical error: {e}"] | |
| } | |
| # ────────────────────────────── Global Instance ────────────────────────────── | |
| _conversation_manager: Optional[ConversationManager] = None | |
| def get_conversation_manager(memory_system=None, embedder: EmbeddingClient = None) -> ConversationManager: | |
| """Get the global conversation manager instance""" | |
| global _conversation_manager | |
| if _conversation_manager is None: | |
| if not memory_system: | |
| from memo.core import get_memory_system | |
| memory_system = get_memory_system() | |
| if not embedder: | |
| from utils.rag.embeddings import EmbeddingClient | |
| embedder = EmbeddingClient() | |
| _conversation_manager = ConversationManager(memory_system, embedder) | |
| logger.info("[CONVERSATION_MANAGER] Global conversation manager initialized") | |
| return _conversation_manager | |
| # def reset_conversation_manager(): | |
| # """Reset the global conversation manager (for testing)""" | |
| # global _conversation_manager | |
| # _conversation_manager = None |