# xiaozhi / src / application.py
# nzjsdsk's picture
# Upload 169 files
# 27e74f3 verified
import asyncio
import json
import logging
import threading
import time
import sys
import traceback
from pathlib import Path
from src.utils.logging_config import get_logger
# 在导入 opuslib 之前处理 opus 动态库
from src.utils.system_info import setup_opus
from src.constants.constants import (
DeviceState, EventType, AudioConfig,
AbortReason, ListeningMode
)
from src.display import gui_display, cli_display
from src.utils.config_manager import ConfigManager
# Prepare the opus dynamic library before anything imports opuslib.
setup_opus()
# Configure logging
logger = get_logger(__name__)
# opuslib is imported only now, after setup_opus() has run.
try:
    import opuslib  # noqa: F401
    from src.utils.tts_utility import TtsUtility
except Exception as e:
    # The application exits with status 1 if either import fails.
    logger.critical("导入 opuslib 失败: %s", e, exc_info=True)
    logger.critical("请确保 opus 动态库已正确安装或位于正确的位置")
    sys.exit(1)
from src.protocols.mqtt_protocol import MqttProtocol
from src.protocols.websocket_protocol import WebsocketProtocol
class Application:
_instance = None
@classmethod
def get_instance(cls):
"""获取单例实例"""
if cls._instance is None:
logger.debug("创建Application单例实例")
cls._instance = Application()
return cls._instance
def __init__(self):
    """Set up the singleton's state; raises if an instance already exists.

    Raises:
        Exception: when Application._instance is already set.
    """
    # Enforce the singleton contract.
    if Application._instance is not None:
        logger.error("尝试创建Application的多个实例")
        raise Exception("Application是单例类,请使用get_instance()获取实例")
    Application._instance = self
    logger.debug("初始化Application实例")

    # Configuration manager.
    self.config = ConfigManager.get_instance()

    # Conversation / device state.
    self.device_state = DeviceState.IDLE
    self.voice_detected = False
    self.keep_listening = False
    self.aborted = False
    self.current_text = ""
    self.current_emotion = "neutral"

    # Audio handling. The codec is created later by _initialize_audio.
    self.audio_codec = None
    self._tts_lock = threading.Lock()
    # Separate "TTS is speaking" flag guarded by _tts_lock.
    self.is_tts_playing = False

    # Asyncio loop plus the worker threads that are started later.
    self.loop = asyncio.new_event_loop()
    self.loop_thread = None
    self.running = False
    self.input_event_thread = None
    self.output_event_thread = None

    # Tasks queued for the main loop, protected by a mutex.
    self.main_tasks = []
    self.mutex = threading.Lock()

    # Protocol object, assigned by set_protocol_type.
    self.protocol = None

    # Listeners notified on every device-state change.
    self.on_state_changed_callbacks = []

    # One threading.Event per event type polled by _main_loop.
    event_kinds = (
        EventType.SCHEDULE_EVENT,
        EventType.AUDIO_INPUT_READY_EVENT,
        EventType.AUDIO_OUTPUT_READY_EVENT,
    )
    self.events = {kind: threading.Event() for kind in event_kinds}

    # Display, assigned by set_display_type.
    self.display = None

    # Wake-word detector, assigned by _initialize_wake_word_detector.
    self.wake_word_detector = None
    logger.debug("Application实例初始化完成")
def run(self, **kwargs):
    """Start the worker threads, initialise components and start the display.

    Keyword Args:
        mode: display type, 'gui' (default) or anything else for the CLI.
        protocol: 'mqtt' or anything else for WebSocket (default 'websocket').
    """
    logger.info("启动应用程序,参数: %s", kwargs)
    mode = kwargs.get('mode', 'gui')
    protocol = kwargs.get('protocol', 'websocket')

    # Main (event-polling) loop thread.
    logger.debug("启动主循环线程")
    main_loop_thread = threading.Thread(target=self._main_loop, daemon=True)
    main_loop_thread.start()

    # Communication protocol.
    logger.debug("设置协议类型: %s", protocol)
    self.set_protocol_type(protocol)

    # Asyncio event-loop thread.
    logger.debug("启动事件循环线程")
    self.loop_thread = threading.Thread(
        target=self._run_event_loop, daemon=True)
    self.loop_thread.start()

    # Give the event loop a moment to come up.
    time.sleep(0.1)

    # Component initialisation runs on the event loop (no auto-connect).
    logger.debug("初始化应用程序组件")
    asyncio.run_coroutine_threadsafe(
        self._initialize_without_connect(), self.loop)

    # IoT devices.
    self._initialize_iot_devices()

    logger.debug("设置显示类型: %s", mode)
    self.set_display_type(mode)

    # Hand control to the display.
    logger.debug("启动显示界面")
    self.display.start()
def _run_event_loop(self):
    """Thread target: install self.loop for this thread and run it forever."""
    logger.debug("设置并启动事件循环")
    loop = self.loop
    asyncio.set_event_loop(loop)
    loop.run_forever()
def set_is_tts_playing(self, value: bool):
with self._tts_lock:
self.is_tts_playing = value
def get_is_tts_playing(self) -> bool:
with self._tts_lock:
return self.is_tts_playing
async def _initialize_without_connect(self):
    """Initialise audio, wake word and protocol callbacks without connecting."""
    logger.info("正在初始化应用程序组件...")

    # Start out idle.
    logger.debug("设置初始设备状态为IDLE")
    self.schedule(lambda: self.set_device_state(DeviceState.IDLE))

    # Audio codec.
    logger.debug("初始化音频编解码器")
    self._initialize_audio()

    # Wake-word detection (created and started if enabled).
    self._initialize_wake_word_detector()

    # Wire the protocol callbacks (same set for MQTT and WebSocket).
    logger.debug("设置协议回调函数")
    callback_table = (
        ("on_network_error", self._on_network_error),
        ("on_incoming_audio", self._on_incoming_audio),
        ("on_incoming_json", self._on_incoming_json),
        ("on_audio_channel_opened", self._on_audio_channel_opened),
        ("on_audio_channel_closed", self._on_audio_channel_closed),
    )
    for attr_name, handler in callback_table:
        setattr(self.protocol, attr_name, handler)

    logger.info("应用程序组件初始化完成")
