From bdc301df70be045f696bfe0efb726cd07f1bc836 Mon Sep 17 00:00:00 2001 From: zhinianboke Date: Thu, 23 Oct 2025 19:17:08 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- XianyuAutoAsync.py | 474 ++++++++++++++++++++++++++++------------ ai_reply_engine.py | 35 +++ cookie_manager.py | 54 ++++- order_status_handler.py | 5 +- 4 files changed, 417 insertions(+), 151 deletions(-) diff --git a/XianyuAutoAsync.py b/XianyuAutoAsync.py index b6bfdd1..b66d3dc 100644 --- a/XianyuAutoAsync.py +++ b/XianyuAutoAsync.py @@ -5,6 +5,7 @@ import time import base64 import os import random +from enum import Enum from loguru import logger import websockets from utils.xianyu_utils import ( @@ -23,6 +24,16 @@ from collections import defaultdict from db_manager import db_manager +class ConnectionState(Enum): + """WebSocket连接状态枚举""" + DISCONNECTED = "disconnected" # 未连接 + CONNECTING = "connecting" # 连接中 + CONNECTED = "connected" # 已连接 + RECONNECTING = "reconnecting" # 重连中 + FAILED = "failed" # 连接失败 + CLOSED = "closed" # 已关闭 + + class AutoReplyPauseManager: """自动回复暂停管理器""" def __init__(self): @@ -156,8 +167,10 @@ class XianyuLive: _order_detail_lock_times = {} # 商品详情缓存(24小时有效) - _item_detail_cache = {} # {item_id: {'detail': str, 'timestamp': float}} + _item_detail_cache = {} # {item_id: {'detail': str, 'timestamp': float, 'access_time': float}} _item_detail_cache_lock = asyncio.Lock() + _item_detail_cache_max_size = 1000 # 最大缓存1000个商品 + _item_detail_cache_ttl = 24 * 60 * 60 # 24小时TTL # 类级别的实例管理字典,用于API调用 _instances = {} # {cookie_id: XianyuLive实例} @@ -173,6 +186,139 @@ class XianyuLive: except: return "未知错误" + def _set_connection_state(self, new_state: ConnectionState, reason: str = ""): + """设置连接状态并记录日志""" + if self.connection_state != new_state: + old_state = self.connection_state + self.connection_state = new_state + self.last_state_change_time = time.time() + + # 记录状态转换 + state_msg = f"【{self.cookie_id}】连接状态: {old_state.value} → {new_state.value}" + if reason: + state_msg += f" ({reason})" + + # 根据状态严重程度选择日志级别 + if new_state == ConnectionState.FAILED: + logger.error(state_msg) + elif new_state == ConnectionState.RECONNECTING: + logger.warning(state_msg) + elif new_state == ConnectionState.CONNECTED: + logger.success(state_msg) + else: + logger.info(state_msg) + + async def _cancel_background_tasks(self): + """取消并清理所有后台任务""" + tasks_to_cancel = [] + + # 收集所有需要取消的任务 + if self.heartbeat_task: + tasks_to_cancel.append(("心跳任务", self.heartbeat_task)) + if self.token_refresh_task: + tasks_to_cancel.append(("Token刷新任务", self.token_refresh_task)) + if self.cleanup_task: + tasks_to_cancel.append(("清理任务", self.cleanup_task)) + if self.cookie_refresh_task: + tasks_to_cancel.append(("Cookie刷新任务", self.cookie_refresh_task)) + + if not tasks_to_cancel: + logger.debug(f"【{self.cookie_id}】没有后台任务需要取消") + return + + logger.info(f"【{self.cookie_id}】开始取消 {len(tasks_to_cancel)} 个后台任务...") + + # 取消所有任务 + for task_name, task in tasks_to_cancel: + try: + task.cancel() + logger.debug(f"【{self.cookie_id}】已发送取消信号: {task_name}") + except Exception as e: + logger.warning(f"【{self.cookie_id}】取消任务失败 {task_name}: {e}") + + # 等待所有任务完成取消 + tasks = [task for _, task in tasks_to_cancel] + try: + await asyncio.wait_for( + asyncio.gather(*tasks, return_exceptions=True), + timeout=5.0 + ) + logger.info(f"【{self.cookie_id}】所有后台任务已取消") + except asyncio.TimeoutError: + logger.warning(f"【{self.cookie_id}】等待任务取消超时,强制继续") + except Exception as e: + logger.warning(f"【{self.cookie_id}】等待任务取消时出错: {e}") + + # 重置任务引用 + self.heartbeat_task = None + self.token_refresh_task = None + self.cleanup_task = None + self.cookie_refresh_task = None + + def _calculate_retry_delay(self, error_msg: str) -> int: + """根据错误类型和失败次数计算重试延迟""" + # WebSocket意外断开 - 短延迟 + if "no close frame received or sent" in error_msg: + return min(3 * self.connection_failures, 15) + + # 网络连接问题 - 长延迟 + elif "Connection refused" in error_msg or "timeout" in error_msg.lower(): + return min(10 * self.connection_failures, 60) + + # 其他未知错误 - 中等延迟 + else: + return min(5 * self.connection_failures, 30) + + def _cleanup_instance_caches(self): + """清理实例级别的缓存,防止内存泄漏""" + try: + current_time = time.time() + cleaned_total = 0 + + # 清理过期的通知记录(保留1小时内的) + max_notification_age = 3600 # 1小时 + expired_notifications = [ + key for key, last_time in self.last_notification_time.items() + if current_time - last_time > max_notification_age + ] + for key in expired_notifications: + del self.last_notification_time[key] + if expired_notifications: + cleaned_total += len(expired_notifications) + logger.debug(f"【{self.cookie_id}】清理了 {len(expired_notifications)} 个过期通知记录") + + # 清理过期的发货记录(保留1小时内的) + max_delivery_age = 3600 # 1小时 + expired_deliveries = [ + order_id for order_id, last_time in self.last_delivery_time.items() + if current_time - last_time > max_delivery_age + ] + for order_id in expired_deliveries: + del self.last_delivery_time[order_id] + if expired_deliveries: + cleaned_total += len(expired_deliveries) + logger.debug(f"【{self.cookie_id}】清理了 {len(expired_deliveries)} 个过期发货记录") + + # 清理过期的订单确认记录(保留1小时内的) + max_confirm_age = 3600 # 1小时 + expired_confirms = [ + order_id for order_id, last_time in self.confirmed_orders.items() + if current_time - last_time > max_confirm_age + ] + for order_id in expired_confirms: + del self.confirmed_orders[order_id] + if expired_confirms: + cleaned_total += len(expired_confirms) + logger.debug(f"【{self.cookie_id}】清理了 {len(expired_confirms)} 个过期订单确认记录") + + # 只有实际清理了内容才记录总数日志 + if cleaned_total > 0: + logger.info(f"【{self.cookie_id}】实例缓存清理完成,共清理 {cleaned_total} 条记录") + logger.debug(f"【{self.cookie_id}】当前缓存数量 - 通知: {len(self.last_notification_time)}, 发货: {len(self.last_delivery_time)}, 确认: {len(self.confirmed_orders)}") + + except Exception as e: + logger.error(f"【{self.cookie_id}】清理实例缓存时出错: {self._safe_str(e)}") + def __init__(self, cookies_str=None, cookie_id: str = "default", user_id: int = None): """初始化闲鱼直播类""" logger.info(f"【{cookie_id}】开始初始化XianyuLive...") @@ -239,7 +385,7 @@ class XianyuLive: self.cookie_refresh_task = None self.cookie_refresh_interval = 1200 # 1小时 = 3600秒 self.last_cookie_refresh_time = 0 - self.cookie_refresh_running = False # 防止重复执行Cookie刷新 + self.cookie_refresh_lock = asyncio.Lock() # 使用Lock防止重复执行Cookie刷新 self.cookie_refresh_enabled = True # 是否启用Cookie刷新功能 # 扫码登录Cookie刷新标志 @@ -260,9 +406,11 @@ class XianyuLive: self.max_captcha_verification_count = 3 # 最大滑块验证次数,防止无限递归 # WebSocket连接监控 + self.connection_state = ConnectionState.DISCONNECTED # 连接状态 self.connection_failures = 0 # 连续连接失败次数 self.max_connection_failures = 5 # 最大连续失败次数 self.last_successful_connection = 0 # 上次成功连接时间 + self.last_state_change_time = time.time() # 上次状态变化时间 # 后台任务追踪(用于清理未等待的任务) self.background_tasks = set() # 追踪所有后台任务 @@ -909,7 +1057,8 @@ class XianyuLive: API_ENDPOINTS.get('token'), params=params, data=data, - headers=headers + headers=headers, + timeout=aiohttp.ClientTimeout(total=30) ) as response: res_json = await response.json() @@ -959,6 +1108,7 @@ class XianyuLive: # 添加风控日志记录 log_id = None try: + from db_manager import db_manager success = db_manager.add_risk_control_log( cookie_id=self.cookie_id, event_type='slider_captcha', @@ -986,6 +1136,7 @@ class XianyuLive: # 更新风控日志为成功状态 if 'log_id' in locals() and log_id: try: + from db_manager import db_manager db_manager.update_risk_control_log( log_id=log_id, processing_result=f"滑块验证成功,耗时: {captcha_duration:.2f}秒, cookies长度: {len(new_cookies_str)}", @@ -1005,6 +1156,7 @@ class XianyuLive: # 更新风控日志为失败状态 if 'log_id' in locals() and log_id: try: + from db_manager import db_manager db_manager.update_risk_control_log( log_id=log_id, processing_result=f"滑块验证失败,耗时: {captcha_duration:.2f}秒, 原因: 未获取到新cookies", @@ -1022,6 +1174,7 @@ class XianyuLive: captcha_duration = time.time() - captcha_start_time if 'captcha_start_time' in locals() else 0 if 'log_id' in locals() and log_id: try: + from db_manager import db_manager db_manager.update_risk_control_log( log_id=log_id, processing_result=f"滑块验证处理异常,耗时: {captcha_duration:.2f}秒", @@ -1609,7 +1762,9 @@ class XianyuLive: current_time = time.time() # 检查缓存是否在24小时内 - if current_time - cache_time < 24 * 60 * 60: # 24小时 + if current_time - cache_time < self._item_detail_cache_ttl: + # 更新访问时间(用于LRU) + cache_data['access_time'] = current_time logger.info(f"从缓存获取商品详情: {item_id}") return cache_data['detail'] else: @@ -1620,12 +1775,8 @@ class XianyuLive: # 2. 尝试使用浏览器获取商品详情 detail_from_browser = await self._fetch_item_detail_from_browser(item_id) if detail_from_browser: - # 保存到缓存 - async with self._item_detail_cache_lock: - self._item_detail_cache[item_id] = { - 'detail': detail_from_browser, - 'timestamp': time.time() - } + # 保存到缓存(带大小限制) + await self._add_to_item_cache(item_id, detail_from_browser) logger.info(f"成功通过浏览器获取商品详情: {item_id}, 长度: {len(detail_from_browser)}") return detail_from_browser @@ -1633,12 +1784,8 @@ class XianyuLive: logger.warning(f"浏览器获取商品详情失败,尝试外部API: {item_id}") detail_from_api = await self._fetch_item_detail_from_external_api(item_id) if detail_from_api: - # 保存到缓存 - async with self._item_detail_cache_lock: - self._item_detail_cache[item_id] = { - 'detail': detail_from_api, - 'timestamp': time.time() - } + # 保存到缓存(带大小限制) + await self._add_to_item_cache(item_id, detail_from_api) logger.info(f"成功通过外部API获取商品详情: {item_id}, 长度: {len(detail_from_api)}") return detail_from_api @@ -1649,8 +1796,62 @@ class XianyuLive: logger.error(f"获取商品详情异常: {item_id}, 错误: {self._safe_str(e)}") return "" + async def _add_to_item_cache(self, item_id: str, detail: str): + """添加商品详情到缓存,实现LRU策略和大小限制 + + Args: + item_id: 商品ID + detail: 商品详情 + """ + async with self._item_detail_cache_lock: + current_time = time.time() + + # 检查缓存大小,如果超过限制则清理 + if len(self._item_detail_cache) >= self._item_detail_cache_max_size: + # 使用LRU策略删除最久未访问的项 + if self._item_detail_cache: + # 找到最久未访问的项 + oldest_item = min( + self._item_detail_cache.items(), + key=lambda x: x[1].get('access_time', x[1]['timestamp']) + ) + oldest_item_id = oldest_item[0] + del self._item_detail_cache[oldest_item_id] + logger.debug(f"缓存已满,删除最旧项: {oldest_item_id}") + + # 添加新项到缓存 + self._item_detail_cache[item_id] = { + 'detail': detail, + 'timestamp': current_time, + 'access_time': current_time + } + logger.debug(f"添加商品详情到缓存: {item_id}, 当前缓存大小: {len(self._item_detail_cache)}") + + @classmethod + async def _cleanup_item_cache(cls): + """清理过期的商品详情缓存""" + async with cls._item_detail_cache_lock: + current_time = time.time() + expired_items = [] + + # 找出所有过期的项 + for item_id, cache_data in cls._item_detail_cache.items(): + if current_time - cache_data['timestamp'] >= cls._item_detail_cache_ttl: + expired_items.append(item_id) + + # 删除过期项 + for item_id in expired_items: + del cls._item_detail_cache[item_id] + + if expired_items: + logger.info(f"清理了 {len(expired_items)} 个过期的商品详情缓存") + + return len(expired_items) + async def _fetch_item_detail_from_browser(self, item_id: str) -> str: """使用浏览器获取商品详情""" + playwright = None + browser = None try: from playwright.async_api import async_playwright @@ -1739,6 +1940,7 @@ class XianyuLive: await asyncio.sleep(3) # 获取商品详情内容 + detail_text = "" try: # 等待目标元素出现 await page.wait_for_selector('.desc--GaIUKUQY', timeout=10000) @@ -1748,11 +1950,6 @@ class XianyuLive: if detail_element: detail_text = await detail_element.inner_text() logger.info(f"成功获取商品详情: {item_id}, 长度: {len(detail_text)}") - - # 清理资源 - await browser.close() - await playwright.stop() - return detail_text.strip() else: logger.warning(f"未找到商品详情元素: {item_id}") @@ -1760,15 +1957,26 @@ class XianyuLive: except Exception as e: logger.warning(f"获取商品详情元素失败: {item_id}, 错误: {self._safe_str(e)}") - # 清理资源 - await browser.close() - await playwright.stop() - return "" except Exception as e: logger.error(f"浏览器获取商品详情异常: {item_id}, 错误: {self._safe_str(e)}") return "" + finally: + # 确保资源被正确清理 + try: + if browser: + await browser.close() + logger.debug(f"Browser已关闭: {item_id}") + except Exception as e: + logger.warning(f"关闭browser时出错: {self._safe_str(e)}") + + try: + if playwright: + await playwright.stop() + logger.debug(f"Playwright已停止: {item_id}") + except Exception as e: + logger.warning(f"停止playwright时出错: {self._safe_str(e)}") async def _fetch_item_detail_from_external_api(self, item_id: str) -> str: """从外部API获取商品详情(备用方案)""" @@ -4184,7 +4392,7 @@ class XianyuLive: return False async def pause_cleanup_loop(self): - """定期清理过期的暂停记录和锁""" + """定期清理过期的暂停记录、锁和缓存""" while True: try: # 检查账号是否启用 @@ -4199,6 +4407,28 @@ class XianyuLive: # 清理过期的锁(每5分钟清理一次,保留24小时内的锁) self.cleanup_expired_locks(max_age_hours=24) + # 清理过期的商品详情缓存 + cleaned_count = await self._cleanup_item_cache() + if cleaned_count > 0: + logger.info(f"【{self.cookie_id}】清理了 {cleaned_count} 个过期的商品详情缓存") + + # 清理过期的通知、发货和订单确认记录(防止内存泄漏) + self._cleanup_instance_caches() + + # 清理AI回复引擎未使用的客户端(每5分钟检查一次) + try: + from ai_reply_engine import ai_reply_engine + ai_reply_engine.cleanup_unused_clients(max_idle_hours=24) + except Exception as ai_clean_e: + logger.debug(f"【{self.cookie_id}】清理AI客户端时出错: {ai_clean_e}") + + # 清理QR登录过期会话(每5分钟检查一次) + try: + from utils.qr_login import qr_login_manager + qr_login_manager.cleanup_expired_sessions() + except Exception as qr_clean_e: + logger.debug(f"【{self.cookie_id}】清理QR登录会话时出错: {qr_clean_e}") + # 每5分钟清理一次 await asyncio.sleep(300) except Exception as e: @@ -4232,7 +4462,7 @@ class XianyuLive: remaining_seconds = int(remaining_time % 60) logger.debug(f"【{self.cookie_id}】收到消息后冷却中,还需等待 {remaining_minutes}分{remaining_seconds}秒 才能执行Cookie刷新") # 检查是否已有Cookie刷新任务在执行 - elif self.cookie_refresh_running: + elif self.cookie_refresh_lock.locked(): logger.debug(f"【{self.cookie_id}】Cookie刷新任务已在执行中,跳过本次触发") else: logger.info(f"【{self.cookie_id}】开始执行Cookie刷新任务...") @@ -4248,59 +4478,54 @@ class XianyuLive: async def _execute_cookie_refresh(self, current_time): """独立执行Cookie刷新任务,避免阻塞主循环""" + # 使用Lock确保原子性,防止重复执行 + async with self.cookie_refresh_lock: + try: + logger.info(f"【{self.cookie_id}】开始Cookie刷新任务,暂时暂停心跳以避免连接冲突...") - # 设置运行状态,防止重复执行 - self.cookie_refresh_running = True + # 暂时暂停心跳任务,避免与浏览器操作冲突 + heartbeat_was_running = False + if self.heartbeat_task and not self.heartbeat_task.done(): + heartbeat_was_running = True + self.heartbeat_task.cancel() + logger.debug(f"【{self.cookie_id}】已暂停心跳任务") - try: - logger.info(f"【{self.cookie_id}】开始Cookie刷新任务,暂时暂停心跳以避免连接冲突...") + # 为整个Cookie刷新任务添加超时保护(3分钟,缩短时间减少影响) + success = await asyncio.wait_for( + self._refresh_cookies_via_browser(), + timeout=180.0 # 3分钟超时,减少对WebSocket的影响 + ) - # 暂时暂停心跳任务,避免与浏览器操作冲突 - heartbeat_was_running = False - if self.heartbeat_task and not self.heartbeat_task.done(): - heartbeat_was_running = True - self.heartbeat_task.cancel() - logger.debug(f"【{self.cookie_id}】已暂停心跳任务") + # 重新启动心跳任务 + if heartbeat_was_running and self.ws and not self.ws.closed: + logger.debug(f"【{self.cookie_id}】重新启动心跳任务") + self.heartbeat_task = asyncio.create_task(self.heartbeat_loop(self.ws)) - # 为整个Cookie刷新任务添加超时保护(3分钟,缩短时间减少影响) - success = await asyncio.wait_for( - self._refresh_cookies_via_browser(), - timeout=180.0 # 3分钟超时,减少对WebSocket的影响 - ) + if success: + self.last_cookie_refresh_time = current_time + logger.info(f"【{self.cookie_id}】Cookie刷新任务完成,心跳已恢复") + else: + logger.warning(f"【{self.cookie_id}】Cookie刷新任务失败") + # 即使失败也要更新时间,避免频繁重试 + self.last_cookie_refresh_time = current_time - # 重新启动心跳任务 - if heartbeat_was_running and self.ws and not self.ws.closed: - logger.debug(f"【{self.cookie_id}】重新启动心跳任务") - self.heartbeat_task = asyncio.create_task(self.heartbeat_loop(self.ws)) - - if success: + except asyncio.TimeoutError: + # 超时也要更新时间,避免频繁重试 self.last_cookie_refresh_time = current_time - logger.info(f"【{self.cookie_id}】Cookie刷新任务完成,心跳已恢复") - else: - logger.warning(f"【{self.cookie_id}】Cookie刷新任务失败") - # 即使失败也要更新时间,避免频繁重试 + except Exception as e: + logger.error(f"【{self.cookie_id}】执行Cookie刷新任务异常: {self._safe_str(e)}") + # 异常也要更新时间,避免频繁重试 self.last_cookie_refresh_time = current_time + finally: + # 确保心跳任务恢复(如果WebSocket仍然连接) + if (self.ws and not self.ws.closed and + (not self.heartbeat_task or self.heartbeat_task.done())): + logger.info(f"【{self.cookie_id}】Cookie刷新完成,心跳任务正常运行") + self.heartbeat_task = asyncio.create_task(self.heartbeat_loop(self.ws)) - except asyncio.TimeoutError: - # 超时也要更新时间,避免频繁重试 - self.last_cookie_refresh_time = current_time - except Exception as e: - logger.error(f"【{self.cookie_id}】执行Cookie刷新任务异常: {self._safe_str(e)}") - # 异常也要更新时间,避免频繁重试 - self.last_cookie_refresh_time = current_time - finally: - # 确保心跳任务恢复(如果WebSocket仍然连接) - if (self.ws and not self.ws.closed and - (not self.heartbeat_task or self.heartbeat_task.done())): - logger.info(f"【{self.cookie_id}】Cookie刷新完成,心跳任务正常运行") - self.heartbeat_task = asyncio.create_task(self.heartbeat_loop(self.ws)) - - # 清除运行状态 - self.cookie_refresh_running = False - - # 清空消息接收标志,允许下次正常执行Cookie刷新 - self.last_message_received_time = 0 - logger.debug(f"【{self.cookie_id}】Cookie刷新完成,已清空消息接收标志") + # 清空消息接收标志,允许下次正常执行Cookie刷新 + self.last_message_received_time = 0 + logger.debug(f"【{self.cookie_id}】Cookie刷新完成,已清空消息接收标志") @@ -5757,22 +5982,24 @@ class XianyuLive: headers = WEBSOCKET_HEADERS.copy() headers['Cookie'] = self.cookies_str - logger.info(f"【{self.cookie_id}】准备建立WebSocket连接到: {self.base_url}") - logger.info(f"【{self.cookie_id}】WebSocket headers: {headers}") + # 更新连接状态为连接中 + self._set_connection_state(ConnectionState.CONNECTING, "准备建立WebSocket连接") + logger.info(f"【{self.cookie_id}】WebSocket目标地址: {self.base_url}") # 兼容不同版本的websockets库 async with await self._create_websocket_connection(headers) as websocket: - logger.info(f"【{self.cookie_id}】WebSocket连接建立成功!") self.ws = websocket + logger.info(f"【{self.cookie_id}】WebSocket连接建立成功,开始初始化...") - # 更新连接状态 - self.connection_failures = 0 - self.last_successful_connection = time.time() - - logger.info(f"【{self.cookie_id}】开始初始化WebSocket连接...") + # 开始初始化 await self.init(websocket) logger.info(f"【{self.cookie_id}】WebSocket初始化完成!") + # 初始化完成后才设置为已连接状态 + self._set_connection_state(ConnectionState.CONNECTED, "初始化完成,连接就绪") + self.connection_failures = 0 + self.last_successful_connection = time.time() + # 启动心跳任务 logger.info(f"【{self.cookie_id}】启动心跳任务...") self.heartbeat_task = asyncio.create_task(self.heartbeat_loop(websocket)) @@ -5816,93 +6043,51 @@ class XianyuLive: error_msg = self._safe_str(e) self.connection_failures += 1 + # 更新连接状态为重连中 + self._set_connection_state(ConnectionState.RECONNECTING, f"第{self.connection_failures}次失败") + # 打印详细的错误信息 import traceback error_type = type(e).__name__ - error_trace = traceback.format_exc() - logger.error(f"WebSocket连接异常 ({self.connection_failures}/{self.max_connection_failures})") - logger.error(f"异常类型: {error_type}") - logger.error(f"异常信息: {error_msg}") - logger.error(f"异常堆栈:\n{error_trace}") + logger.error(f"【{self.cookie_id}】WebSocket连接异常 ({self.connection_failures}/{self.max_connection_failures})") + logger.error(f"【{self.cookie_id}】异常类型: {error_type}") + logger.error(f"【{self.cookie_id}】异常信息: {error_msg}") + logger.debug(f"【{self.cookie_id}】异常堆栈:\n{traceback.format_exc()}") # 检查是否超过最大失败次数 if self.connection_failures >= self.max_connection_failures: - logger.error(f"【{self.cookie_id}】连续连接失败{self.max_connection_failures}次,准备重启实例...") + self._set_connection_state(ConnectionState.FAILED, f"连续失败{self.max_connection_failures}次") + logger.error(f"【{self.cookie_id}】准备重启实例...") self.connection_failures = 0 # 重置失败计数 await self._restart_instance() # 重启实例 return # 重启后退出当前连接循环 - # 根据错误类型和失败次数决定处理策略 - if "no close frame received or sent" in error_msg: - logger.info(f"【{self.cookie_id}】检测到WebSocket连接意外断开,准备重新连接...") - retry_delay = min(3 * self.connection_failures, 15) # 递增重试间隔,最大15秒 - elif "Connection refused" in error_msg or "timeout" in error_msg.lower(): - logger.warning(f"【{self.cookie_id}】网络连接问题,延长重试间隔...") - retry_delay = min(10 * self.connection_failures, 60) # 递增重试间隔,最大60秒 - else: - logger.warning(f"【{self.cookie_id}】未知WebSocket错误,使用默认重试间隔...") - retry_delay = min(5 * self.connection_failures, 30) # 递增重试间隔,最大30秒 + # 计算重试延迟 + retry_delay = self._calculate_retry_delay(error_msg) + logger.warning(f"【{self.cookie_id}】将在 {retry_delay} 秒后重试连接...") # 清空当前token,确保重新连接时会重新获取 if self.current_token: - logger.info(f"【{self.cookie_id}】清空当前token,重新连接时将重新获取") + logger.debug(f"【{self.cookie_id}】清空当前token,重新连接时将重新获取") self.current_token = None - # 取消所有任务并重置为None - tasks_to_cancel = [] - if self.heartbeat_task: - tasks_to_cancel.append(self.heartbeat_task) - if self.token_refresh_task: - tasks_to_cancel.append(self.token_refresh_task) - if self.cleanup_task: - tasks_to_cancel.append(self.cleanup_task) - if self.cookie_refresh_task: - tasks_to_cancel.append(self.cookie_refresh_task) - - # 取消所有任务 - for task in tasks_to_cancel: - task.cancel() - - # 等待所有任务完成取消 - if tasks_to_cancel: - try: - await asyncio.gather(*tasks_to_cancel, return_exceptions=True) - except Exception: - pass - - # 重置任务为None - self.heartbeat_task = None - self.token_refresh_task = None - self.cleanup_task = None - self.cookie_refresh_task = None + # 使用统一的任务清理方法 + await self._cancel_background_tasks() - logger.info(f"【{self.cookie_id}】等待 {retry_delay} 秒后重试连接...") + # 等待后重试 await asyncio.sleep(retry_delay) continue finally: + # 更新连接状态为已关闭 + self._set_connection_state(ConnectionState.CLOSED, "程序退出") + # 清空当前token if self.current_token: logger.info(f"【{self.cookie_id}】程序退出,清空当前token") self.current_token = None - # 清理所有任务 - tasks_to_cancel = [] - if self.heartbeat_task: - tasks_to_cancel.append(self.heartbeat_task) - if self.token_refresh_task: - tasks_to_cancel.append(self.token_refresh_task) - if self.cleanup_task: - tasks_to_cancel.append(self.cleanup_task) - if self.cookie_refresh_task: - tasks_to_cancel.append(self.cookie_refresh_task) - - # 取消所有任务 - for task in tasks_to_cancel: - task.cancel() - - # 等待所有任务完成取消 - if tasks_to_cancel: - await asyncio.gather(*tasks_to_cancel, return_exceptions=True) + # 使用统一的任务清理方法 + await self._cancel_background_tasks() # 清理所有后台任务 if self.background_tasks: @@ -5915,7 +6100,8 @@ class XianyuLive: except asyncio.TimeoutError: logger.warning(f"【{self.cookie_id}】后台任务清理超时,强制继续") - await self.close_session() # 确保关闭session + # 确保关闭session + await self.close_session() # 从全局实例字典中注销当前实例 self._unregister_instance() diff --git a/ai_reply_engine.py b/ai_reply_engine.py index f3a9d2e..5fe354f 100644 --- a/ai_reply_engine.py +++ b/ai_reply_engine.py @@ -20,6 +20,7 @@ class AIReplyEngine: def __init__(self): self.clients = {} # 存储不同账号的OpenAI客户端 self.agents = {} # 存储不同账号的Agent实例 + self.client_last_used = {} # 记录客户端最后使用时间 {cookie_id: timestamp} self._init_default_prompts() def _init_default_prompts(self): @@ -71,6 +72,8 @@ class AIReplyEngine: logger.error(f"创建OpenAI客户端失败 {cookie_id}: {e}") return None + # 记录使用时间 + self.client_last_used[cookie_id] = time.time() return self.clients[cookie_id] def _is_dashscope_api(self, settings: dict) -> bool: @@ -374,10 +377,42 @@ class AIReplyEngine: """清理客户端缓存""" if cookie_id: self.clients.pop(cookie_id, None) + self.client_last_used.pop(cookie_id, None) logger.info(f"清理账号 {cookie_id} 的客户端缓存") else: self.clients.clear() + self.client_last_used.clear() logger.info("清理所有客户端缓存") + + def cleanup_unused_clients(self, max_idle_hours: int = 24): + """清理长时间未使用的客户端(防止内存泄漏) + + Args: + max_idle_hours: 最大空闲时间(小时),默认24小时 + """ + try: + current_time = time.time() + max_idle_seconds = max_idle_hours * 3600 + + # 找出超过最大空闲时间的客户端 + expired_clients = [ + cookie_id for cookie_id, last_used in self.client_last_used.items() + if current_time - last_used > max_idle_seconds + ] + + # 清理过期客户端 + for cookie_id in expired_clients: + self.clients.pop(cookie_id, None) + self.client_last_used.pop(cookie_id, None) + self.agents.pop(cookie_id, None) + + if expired_clients: + logger.info(f"AI回复引擎:清理了 {len(expired_clients)} 个长时间未使用的客户端") + logger.debug(f"清理的账号: {expired_clients}") + logger.debug(f"当前活跃客户端数量: {len(self.clients)}") + + except Exception as e: + logger.error(f"AI回复引擎:清理未使用客户端时出错: {e}") # 全局AI回复引擎实例 diff --git a/cookie_manager.py b/cookie_manager.py index 1d3138c..fd0bfb5 100644 --- a/cookie_manager.py +++ b/cookie_manager.py @@ -100,6 +100,15 @@ class CookieManager: task = self.tasks.pop(cookie_id, None) if task: task.cancel() + try: + # 等待任务完全清理,确保资源释放 + await task + except asyncio.CancelledError: + # 任务被取消是预期行为 + pass + except Exception as e: + logger.error(f"等待任务清理时出错: {cookie_id}, {e}") + self.cookies.pop(cookie_id, None) self.keywords.pop(cookie_id, None) # 从数据库删除 @@ -166,6 +175,14 @@ class CookieManager: task = self.tasks.pop(cookie_id, None) if task: task.cancel() + try: + # 等待任务完全清理,确保资源释放 + await task + except asyncio.CancelledError: + # 任务被取消是预期行为 + pass + except Exception as e: + logger.error(f"等待任务清理时出错: {cookie_id}, {e}") # 更新Cookie值 self.cookies[cookie_id] = new_value @@ -277,13 +294,38 @@ class CookieManager: logger.warning(f"Cookie任务不存在,跳过停止: {cookie_id}") return + async def _stop_task_async(): + """异步停止任务并等待清理""" + try: + task = self.tasks[cookie_id] + if not task.done(): + task.cancel() + try: + # 等待任务完全清理,确保资源释放 + await task + except asyncio.CancelledError: + # 任务被取消是预期行为 + pass + except Exception as e: + logger.error(f"等待任务清理时出错: {cookie_id}, {e}") + logger.info(f"已取消Cookie任务: {cookie_id}") + del self.tasks[cookie_id] + logger.info(f"成功停止Cookie任务: {cookie_id}") + except Exception as e: + logger.error(f"停止Cookie任务失败: {cookie_id}, {e}") + try: - task = self.tasks[cookie_id] - if not task.done(): - task.cancel() - logger.info(f"已取消Cookie任务: {cookie_id}") - del self.tasks[cookie_id] - logger.info(f"成功停止Cookie任务: {cookie_id}") + # 在事件循环中执行异步停止 + if hasattr(self.loop, 'is_running') and self.loop.is_running(): + fut = asyncio.run_coroutine_threadsafe(_stop_task_async(), self.loop) + fut.result(timeout=10) # 等待最多10秒 + else: + logger.warning(f"事件循环未运行,无法正常等待任务清理: {cookie_id}") + # 直接取消任务(非最佳方案) + task = self.tasks[cookie_id] + if not task.done(): + task.cancel() + del self.tasks[cookie_id] except Exception as e: logger.error(f"停止Cookie任务失败: {cookie_id}, {e}") diff --git a/order_status_handler.py b/order_status_handler.py index 0cb1c20..6898ca3 100644 --- a/order_status_handler.py +++ b/order_status_handler.py @@ -8,6 +8,7 @@ import json import time import uuid import threading +import asyncio from loguru import logger from typing import Optional, Dict, Any @@ -68,7 +69,9 @@ class OrderStatusHandler: # 用于退款撤销时回退到上一次状态 self._order_status_history = {} - # 线程锁,保护并发访问 + # 使用threading.RLock保护并发访问 + # 注意:虽然在async环境中asyncio.Lock更理想,但本类的所有方法都是同步的 + # 且被同步代码调用,因此保持使用threading.RLock是合适的 self._lock = threading.RLock() # 设置日志级别