def _initialize_audio(self):
    """Create the AudioCodec and log whether a volume controller exists.

    Any failure is logged and reported through alert(); nothing is raised.
    """
    try:
        logger.debug("开始初始化音频编解码器")
        from src.audio_codecs.audio_codec import AudioCodec
        self.audio_codec = AudioCodec()
        logger.info("音频编解码器初始化成功")

        # Log which kind of volume control is in effect.
        if getattr(self.display, 'volume_controller', None):
            logger.info("系统音量控制已启用")
        else:
            logger.info("系统音量控制未启用,将使用模拟音量控制")
    except Exception as e:
        logger.error("初始化音频设备失败: %s", e, exc_info=True)
        self.alert("错误", f"初始化音频设备失败: {e}")
def set_protocol_type(self, protocol_type: str):
    """Create self.protocol: MQTT for 'mqtt', WebSocket for any other value."""
    logger.debug("设置协议类型: %s", protocol_type)
    if protocol_type == 'mqtt':
        self.protocol = MqttProtocol(self.loop)
        logger.debug("已创建MQTT协议实例")
        return
    # Everything else falls back to WebSocket.
    self.protocol = WebsocketProtocol()
    logger.debug("已创建WebSocket协议实例")
def set_display_type(self, mode: str):
    """Create the GUI ('gui') or CLI (any other mode) display and wire callbacks."""
    logger.debug("设置显示界面类型: %s", mode)

    # Callbacks both display flavours receive.
    common_callbacks = dict(
        auto_callback=self.toggle_chat_state,
        abort_callback=lambda: self.abort_speaking(
            AbortReason.WAKE_WORD_DETECTED
        ),
        status_callback=self._get_status_text,
        text_callback=self._get_current_text,
        emotion_callback=self._get_current_emotion,
        send_text_callback=self._send_text_tts,
    )

    if mode == 'gui':
        self.display = gui_display.GuiDisplay()
        logger.debug("已创建GUI显示界面")
        # The GUI additionally gets press/release and mode-switch hooks.
        self.display.set_callbacks(
            press_callback=self.start_listening,
            release_callback=self.stop_listening,
            mode_callback=self._on_mode_changed,
            **common_callbacks
        )
    else:
        self.display = cli_display.CliDisplay()
        logger.debug("已创建CLI显示界面")
        self.display.set_callbacks(**common_callbacks)
    logger.debug("显示界面回调函数设置完成")
def _main_loop(self):
    """Poll the event flags and dispatch their handlers until running is False."""
    logger.info("主循环已启动")
    self.running = True
    dispatch = {
        EventType.AUDIO_INPUT_READY_EVENT: self._handle_input_audio,
        EventType.AUDIO_OUTPUT_READY_EVENT: self._handle_output_audio,
        EventType.SCHEDULE_EVENT: self._process_scheduled_tasks,
    }
    while self.running:
        for event_type, flag in self.events.items():
            if not flag.is_set():
                continue
            flag.clear()
            logger.debug("处理事件: %s", event_type)
            handler = dispatch.get(event_type)
            if handler is not None:
                handler()
        # Short nap so the polling loop does not spin the CPU.
        time.sleep(0.01)
def _process_scheduled_tasks(self):
    """Drain the queued callbacks and run each one, logging any exception."""
    with self.mutex:
        pending = list(self.main_tasks)
        del self.main_tasks[:]
    logger.debug("处理%d个调度任务", len(pending))
    for job in pending:
        try:
            job()
        except Exception as e:
            # One failing task must not stop the remaining ones.
            logger.error("执行调度任务时出错: %s", e, exc_info=True)
def schedule(self, callback):
    """Queue *callback* for the main loop and raise the schedule event."""
    wakeup = self.events[EventType.SCHEDULE_EVENT]
    with self.mutex:
        self.main_tasks.append(callback)
    wakeup.set()
def _handle_input_audio(self):
    """While LISTENING, read one chunk from the codec and send it upstream."""
    if self.device_state != DeviceState.LISTENING:
        return

    # Read a chunk; stop here if there is nothing to send.
    chunk = self.audio_codec.read_audio()
    if not chunk:
        return
    if not self.protocol or not self.protocol.is_audio_channel_opened():
        return

    asyncio.run_coroutine_threadsafe(
        self.protocol.send_audio(chunk), self.loop)
async def _send_text_tts(self, text):
    """Synthesise *text* to Opus frames and send them over the protocol.

    Args:
        text: the text to convert and send; also shown as the "user" message.

    Returns:
        True when the frames and the trailing stop message were sent,
        False on any failure (channel could not be opened, no frames were
        produced, or an exception occurred).
    """
    try:
        tts_utility = TtsUtility(AudioConfig)

        # Generate the Opus packets.
        opus_frames = await tts_utility.text_to_opus_audio(text)

        # Open the audio channel if it is closed and the device is idle.
        if (not self.protocol.is_audio_channel_opened() and
                DeviceState.IDLE == self.device_state):
            success = await self.protocol.open_audio_channel()
            if not success:
                logger.error("打开音频通道失败")
                # Was a bare `return` (None); every other failure path
                # returns False, so callers get a consistent boolean.
                return False

        if not opus_frames:
            logger.error("生成音频失败")
            return False

        logger.info(f"生成了 {len(opus_frames)} 个 Opus 音频帧")

        # Switch to the speaking state.
        self.schedule(lambda: self.set_device_state(DeviceState.SPEAKING))

        # Send the frames, pausing 60 ms between them.
        for frame in opus_frames:
            await self.protocol.send_audio(frame)
            await asyncio.sleep(0.06)

        # Show the text as a chat message.
        self.set_chat_message("user", text)
        await self.protocol.send_text(
            json.dumps({"session_id": "", "type": "listen", "state": "stop"}))
        # NOTE(review): an empty bytes payload is sent after the stop
        # message; its purpose is not visible here — confirm it is needed.
        await self.protocol.send_text(b'')
        return True
    except Exception as e:
        logger.error(f"发送文本到TTS时出错: {e}")
        logger.error(traceback.format_exc())
        return False
def _handle_output_audio(self):
    """While SPEAKING, mark TTS as playing and ask the codec to play audio."""
    speaking = self.device_state == DeviceState.SPEAKING
    if not speaking:
        return
    # Playback is starting.
    self.set_is_tts_playing(True)
    self.audio_codec.play_audio()
def _on_network_error(self, error_message=None):
    """Protocol callback for network errors.

    Logs the message (if any), leaves keep-listening mode, schedules a
    return to IDLE and resumes a paused wake-word detector. Unless the
    device is currently CONNECTING, it also schedules closing the audio
    channel on the event loop.

    Args:
        error_message: optional description of the error.
    """
    if error_message:
        logger.error(f"网络错误: {error_message}")
    self.keep_listening = False
    self.schedule(lambda: self.set_device_state(DeviceState.IDLE))
    # Resume wake-word detection
    if self.wake_word_detector and self.wake_word_detector.paused:
        self.wake_word_detector.resume()
    if self.device_state != DeviceState.CONNECTING:
        logger.info("检测到连接断开")
        # NOTE(review): IDLE was already scheduled above; this second
        # request is a no-op in set_device_state once the first has run.
        self.schedule(lambda: self.set_device_state(DeviceState.IDLE))
        # Close the existing connection, but leave the audio streams open
        if self.protocol:
            asyncio.run_coroutine_threadsafe(
                self.protocol.close_audio_channel(),
                self.loop
            )
def _on_incoming_audio(self, data):
    """Protocol callback: while SPEAKING, queue audio and flag output ready."""
    if self.device_state != DeviceState.SPEAKING:
        return
    self.audio_codec.write_audio(data)
    output_ready = self.events[EventType.AUDIO_OUTPUT_READY_EVENT]
    output_ready.set()
def _on_incoming_json(self, json_data):
    """Protocol callback: decode a JSON message and route it by its "type".

    Accepts a JSON string or an already-decoded object. Any exception is
    logged rather than raised.
    """
    try:
        if not json_data:
            return

        # Strings are parsed; anything else is used as-is.
        data = json.loads(json_data) if isinstance(json_data, str) else json_data

        # Route by message type.
        msg_type = data.get("type", "")
        routes = (
            ("tts", self._handle_tts_message),
            ("stt", self._handle_stt_message),
            ("llm", self._handle_llm_message),
            ("iot", self._handle_iot_message),
        )
        for type_name, handler in routes:
            if msg_type == type_name:
                handler(data)
                break
        else:
            logger.warning(f"收到未知类型的消息: {msg_type}")
    except Exception as e:
        logger.error(f"处理JSON消息时出错: {e}")
def _handle_tts_message(self, data):
"""处理TTS消息"""
state = data.get("state", "")
if state == "start":
self.schedule(lambda: self._handle_tts_start())
elif state == "stop":
self.schedule(lambda: self._handle_tts_stop())
elif state == "sentence_start":
text = data.get("text", "")
if text:
logger.info(f"<< {text}")
self.schedule(lambda: self.set_chat_message("assistant", text))
# 检查是否包含验证码信息
if "请登录到控制面板添加设备,输入验证码" in text:
self.schedule(lambda: self._handle_verification_code(text))
def _handle_tts_start(self):
    """Handle the TTS "start" event: reset abort flag, clear audio, go SPEAKING."""
    self.aborted = False
    # Playback is starting.
    self.set_is_tts_playing(True)
    # Drop any stale audio still queued.
    self.audio_codec.clear_audio_queue()

    current = self.device_state
    if current == DeviceState.IDLE or current == DeviceState.LISTENING:
        self.schedule(lambda: self.set_device_state(DeviceState.SPEAKING))
def _handle_tts_stop(self):
    """Handle the TTS "stop" event.

    When the device is SPEAKING, schedules a task that waits for the
    decode queue to drain (at most 30 x 0.1 s), waits another 0.5 s if
    TTS is still flagged as playing, clears that flag, and then either
    restarts listening (keep_listening) or returns to IDLE.
    """
    if self.device_state == DeviceState.SPEAKING:
        # Give playback some slack so all queued audio can finish
        def delayed_state_change():
            # Wait for the audio queue to drain
            max_wait_attempts = 30  # upper bound on polling attempts
            wait_interval = 0.1  # seconds between polls
            attempts = 0
            # Poll until the queue is empty or the attempts run out
            while (not self.audio_codec.audio_decode_queue.empty() and
                    attempts < max_wait_attempts):
                time.sleep(wait_interval)
                attempts += 1
            # Extra grace period so the last chunk can be handled
            if self.get_is_tts_playing():
                time.sleep(0.5)
            # TTS is no longer playing
            self.set_is_tts_playing(False)
            # State transition
            if self.keep_listening:
                asyncio.run_coroutine_threadsafe(
                    self.protocol.send_start_listening(ListeningMode.AUTO_STOP),
                    self.loop
                )
                self.schedule(lambda: self.set_device_state(DeviceState.LISTENING))
            else:
                self.schedule(lambda: self.set_device_state(DeviceState.IDLE))
        # NOTE(review): the task is queued with schedule(), so it runs in
        # _process_scheduled_tasks on the main-loop thread; its sleeps
        # hold up the other event handlers in _main_loop meanwhile. The
        # dedicated-thread variant is kept below, commented out.
        # threading.Thread(target=delayed_state_change, daemon=True).start()
        self.schedule(delayed_state_change)
def _handle_stt_message(self, data):
"""处理STT消息"""
text = data.get("text", "")
if text:
logger.info(f">> {text}")
self.schedule(lambda: self.set_chat_message("user", text))
def _handle_llm_message(self, data):
"""处理LLM消息"""
emotion = data.get("emotion", "")
if emotion:
self.schedule(lambda: self.set_emotion(emotion))
async def _on_audio_channel_opened(self):
    """Protocol callback: start audio streams and publish IoT descriptors/states."""
    logger.info("音频通道已打开")
    self.schedule(lambda: self._start_audio_streams())

    # Send the IoT device descriptors.
    from src.iot.thing_manager import ThingManager
    descriptors = ThingManager.get_instance().get_descriptors_json()
    asyncio.run_coroutine_threadsafe(
        self.protocol.send_iot_descriptors(descriptors), self.loop)
    # Follow up with a full (non-delta) state report.
    self._update_iot_states(False)
def _start_audio_streams(self):
    """Make sure the codec streams are active and the trigger threads run.

    Inactive input/output streams are started; if starting fails, the
    codec is asked to reinitialise that stream. The input and output
    event-trigger threads are created when missing or no longer alive.
    Any other exception is logged, not raised.
    """
    try:
        # Streams are not closed and reopened here; only made active
        if self.audio_codec.input_stream and not self.audio_codec.input_stream.is_active():
            try:
                self.audio_codec.input_stream.start_stream()
            except Exception as e:
                logger.warning(f"启动输入流时出错: {e}")
                # Reinitialise only when starting failed
                self.audio_codec._reinitialize_input_stream()
        if self.audio_codec.output_stream and not self.audio_codec.output_stream.is_active():
            try:
                self.audio_codec.output_stream.start_stream()
            except Exception as e:
                logger.warning(f"启动输出流时出错: {e}")
                # Reinitialise only when starting failed
                self.audio_codec._reinitialize_output_stream()
        # Input event-trigger thread
        if self.input_event_thread is None or not self.input_event_thread.is_alive():
            self.input_event_thread = threading.Thread(
                target=self._audio_input_event_trigger, daemon=True)
            self.input_event_thread.start()
            logger.info("已启动输入事件触发线程")
        # Output event-trigger thread
        if self.output_event_thread is None or not self.output_event_thread.is_alive():
            self.output_event_thread = threading.Thread(
                target=self._audio_output_event_trigger, daemon=True)
            self.output_event_thread.start()
            logger.info("已启动输出事件触发线程")
        logger.info("音频流已启动")
    except Exception as e:
        logger.error(f"启动音频流失败: {e}")
def _audio_input_event_trigger(self):
    """Thread target: raise the input-ready event periodically while LISTENING."""
    while self.running:
        try:
            # Only trigger while actively listening with an input stream.
            listening = self.device_state == DeviceState.LISTENING
            if listening and self.audio_codec.input_stream:
                self.events[EventType.AUDIO_INPUT_READY_EVENT].set()
        except OSError as e:
            logger.error(f"音频输入流错误: {e}")
            # Stay in the loop and retry after a pause.
            time.sleep(0.5)
        except Exception as e:
            logger.error(f"音频输入事件触发器错误: {e}")
            time.sleep(0.5)

        # Trigger at least every 20 ms, or faster if the frame is shorter.
        interval_ms = min(20, AudioConfig.FRAME_DURATION)
        time.sleep(interval_ms / 1000)
def _audio_output_event_trigger(self):
    """Thread target: raise the output-ready event while audio is queued.

    Runs until self.running is False, checking every 20 ms. While the
    device is SPEAKING and an output stream exists, an inactive stream is
    restarted (or reinitialised if that fails) and the output event is
    set whenever the decode queue is not empty.
    """
    while self.running:
        try:
            # Make sure the output stream is active
            if (self.device_state == DeviceState.SPEAKING and
                    self.audio_codec and
                    self.audio_codec.output_stream):
                # Try to reactivate an inactive output stream
                if not self.audio_codec.output_stream.is_active():
                    try:
                        self.audio_codec.output_stream.start_stream()
                    except Exception as e:
                        logger.warning(f"启动输出流失败,尝试重新初始化: {e}")
                        self.audio_codec._reinitialize_output_stream()
                # Only trigger when there is data in the queue
                if not self.audio_codec.audio_decode_queue.empty():
                    self.events[EventType.AUDIO_OUTPUT_READY_EVENT].set()
        except Exception as e:
            logger.error(f"音频输出事件触发器错误: {e}")
        time.sleep(0.02)  # polling interval
async def _on_audio_channel_closed(self):
    """Protocol callback: go IDLE and make sure wake-word detection is active."""
    logger.info("音频通道已关闭")
    # Return to idle; the audio streams themselves stay open.
    self.schedule(lambda: self.set_device_state(DeviceState.IDLE))
    self.keep_listening = False

    detector = self.wake_word_detector
    if not detector:
        return

    if not detector.is_running():
        logger.info("在空闲状态下启动唤醒词检测")
        # Hand over the AudioCodec instance when there is one.
        codec = getattr(self, 'audio_codec', None)
        if codec:
            detector.start(codec)
        else:
            detector.start()
    elif detector.paused:
        logger.info("在空闲状态下恢复唤醒词检测")
        detector.resume()
def set_device_state(self, state):
    """Switch to *state*, update display/detector/codec, notify listeners.

    Does nothing when *state* equals the current state. The display is
    only touched when one exists, matching set_chat_message, set_emotion
    and alert; state changes can be scheduled before set_display_type
    has created it.

    Args:
        state: the DeviceState to enter.
    """
    if self.device_state == state:
        return
    self.device_state = state

    display = self.display
    detector = self.wake_word_detector

    if state == DeviceState.IDLE:
        if display:
            display.update_status("待命")
        self.set_emotion("neutral")
        # Resume wake-word detection if it was paused.
        if detector and hasattr(detector, 'paused') and detector.paused:
            detector.resume()
            logger.info("唤醒词检测已恢复")
        # Resume the audio input stream if it was paused.
        if self.audio_codec and self.audio_codec.is_input_paused():
            self.audio_codec.resume_input()
    elif state == DeviceState.CONNECTING:
        if display:
            display.update_status("连接中...")
    elif state == DeviceState.LISTENING:
        if display:
            display.update_status("聆听中...")
        self.set_emotion("neutral")
        self._update_iot_states(True)
        # Pause wake-word detection while listening.
        if detector and hasattr(detector, 'is_running') and detector.is_running():
            detector.pause()
            logger.info("唤醒词检测已暂停")
        # Make sure the audio input stream is active.
        if self.audio_codec and self.audio_codec.is_input_paused():
            self.audio_codec.resume_input()
    elif state == DeviceState.SPEAKING:
        if display:
            display.update_status("说话中...")
        # The detector stays active while speaking: a detection in this
        # state is handled by aborting the speech.
        if detector and hasattr(detector, 'paused') and detector.paused:
            detector.resume()

    # Notify listeners; one failing callback does not stop the others.
    for callback in self.on_state_changed_callbacks:
        try:
            callback(state)
        except Exception as e:
            logger.error(f"执行状态变化回调时出错: {e}")
def _get_status_text(self):
    """Return the display label for the current device state ("未知" if none)."""
    labels = dict((
        (DeviceState.IDLE, "待命"),
        (DeviceState.CONNECTING, "连接中..."),
        (DeviceState.LISTENING, "聆听中..."),
        (DeviceState.SPEAKING, "说话中..."),
    ))
    return labels.get(self.device_state, "未知")
def _get_current_text(self):
"""获取当前显示文本"""
return self.current_text
def _get_current_emotion(self):
"""获取当前表情"""
# 如果表情没有变化,直接返回缓存的路径
if hasattr(self, '_last_emotion') and self._last_emotion == self.current_emotion:
return self._last_emotion_path
# 获取基础路径
if getattr(sys, 'frozen', False):
# 打包环境
if hasattr(sys, '_MEIPASS'):
base_path = Path(sys._MEIPASS)
else:
base_path = Path(sys.executable).parent
else:
# 开发环境
base_path = Path(__file__).parent.parent
emotion_dir = base_path / "assets" / "emojis"
emotions = {
"neutral": str(emotion_dir / "neutral.gif"),
"happy": str(emotion_dir / "happy.gif"),
"laughing": str(emotion_dir / "laughing.gif"),
"funny": str(emotion_dir / "funny.gif"),
"sad": str(emotion_dir / "sad.gif"),
"angry": str(emotion_dir / "angry.gif"),
"crying": str(emotion_dir / "crying.gif"),
"loving": str(emotion_dir / "loving.gif"),
"embarrassed": str(emotion_dir / "embarrassed.gif"),
"surprised": str(emotion_dir / "surprised.gif"),
"shocked": str(emotion_dir / "shocked.gif"),
"thinking": str(emotion_dir / "thinking.gif"),
"winking": str(emotion_dir / "winking.gif"),
"cool": str(emotion_dir / "cool.gif"),
"relaxed": str(emotion_dir / "relaxed.gif"),
"delicious": str(emotion_dir / "delicious.gif"),
"kissy": str(emotion_dir / "kissy.gif"),
"confident": str(emotion_dir / "confident.gif"),
"sleepy": str(emotion_dir / "sleepy.gif"),
"silly": str(emotion_dir / "silly.gif"),
"confused": str(emotion_dir / "confused.gif")
}
# 保存当前表情和对应的路径
self._last_emotion = self.current_emotion
self._last_emotion_path = emotions.get(self.current_emotion, str(emotion_dir / "neutral.gif"))
logger.debug(f"表情路径: {self._last_emotion_path}")
return self._last_emotion_path
def set_chat_message(self, role, message):
"""设置聊天消息"""
self.current_text = message
# 更新显示
if self.display:
self.display.update_text(message)
def set_emotion(self, emotion):
"""设置表情"""
self.current_emotion = emotion
# 更新显示
if self.display:
self.display.update_emotion(self._get_current_emotion())
def start_listening(self):
"""开始监听"""
self.schedule(self._start_listening_impl)
def _start_listening_impl(self):
    """Start manual listening, or abort speech when currently SPEAKING.

    From IDLE: schedules CONNECTING, opens the audio channel if needed
    (waiting up to 10 s for the result), reinitialises the codec input
    stream, sends a MANUAL start-listening request and schedules
    LISTENING. Failures are reported through alert()/logging and the
    device is scheduled back to IDLE.
    """
    if not self.protocol:
        logger.error("协议未初始化")
        return
    self.keep_listening = False
    # Pause the wake-word detector if there is one
    if self.wake_word_detector:
        self.wake_word_detector.pause()
    if self.device_state == DeviceState.IDLE:
        self.schedule(lambda: self.set_device_state(DeviceState.CONNECTING))  # enter CONNECTING
        # Try to open the audio channel
        if not self.protocol.is_audio_channel_opened():
            try:
                # Run the coroutine on the event loop
                future = asyncio.run_coroutine_threadsafe(
                    self.protocol.open_audio_channel(),
                    self.loop
                )
                # Block until it finishes; this method runs as a scheduled
                # task, so the caller's thread waits here
                success = future.result(timeout=10.0)  # bounded wait
                if not success:
                    self.alert("错误", "打开音频通道失败")  # show the error
                    self.schedule(lambda: self.set_device_state(DeviceState.IDLE))
                    return
            except Exception as e:
                logger.error(f"打开音频通道时发生错误: {e}")
                self.alert("错误", f"打开音频通道失败: {str(e)}")
                self.schedule(lambda: self.set_device_state(DeviceState.IDLE))
                return
        # --- forced reinitialisation of the input stream ---
        try:
            if self.audio_codec:
                self.audio_codec._reinitialize_input_stream()  # reinitialise
            else:
                logger.warning("Cannot force reinitialization, audio_codec is None.")
        except Exception as force_reinit_e:
            logger.error(f"Forced reinitialization failed: {force_reinit_e}", exc_info=True)
            self.schedule(lambda: self.set_device_state(DeviceState.IDLE))
            if self.wake_word_detector and self.wake_word_detector.paused:
                self.wake_word_detector.resume()
            return
        # --- end of forced reinitialisation ---
        asyncio.run_coroutine_threadsafe(
            self.protocol.send_start_listening(ListeningMode.MANUAL),
            self.loop
        )
        self.schedule(lambda: self.set_device_state(DeviceState.LISTENING))
    elif self.device_state == DeviceState.SPEAKING:
        if not self.aborted:
            self.abort_speaking(AbortReason.WAKE_WORD_DETECTED)
async def _open_audio_channel_and_start_manual_listening(self):
    """Open the audio channel, then request MANUAL listening and go LISTENING."""
    opened = await self.protocol.open_audio_channel()
    if not opened:
        self.schedule(lambda: self.set_device_state(DeviceState.IDLE))
        self.alert("错误", "打开音频通道失败")
        return

    await self.protocol.send_start_listening(ListeningMode.MANUAL)
    self.schedule(lambda: self.set_device_state(DeviceState.LISTENING))
def toggle_chat_state(self):
"""切换聊天状态"""
# 检查唤醒词检测器是否存在
if self.wake_word_detector:
self.wake_word_detector.pause()
self.schedule(self._toggle_chat_state_impl)
def _toggle_chat_state_impl(self):
    """Toggle the conversation depending on the current device state.

    * IDLE: schedule CONNECTING, then, on a worker thread, open the audio
      channel if needed (5 s timeout), enable keep_listening, request
      AUTO_STOP listening and schedule LISTENING.
    * SPEAKING: abort the current speech.
    * LISTENING: close the audio channel on a worker thread and schedule
      IDLE right away.

    Every state change goes through schedule(), including those requested
    from the worker threads.
    """
    # Function-scope import: Future.result() on the object returned by
    # asyncio.run_coroutine_threadsafe raises concurrent.futures.TimeoutError,
    # which is a different class from asyncio.TimeoutError before Python 3.11.
    import concurrent.futures

    # The protocol must exist.
    if not self.protocol:
        logger.error("协议未初始化")
        return

    def back_to_idle():
        self.schedule(lambda: self.set_device_state(DeviceState.IDLE))

    if self.device_state == DeviceState.IDLE:
        self.schedule(lambda: self.set_device_state(DeviceState.CONNECTING))

        # Connect on a worker thread so the caller is not blocked.
        def connect_and_listen():
            if not self.protocol.is_audio_channel_opened():
                try:
                    future = asyncio.run_coroutine_threadsafe(
                        self.protocol.open_audio_channel(),
                        self.loop
                    )
                    # Wait for the result with a short timeout.
                    try:
                        success = future.result(timeout=5.0)
                    except (concurrent.futures.TimeoutError,
                            asyncio.TimeoutError):
                        logger.error("打开音频通道超时")
                        back_to_idle()
                        self.alert("错误", "打开音频通道超时")
                        return
                    except Exception as e:
                        logger.error(f"打开音频通道时发生未知错误: {e}")
                        back_to_idle()
                        self.alert("错误", f"打开音频通道失败: {str(e)}")
                        return
                    if not success:
                        self.alert("错误", "打开音频通道失败")
                        back_to_idle()
                        return
                except Exception as e:
                    logger.error(f"打开音频通道时发生错误: {e}")
                    self.alert("错误", f"打开音频通道失败: {str(e)}")
                    back_to_idle()
                    return

            self.keep_listening = True

            # Start listening in auto-stop mode.
            try:
                asyncio.run_coroutine_threadsafe(
                    self.protocol.send_start_listening(ListeningMode.AUTO_STOP),
                    self.loop
                )
                self.schedule(
                    lambda: self.set_device_state(DeviceState.LISTENING))
            except Exception as e:
                logger.error(f"启动监听时发生错误: {e}")
                # Previously called set_device_state directly from this
                # worker thread; routed through schedule() like every
                # other state change.
                back_to_idle()
                self.alert("错误", f"启动监听失败: {str(e)}")

        threading.Thread(target=connect_and_listen, daemon=True).start()

    elif self.device_state == DeviceState.SPEAKING:
        # Stop the current speech.
        self.abort_speaking(AbortReason.NONE)

    elif self.device_state == DeviceState.LISTENING:
        # Close the channel on a worker thread so the caller is not blocked.
        def close_audio_channel():
            try:
                future = asyncio.run_coroutine_threadsafe(
                    self.protocol.close_audio_channel(),
                    self.loop
                )
                future.result(timeout=3.0)
            except Exception as e:
                logger.error(f"关闭音频通道时发生错误: {e}")

        threading.Thread(target=close_audio_channel, daemon=True).start()
        # Go idle immediately without waiting for the close to finish.
        back_to_idle()
def stop_listening(self):
"""停止监听"""
self.schedule(self._stop_listening_impl)
def _stop_listening_impl(self):
    """While LISTENING, send a stop-listening request and switch to IDLE."""
    if self.device_state != DeviceState.LISTENING:
        return
    asyncio.run_coroutine_threadsafe(
        self.protocol.send_stop_listening(), self.loop)
    self.set_device_state(DeviceState.IDLE)
def abort_speaking(self, reason):
    """Abort the current speech output.

    Sets the aborted flag, clears the TTS-playing flag and the codec's
    audio queue, then starts a daemon thread that sends the abort request
    (waiting up to 1 s), schedules IDLE and, for a wake-word abort with
    keep_listening set and the channel open, schedules toggle_chat_state.

    Args:
        reason: the AbortReason passed on to the protocol.
    """
    # Ignore repeated requests once an abort is in progress
    if self.aborted:
        logger.debug(f"已经中止,忽略重复的中止请求: {reason}")
        return
    logger.info(f"中止语音输出,原因: {reason}")
    self.aborted = True
    # TTS is no longer playing
    self.set_is_tts_playing(False)
    # Clear the audio queue immediately
    if self.audio_codec:
        self.audio_codec.clear_audio_queue()
    # (VAD detector pausing is disabled)
    # if hasattr(self, 'vad_detector') and self.vad_detector:
    # self.vad_detector.pause()
    # Do the state change and async work on a thread so the caller is not blocked
    def process_abort():
        # Send the abort request first
        try:
            future = asyncio.run_coroutine_threadsafe(
                self.protocol.send_abort_speaking(reason),
                self.loop
            )
            # Short timeout so this never blocks for long
            future.result(timeout=1.0)
        except Exception as e:
            logger.error(f"发送中止指令时出错: {e}")
        # Then change state
        # self.set_device_state(DeviceState.IDLE)
        self.schedule(lambda: self.set_device_state(DeviceState.IDLE))
        # A wake-word abort with keep_listening enabled goes straight back to listening
        if (reason == AbortReason.WAKE_WORD_DETECTED and
                self.keep_listening and
                self.protocol.is_audio_channel_opened()):
            # Brief delay so the abort request can be processed
            time.sleep(0.1)  # shortened delay
            self.schedule(lambda: self.toggle_chat_state())
    # Start the worker thread
    threading.Thread(target=process_abort, daemon=True).start()
def alert(self, title, message):
    """Log a warning and show "title: message" on the display if one exists."""
    logger.warning(f"警告: {title}, {message}")
    display = self.display
    if not display:
        return
    display.update_text(f"{title}: {message}")
def on_state_changed(self, callback):
"""注册状态变化回调"""
self.on_state_changed_callbacks.append(callback)
def shutdown(self):
    """Stop the worker loops and release audio, protocol and detector.

    Order matters:
    1. The wake-word detector is stopped before the codec is closed,
       because the detector is started with the codec as its audio
       source.
    2. The audio channel close is awaited (up to 3 s) before the event
       loop is stopped; stopping the loop right after queueing the close
       could leave it unfinished.
    """
    logger.info("正在关闭应用程序...")
    self.running = False

    # Stop wake-word detection first.
    if self.wake_word_detector:
        try:
            self.wake_word_detector.stop()
        except Exception as e:
            logger.error(f"停止唤醒词检测器时出错: {e}")

    # Close the audio codec.
    if self.audio_codec:
        self.audio_codec.close()

    # Close the protocol's audio channel while the loop can still run it.
    loop = self.loop
    if self.protocol and loop and loop.is_running():
        future = asyncio.run_coroutine_threadsafe(
            self.protocol.close_audio_channel(),
            loop
        )
        # Waiting from the loop's own thread would only block the loop
        # until the timeout, so wait only when called from elsewhere.
        if threading.current_thread() is not self.loop_thread:
            try:
                future.result(timeout=3.0)
            except Exception as e:
                logger.error(f"关闭音频通道时发生错误: {e}")

    # Stop the event loop.
    if loop and loop.is_running():
        loop.call_soon_threadsafe(loop.stop)

    # Wait briefly for the event-loop thread to finish.
    if (self.loop_thread and self.loop_thread.is_alive() and
            threading.current_thread() is not self.loop_thread):
        self.loop_thread.join(timeout=1.0)

    logger.info("应用程序已关闭")
def _handle_verification_code(self, text):
    """Extract a verification code from *text*, copy it, open the login page.

    Clipboard and browser steps are best-effort; the code is always shown
    through alert() when one was found. Nothing is raised.
    """
    try:
        import re
        match = re.search(r'验证码:(\d+)', text)
        if not match:
            return
        code = match.group(1)

        # Best effort: copy to the clipboard.
        try:
            import pyperclip
            pyperclip.copy(code)
            logger.info(f"验证码 {code} 已复制到剪贴板")
        except Exception as e:
            logger.warning(f"无法复制验证码到剪贴板: {e}")

        # Best effort: open the login page.
        try:
            import webbrowser
            opened = webbrowser.open("https://xiaozhi.me/login")
            if opened:
                logger.info("已打开登录页面")
            else:
                logger.warning("无法打开浏览器")
        except Exception as e:
            logger.warning(f"打开浏览器时出错: {e}")

        # Always show the code.
        self.alert("验证码", f"您的验证码是: {code}")
    except Exception as e:
        logger.error(f"处理验证码时出错: {e}")
def _on_mode_changed(self, auto_mode):
    """Switch between auto and manual conversation mode.

    Returns:
        True when the mode was changed, False when the device is not IDLE.
    """
    # Mode switches are only allowed while idle.
    idle = self.device_state == DeviceState.IDLE
    if not idle:
        self.alert("提示", "只有在待命状态下才能切换对话模式")
        return False

    self.keep_listening = auto_mode
    logger.info(f"对话模式已切换为: {'自动' if auto_mode else '手动'}")
    return True
def _initialize_wake_word_detector(self):
    """Create, wire and start the wake-word detector when enabled in config.

    On any failure the feature is switched off in the config and
    self.wake_word_detector is left as None; nothing is raised.
    """
    option_key = "WAKE_WORD_OPTIONS.USE_WAKE_WORD"

    # Skip everything when the feature is disabled in the config.
    if not self.config.get_config(option_key, False):
        logger.info("唤醒词功能已在配置中禁用,跳过初始化")
        self.wake_word_detector = None
        return

    try:
        from src.audio_processing.wake_word_detect import WakeWordDetector
        detector = WakeWordDetector()
        self.wake_word_detector = detector

        # The detector can disable itself; record that in the config.
        if not getattr(detector, 'enabled', True):
            logger.warning("唤醒词检测器被禁用(内部故障)")
            self.config.update_config(option_key, False)
            self.wake_word_detector = None
            return

        # Detection and error callbacks.
        detector.on_detected(self._on_wake_word_detected)
        detector.on_error = lambda error: (
            self._handle_wake_word_error(error)
        )
        logger.info("唤醒词检测器初始化成功")

        # Start detecting.
        self._start_wake_word_detector()
    except Exception as e:
        logger.error(f"初始化唤醒词检测器失败: {e}")
        logger.error(traceback.format_exc())
        # Disable the feature; the rest of the program keeps running.
        self.config.update_config(option_key, False)
        logger.info("由于初始化失败,唤醒词功能已禁用,但程序将继续运行")
        self.wake_word_detector = None
def _handle_wake_word_error(self, error):
    """Log a detector error and, while IDLE, schedule a detector restart."""
    logger.error(f"唤醒词检测错误: {error}")
    if self.device_state != DeviceState.IDLE:
        return
    self.schedule(lambda: self._restart_wake_word_detector())
def _start_wake_word_detector(self):
    """Start the detector, passing the audio codec when one is available."""
    detector = self.wake_word_detector
    if not detector:
        return

    codec = getattr(self, 'audio_codec', None)
    if codec:
        logger.info("使用音频编解码器启动唤醒词检测器")
        detector.start(codec)
        return

    # No codec: start the detector without one.
    logger.info("使用独立模式启动唤醒词检测器")
    detector.start()
def _on_wake_word_detected(self, wake_word, full_text):
    """Detector callback: log the hit and queue its handling on the main loop."""
    logger.info(f"检测到唤醒词: {wake_word} (完整文本: {full_text})")

    def handle():
        self._handle_wake_word_detected(wake_word)

    self.schedule(handle)
def _handle_wake_word_detected(self, wake_word):
    """React to a wake word: connect and listen from IDLE, abort when SPEAKING."""
    state = self.device_state
    if state == DeviceState.SPEAKING:
        self.abort_speaking(AbortReason.WAKE_WORD_DETECTED)
        return
    if state != DeviceState.IDLE:
        return

    # Pause detection while the conversation is set up.
    if self.wake_word_detector:
        self.wake_word_detector.pause()

    self.schedule(lambda: self.set_device_state(DeviceState.CONNECTING))
    # Connect and open the audio channel on the event loop.
    asyncio.run_coroutine_threadsafe(
        self._connect_and_start_listening(wake_word), self.loop)
async def _connect_and_start_listening(self, wake_word):
    """Connect, open the audio channel, report the wake word, start listening.

    On a failed connect or channel open the device is scheduled back to
    IDLE, an alert is shown and the wake-word detector is resumed.
    """
    protocol = self.protocol

    # Step 1: connect to the server.
    connected = await protocol.connect()
    if not connected:
        logger.error("连接服务器失败")
        self.alert("错误", "连接服务器失败")
        self.schedule(lambda: self.set_device_state(DeviceState.IDLE))
        if self.wake_word_detector:
            self.wake_word_detector.resume()
        return

    # Step 2: open the audio channel.
    channel_open = await self.protocol.open_audio_channel()
    if not channel_open:
        logger.error("打开音频通道失败")
        self.schedule(lambda: self.set_device_state(DeviceState.IDLE))
        self.alert("错误", "打开音频通道失败")
        if self.wake_word_detector:
            self.wake_word_detector.resume()
        return

    await self.protocol.send_wake_word_detected(wake_word)

    # Step 3: auto-stop listening with keep_listening enabled.
    self.keep_listening = True
    await self.protocol.send_start_listening(ListeningMode.AUTO_STOP)
    self.schedule(lambda: self.set_device_state(DeviceState.LISTENING))
def _restart_wake_word_detector(self):
    """Stop and start the wake-word detector again.

    Returns early (with a warning) when no detector exists; previously
    that case fell through to calling start() on None and was logged as
    a restart failure. Any exception during the restart is logged, not
    raised.
    """
    logger.info("尝试重新启动唤醒词检测器")
    detector = self.wake_word_detector
    if not detector:
        logger.warning("唤醒词检测器不存在,无法重新启动")
        return

    try:
        # Stop the running detector and give it time to release resources.
        detector.stop()
        time.sleep(0.5)

        # Prefer the audio codec as the source when there is one.
        if hasattr(self, 'audio_codec') and self.audio_codec:
            detector.start(self.audio_codec)
            logger.info("使用音频编解码器重新启动唤醒词检测器")
        else:
            # No codec: start the detector without one.
            detector.start()
            logger.info("使用独立模式重新启动唤醒词检测器")
        logger.info("唤醒词检测器重新启动成功")
    except Exception as e:
        logger.error(f"重新启动唤醒词检测器失败: {e}")
def _initialize_iot_devices(self):
    """Register the built-in things and the configured Home Assistant devices."""
    from src.iot.thing_manager import ThingManager
    from src.iot.things.lamp import Lamp
    from src.iot.things.speaker import Speaker
    from src.iot.things.music_player import MusicPlayer
    from src.iot.things.CameraVL.Camera import Camera
    from src.iot.things.query_bridge_rag import QueryBridgeRAG
    from src.iot.things.temperature_sensor import TemperatureSensor
    # Home Assistant device wrappers.
    from src.iot.things.ha_control import HomeAssistantLight, HomeAssistantSwitch, HomeAssistantNumber, HomeAssistantButton
    # Countdown timer device.
    from src.iot.things.countdown_timer import CountdownTimer

    thing_manager = ThingManager.get_instance()

    # Built-in devices.
    for builtin in (Lamp, Speaker, MusicPlayer, Camera):
        thing_manager.add_thing(builtin())
    # QueryBridgeRAG and TemperatureSensor are imported but not registered.
    # thing_manager.add_thing(QueryBridgeRAG())
    # thing_manager.add_thing(TemperatureSensor())

    thing_manager.add_thing(CountdownTimer())
    logger.info("已添加倒计时器设备,用于计时执行命令用")

    # Home Assistant devices, typed by the entity-id prefix.
    prefix_table = (
        ("light.", HomeAssistantLight, "灯设备"),
        ("switch.", HomeAssistantSwitch, "开关设备"),
        ("number.", HomeAssistantNumber, "数值设备"),
        ("button.", HomeAssistantButton, "按钮设备"),
    )
    ha_devices = self.config.get_config("HOME_ASSISTANT.DEVICES", [])
    for device in ha_devices:
        entity_id = device.get("entity_id")
        friendly_name = device.get("friendly_name")
        if not entity_id:
            continue
        for prefix, factory, label in prefix_table:
            if entity_id.startswith(prefix):
                break
        else:
            # Unrecognised prefixes are treated as lights.
            factory, label = HomeAssistantLight, "设备(默认作为灯处理)"
        thing_manager.add_thing(factory(entity_id, friendly_name))
        logger.info(f"已添加Home Assistant{label}: {friendly_name or entity_id}")

    logger.info("物联网设备初始化完成")
def _handle_iot_message(self, data):
    """Handle an "iot" message: invoke each command, logging result or error."""
    from src.iot.thing_manager import ThingManager
    manager = ThingManager.get_instance()

    for command in data.get("commands", []):
        try:
            outcome = manager.invoke(command)
            logger.info(f"执行物联网命令结果: {outcome}")
        except Exception as e:
            # A failing command does not stop the remaining ones.
            logger.error(f"执行物联网命令失败: {e}")
def _update_iot_states(self, delta=None):
    """Send IoT device states to the server.

    Args:
        delta: selects what is sent.
            - None: legacy path, get_states_json_str() is always sent.
            - True: get_states_json(delta=True), sent only when changed.
            - False: get_states_json(delta=False), always sent.
    """
    from src.iot.thing_manager import ThingManager
    manager = ThingManager.get_instance()

    def push(payload):
        asyncio.run_coroutine_threadsafe(
            self.protocol.send_iot_states(payload), self.loop)

    # Backward-compatible path.
    if delta is None:
        push(manager.get_states_json_str())
        logger.info("物联网设备状态已更新")
        return

    changed, states_json = manager.get_states_json(delta=delta)

    # Delta mode with nothing changed: skip the send.
    if delta and not changed:
        logger.debug("物联网设备状态无变化,跳过更新")
        return

    push(states_json)
    if delta:
        logger.info("物联网设备状态已更新(增量)")
    else:
        logger.info("物联网设备状态已更新(完整)")
def _update_wake_word_detector_stream(self):
    """Point a running detector at the codec's active input stream."""
    detector = self.wake_word_detector
    codec = self.audio_codec
    if not (detector and codec and detector.is_running()):
        return

    stream = codec.input_stream
    if not (stream and stream.is_active()):
        return

    # Share the codec's input stream and mark it as externally owned.
    detector.stream = stream
    detector.external_stream = True
    logger.info("已更新唤醒词检测器的音频流引用")