zhinianboke 2025-10-23 19:17:08 +08:00
parent 97637aa963
commit bdc301df70
4 changed files with 417 additions and 151 deletions

View File

@@ -5,6 +5,7 @@ import time
import base64
import os
import random
from enum import Enum
from loguru import logger
import websockets
from utils.xianyu_utils import (
@@ -23,6 +24,16 @@ from collections import defaultdict
from db_manager import db_manager
class ConnectionState(Enum):
    """WebSocket connection state enum"""
    DISCONNECTED = "disconnected"    # not connected
    CONNECTING = "connecting"        # connection in progress
    CONNECTED = "connected"          # connected
    RECONNECTING = "reconnecting"    # reconnecting
    FAILED = "failed"                # connection failed
    CLOSED = "closed"                # closed
class AutoReplyPauseManager:
    """Auto-reply pause manager"""
    def __init__(self):
@@ -156,8 +167,10 @@ class XianyuLive:
    _order_detail_lock_times = {}
    # Item-detail cache, valid for 24 hours
    _item_detail_cache = {}  # {item_id: {'detail': str, 'timestamp': float}}
    _item_detail_cache = {}  # {item_id: {'detail': str, 'timestamp': float, 'access_time': float}}
    _item_detail_cache_lock = asyncio.Lock()
    _item_detail_cache_max_size = 1000  # cache at most 1000 items
    _item_detail_cache_ttl = 24 * 60 * 60  # 24-hour TTL
    # Class-level instance registry, used for API calls
    _instances = {}  # {cookie_id: XianyuLive instance}
@@ -173,6 +186,139 @@ class XianyuLive:
        except:
            return "Unknown error"
    def _set_connection_state(self, new_state: ConnectionState, reason: str = ""):
        """Set the connection state and log the transition"""
        if self.connection_state != new_state:
            old_state = self.connection_state
            self.connection_state = new_state
            self.last_state_change_time = time.time()
            # Log the state transition
            state_msg = f"【{self.cookie_id}】connection state: {old_state.value} → {new_state.value}"
            if reason:
                state_msg += f" ({reason})"
            # Choose the log level according to the severity of the new state
            if new_state == ConnectionState.FAILED:
                logger.error(state_msg)
            elif new_state == ConnectionState.RECONNECTING:
                logger.warning(state_msg)
            elif new_state == ConnectionState.CONNECTED:
                logger.success(state_msg)
            else:
                logger.info(state_msg)
    async def _cancel_background_tasks(self):
        """Cancel and clean up all background tasks"""
        tasks_to_cancel = []
        # Collect every task that needs to be cancelled
        if self.heartbeat_task:
            tasks_to_cancel.append(("heartbeat task", self.heartbeat_task))
        if self.token_refresh_task:
            tasks_to_cancel.append(("token refresh task", self.token_refresh_task))
        if self.cleanup_task:
            tasks_to_cancel.append(("cleanup task", self.cleanup_task))
        if self.cookie_refresh_task:
            tasks_to_cancel.append(("cookie refresh task", self.cookie_refresh_task))
        if not tasks_to_cancel:
            logger.debug(f"【{self.cookie_id}】no background tasks to cancel")
            return
        logger.info(f"【{self.cookie_id}】cancelling {len(tasks_to_cancel)} background tasks...")
        # Cancel all tasks
        for task_name, task in tasks_to_cancel:
            try:
                task.cancel()
                logger.debug(f"【{self.cookie_id}】cancellation signal sent: {task_name}")
            except Exception as e:
                logger.warning(f"【{self.cookie_id}】failed to cancel task {task_name}: {e}")
        # Wait for all tasks to finish cancelling
        tasks = [task for _, task in tasks_to_cancel]
        try:
            await asyncio.wait_for(
                asyncio.gather(*tasks, return_exceptions=True),
                timeout=5.0
            )
            logger.info(f"【{self.cookie_id}】all background tasks cancelled")
        except asyncio.TimeoutError:
            logger.warning(f"【{self.cookie_id}】timed out waiting for task cancellation, continuing anyway")
        except Exception as e:
            logger.warning(f"【{self.cookie_id}】error while waiting for task cancellation: {e}")
        # Reset the task references
        self.heartbeat_task = None
        self.token_refresh_task = None
        self.cleanup_task = None
        self.cookie_refresh_task = None
    def _calculate_retry_delay(self, error_msg: str) -> int:
        """Calculate the retry delay based on error type and failure count"""
        # Unexpected WebSocket disconnect - short delay
        if "no close frame received or sent" in error_msg:
            return min(3 * self.connection_failures, 15)
        # Network connectivity problem - long delay
        elif "Connection refused" in error_msg or "timeout" in error_msg.lower():
            return min(10 * self.connection_failures, 60)
        # Other unknown error - medium delay
        else:
            return min(5 * self.connection_failures, 30)
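For reference, a quick standalone sketch (illustrative only, reusing the constants above) of how the capped linear backoff grows with the failure count:

# Illustrative sketch of the capped linear backoff in _calculate_retry_delay.
for failures in range(1, 7):
    print(failures,
          min(3 * failures, 15),    # unexpected WebSocket disconnect
          min(10 * failures, 60),   # network connectivity problem
          min(5 * failures, 30))    # unknown error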
    def _cleanup_instance_caches(self):
        """Clean up instance-level caches to prevent memory leaks"""
        try:
            current_time = time.time()
            cleaned_total = 0
            # Remove expired notification records (keep the last hour)
            max_notification_age = 3600  # 1 hour
            expired_notifications = [
                key for key, last_time in self.last_notification_time.items()
                if current_time - last_time > max_notification_age
            ]
            for key in expired_notifications:
                del self.last_notification_time[key]
            if expired_notifications:
                cleaned_total += len(expired_notifications)
                logger.debug(f"【{self.cookie_id}】removed {len(expired_notifications)} expired notification records")
            # Remove expired delivery records (keep the last hour)
            max_delivery_age = 3600  # 1 hour
            expired_deliveries = [
                order_id for order_id, last_time in self.last_delivery_time.items()
                if current_time - last_time > max_delivery_age
            ]
            for order_id in expired_deliveries:
                del self.last_delivery_time[order_id]
            if expired_deliveries:
                cleaned_total += len(expired_deliveries)
                logger.debug(f"【{self.cookie_id}】removed {len(expired_deliveries)} expired delivery records")
            # Remove expired order-confirmation records (keep the last hour)
            max_confirm_age = 3600  # 1 hour
            expired_confirms = [
                order_id for order_id, last_time in self.confirmed_orders.items()
                if current_time - last_time > max_confirm_age
            ]
            for order_id in expired_confirms:
                del self.confirmed_orders[order_id]
            if expired_confirms:
                cleaned_total += len(expired_confirms)
                logger.debug(f"【{self.cookie_id}】removed {len(expired_confirms)} expired order-confirmation records")
            # Only log the total when something was actually cleaned
            if cleaned_total > 0:
                logger.info(f"【{self.cookie_id}】instance cache cleanup finished, {cleaned_total} records removed")
                logger.debug(f"【{self.cookie_id}】current cache sizes - notifications: {len(self.last_notification_time)}, deliveries: {len(self.last_delivery_time)}, confirmations: {len(self.confirmed_orders)}")
        except Exception as e:
            logger.error(f"【{self.cookie_id}】error while cleaning instance caches: {self._safe_str(e)}")
    def __init__(self, cookies_str=None, cookie_id: str = "default", user_id: int = None):
        """Initialize XianyuLive"""
        logger.info(f"【{cookie_id}】initializing XianyuLive...")
@@ -239,7 +385,7 @@ class XianyuLive:
        self.cookie_refresh_task = None
        self.cookie_refresh_interval = 1200  # 20 minutes = 1200 seconds
        self.last_cookie_refresh_time = 0
        self.cookie_refresh_running = False  # guard flag against concurrent cookie refreshes
        self.cookie_refresh_lock = asyncio.Lock()  # use a Lock to prevent concurrent cookie refreshes
        self.cookie_refresh_enabled = True  # whether the cookie-refresh feature is enabled
        # Flag for QR-scan-login cookie refresh
@@ -260,9 +406,11 @@ class XianyuLive:
        self.max_captcha_verification_count = 3  # cap on slider-captcha attempts, prevents unbounded recursion
        # WebSocket connection monitoring
        self.connection_state = ConnectionState.DISCONNECTED  # connection state
        self.connection_failures = 0  # consecutive connection failures
        self.max_connection_failures = 5  # maximum number of consecutive failures
        self.last_successful_connection = 0  # time of the last successful connection
        self.last_state_change_time = time.time()  # time of the last state change
        # Background-task tracking (for cleaning up un-awaited tasks)
        self.background_tasks = set()  # track all background tasks
@@ -909,7 +1057,8 @@ class XianyuLive:
                API_ENDPOINTS.get('token'),
                params=params,
                data=data,
                headers=headers
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=30)
            ) as response:
                res_json = await response.json()
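The hunk above adds a per-request timeout to the token call. For comparison, a sketch (not the project's code) of the session-wide alternative aiohttp offers, where every request inherits a default timeout:

import aiohttp

# Sketch only: a default timeout set on the session applies to all requests,
# so individual post() calls would not each need a timeout argument.
async def make_session() -> aiohttp.ClientSession:
    return aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30))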
@@ -959,6 +1108,7 @@ class XianyuLive:
        # Add a risk-control log entry
        log_id = None
        try:
            from db_manager import db_manager
            success = db_manager.add_risk_control_log(
                cookie_id=self.cookie_id,
                event_type='slider_captcha',
@@ -986,6 +1136,7 @@ class XianyuLive:
            # Update the risk-control log entry to success
            if 'log_id' in locals() and log_id:
                try:
                    from db_manager import db_manager
                    db_manager.update_risk_control_log(
                        log_id=log_id,
                        processing_result=f"Slider captcha passed, took {captcha_duration:.2f}s, cookies length: {len(new_cookies_str)}",
@@ -1005,6 +1156,7 @@ class XianyuLive:
            # Update the risk-control log entry to failure
            if 'log_id' in locals() and log_id:
                try:
                    from db_manager import db_manager
                    db_manager.update_risk_control_log(
                        log_id=log_id,
                        processing_result=f"Slider captcha failed, took {captcha_duration:.2f}s, reason: no new cookies obtained",
@@ -1022,6 +1174,7 @@ class XianyuLive:
            captcha_duration = time.time() - captcha_start_time if 'captcha_start_time' in locals() else 0
            if 'log_id' in locals() and log_id:
                try:
                    from db_manager import db_manager
                    db_manager.update_risk_control_log(
                        log_id=log_id,
                        processing_result=f"Slider captcha handling raised an exception, took {captcha_duration:.2f}s
@@ -1609,7 +1762,9 @@ class XianyuLive:
                    current_time = time.time()
                    # Check whether the cached entry is younger than 24 hours
                    if current_time - cache_time < 24 * 60 * 60:  # 24 hours
                    if current_time - cache_time < self._item_detail_cache_ttl:
                        # Refresh the access time for LRU
                        cache_data['access_time'] = current_time
                        logger.info(f"Item detail served from cache: {item_id}")
                        return cache_data['detail']
                    else:
@@ -1620,12 +1775,8 @@ class XianyuLive:
            # 2. Try to fetch the item detail with a browser
            detail_from_browser = await self._fetch_item_detail_from_browser(item_id)
            if detail_from_browser:
                # Save to the cache
                async with self._item_detail_cache_lock:
                    self._item_detail_cache[item_id] = {
                        'detail': detail_from_browser,
                        'timestamp': time.time()
                    }
                # Save to the cache (with size limit)
                await self._add_to_item_cache(item_id, detail_from_browser)
                logger.info(f"Item detail fetched via browser: {item_id}, length: {len(detail_from_browser)}")
                return detail_from_browser
@@ -1633,12 +1784,8 @@ class XianyuLive:
logger.warning(f"浏览器获取商品详情失败尝试外部API: {item_id}")
detail_from_api = await self._fetch_item_detail_from_external_api(item_id)
if detail_from_api:
# 保存到缓存
async with self._item_detail_cache_lock:
self._item_detail_cache[item_id] = {
'detail': detail_from_api,
'timestamp': time.time()
}
# 保存到缓存(带大小限制)
await self._add_to_item_cache(item_id, detail_from_api)
logger.info(f"成功通过外部API获取商品详情: {item_id}, 长度: {len(detail_from_api)}")
return detail_from_api
@@ -1649,8 +1796,62 @@ class XianyuLive:
logger.error(f"获取商品详情异常: {item_id}, 错误: {self._safe_str(e)}")
return ""
    async def _add_to_item_cache(self, item_id: str, detail: str):
        """Add an item detail to the cache, enforcing the LRU policy and size limit
        Args:
            item_id: item ID
            detail: item detail text
        """
        async with self._item_detail_cache_lock:
            current_time = time.time()
            # If the cache is at its size limit, evict first
            if len(self._item_detail_cache) >= self._item_detail_cache_max_size:
                # Evict the least recently accessed entry (LRU)
                if self._item_detail_cache:
                    # Find the least recently accessed entry
                    oldest_item = min(
                        self._item_detail_cache.items(),
                        key=lambda x: x[1].get('access_time', x[1]['timestamp'])
                    )
                    oldest_item_id = oldest_item[0]
                    del self._item_detail_cache[oldest_item_id]
                    logger.debug(f"Cache full, evicted the oldest entry: {oldest_item_id}")
            # Insert the new entry
            self._item_detail_cache[item_id] = {
                'detail': detail,
                'timestamp': current_time,
                'access_time': current_time
            }
            logger.debug(f"Added item detail to cache: {item_id}, current cache size: {len(self._item_detail_cache)}")
    @classmethod
    async def _cleanup_item_cache(cls):
        """Remove expired item-detail cache entries"""
        async with cls._item_detail_cache_lock:
            current_time = time.time()
            expired_items = []
            # Collect all expired entries
            for item_id, cache_data in cls._item_detail_cache.items():
                if current_time - cache_data['timestamp'] >= cls._item_detail_cache_ttl:
                    expired_items.append(item_id)
            # Delete the expired entries
            for item_id in expired_items:
                del cls._item_detail_cache[item_id]
            if expired_items:
                logger.info(f"Removed {len(expired_items)} expired item-detail cache entries")
            return len(expired_items)
    async def _fetch_item_detail_from_browser(self, item_id: str) -> str:
        """Fetch the item detail with a browser"""
        playwright = None
        browser = None
        try:
            from playwright.async_api import async_playwright
@@ -1739,6 +1940,7 @@ class XianyuLive:
            await asyncio.sleep(3)
            # Grab the item-detail content
            detail_text = ""
            try:
                # Wait for the target element to appear
                await page.wait_for_selector('.desc--GaIUKUQY', timeout=10000)
@@ -1748,11 +1950,6 @@ class XianyuLive:
                if detail_element:
                    detail_text = await detail_element.inner_text()
                    logger.info(f"Item detail fetched: {item_id}, length: {len(detail_text)}")
                    # Clean up resources
                    await browser.close()
                    await playwright.stop()
                    return detail_text.strip()
                else:
                    logger.warning(f"Item detail element not found: {item_id}")
@@ -1760,15 +1957,26 @@ class XianyuLive:
            except Exception as e:
                logger.warning(f"Failed to read the item detail element: {item_id}, error: {self._safe_str(e)}")
            # Clean up resources
            await browser.close()
            await playwright.stop()
            return ""
        except Exception as e:
            logger.error(f"Browser fetch of the item detail raised an exception: {item_id}, error: {self._safe_str(e)}")
            return ""
        finally:
            # Make sure resources are always cleaned up
            try:
                if browser:
                    await browser.close()
                    logger.debug(f"Browser closed: {item_id}")
            except Exception as e:
                logger.warning(f"Error while closing the browser: {self._safe_str(e)}")
            try:
                if playwright:
                    await playwright.stop()
                    logger.debug(f"Playwright stopped: {item_id}")
            except Exception as e:
                logger.warning(f"Error while stopping playwright: {self._safe_str(e)}")
    async def _fetch_item_detail_from_external_api(self, item_id: str) -> str:
        """Fetch the item detail from an external API (fallback)"""
@@ -4184,7 +4392,7 @@ class XianyuLive:
            return False
    async def pause_cleanup_loop(self):
        """Periodically clean up expired pause records and locks"""
        """Periodically clean up expired pause records, locks and caches"""
        while True:
            try:
                # Check whether the account is enabled
@@ -4199,6 +4407,28 @@ class XianyuLive:
                # Clean up expired locks (runs every 5 minutes, keeps locks younger than 24 hours)
                self.cleanup_expired_locks(max_age_hours=24)
                # Clean up expired item-detail cache entries
                cleaned_count = await self._cleanup_item_cache()
                if cleaned_count > 0:
                    logger.info(f"【{self.cookie_id}】removed {cleaned_count} expired item-detail cache entries")
                # Clean up expired notification, delivery and order-confirmation records (prevents memory leaks)
                self._cleanup_instance_caches()
                # Clean up unused AI-reply-engine clients (checked every 5 minutes)
                try:
                    from ai_reply_engine import ai_reply_engine
                    ai_reply_engine.cleanup_unused_clients(max_idle_hours=24)
                except Exception as ai_clean_e:
                    logger.debug(f"【{self.cookie_id}】error while cleaning AI clients: {ai_clean_e}")
                # Clean up expired QR-login sessions (checked every 5 minutes)
                try:
                    from utils.qr_login import qr_login_manager
                    qr_login_manager.cleanup_expired_sessions()
                except Exception as qr_clean_e:
                    logger.debug(f"【{self.cookie_id}】error while cleaning QR-login sessions: {qr_clean_e}")
                # Run every 5 minutes
                await asyncio.sleep(300)
            except Exception as e:
@@ -4232,7 +4462,7 @@ class XianyuLive:
                    remaining_seconds = int(remaining_time % 60)
                    logger.debug(f"【{self.cookie_id}】cooling down after a received message, {remaining_minutes}m {remaining_seconds}s left before the cookie refresh can run")
                # Check whether a cookie refresh task is already running
                elif self.cookie_refresh_running:
                elif self.cookie_refresh_lock.locked():
                    logger.debug(f"【{self.cookie_id}】a cookie refresh task is already running, skipping this trigger")
                else:
                    logger.info(f"【{self.cookie_id}】starting the cookie refresh task...")
@@ -4248,10 +4478,8 @@ class XianyuLive:
    async def _execute_cookie_refresh(self, current_time):
        """Run the cookie refresh independently so it does not block the main loop"""
        # Set the running flag to prevent concurrent execution
        self.cookie_refresh_running = True
        # Use a Lock for atomicity, preventing concurrent execution
        async with self.cookie_refresh_lock:
            try:
                logger.info(f"【{self.cookie_id}】starting cookie refresh; pausing the heartbeat temporarily to avoid connection conflicts...")
@@ -4295,9 +4523,6 @@ class XianyuLive:
logger.info(f"{self.cookie_id}】Cookie刷新完成心跳任务正常运行")
self.heartbeat_task = asyncio.create_task(self.heartbeat_loop(self.ws))
# 清除运行状态
self.cookie_refresh_running = False
# 清空消息接收标志允许下次正常执行Cookie刷新
self.last_message_received_time = 0
logger.debug(f"{self.cookie_id}】Cookie刷新完成已清空消息接收标志")
@@ -5757,22 +5982,24 @@ class XianyuLive:
                headers = WEBSOCKET_HEADERS.copy()
                headers['Cookie'] = self.cookies_str
                logger.info(f"【{self.cookie_id}】about to open a WebSocket connection to: {self.base_url}")
                logger.info(f"【{self.cookie_id}】WebSocket headers: {headers}")
                # Move the connection state to CONNECTING
                self._set_connection_state(ConnectionState.CONNECTING, "about to open the WebSocket connection")
                logger.info(f"【{self.cookie_id}】WebSocket target URL: {self.base_url}")
                # Stay compatible with different versions of the websockets library
                async with await self._create_websocket_connection(headers) as websocket:
                    logger.info(f"【{self.cookie_id}】WebSocket connection established")
                    self.ws = websocket
                    logger.info(f"【{self.cookie_id}】WebSocket connection established, initializing...")
                    # Update connection state
                    self.connection_failures = 0
                    self.last_successful_connection = time.time()
                    logger.info(f"【{self.cookie_id}】initializing the WebSocket connection...")
                    # Run initialization
                    await self.init(websocket)
                    logger.info(f"【{self.cookie_id}】WebSocket initialization finished")
                    # Only mark the connection CONNECTED once initialization has finished
                    self._set_connection_state(ConnectionState.CONNECTED, "initialization finished, connection ready")
                    self.connection_failures = 0
                    self.last_successful_connection = time.time()
                    # Start the heartbeat task
                    logger.info(f"【{self.cookie_id}】starting the heartbeat task...")
                    self.heartbeat_task = asyncio.create_task(self.heartbeat_loop(websocket))
@@ -5816,93 +6043,51 @@ class XianyuLive:
                error_msg = self._safe_str(e)
                self.connection_failures += 1
                # Move the connection state to RECONNECTING
                self._set_connection_state(ConnectionState.RECONNECTING, f"failure #{self.connection_failures}")
                # Print detailed error information
                import traceback
                error_type = type(e).__name__
                error_trace = traceback.format_exc()
                logger.error(f"WebSocket connection error ({self.connection_failures}/{self.max_connection_failures})")
                logger.error(f"Exception type: {error_type}")
                logger.error(f"Exception message: {error_msg}")
                logger.error(f"Exception traceback:\n{error_trace}")
                logger.error(f"【{self.cookie_id}】WebSocket connection error ({self.connection_failures}/{self.max_connection_failures})")
                logger.error(f"【{self.cookie_id}】exception type: {error_type}")
                logger.error(f"【{self.cookie_id}】exception message: {error_msg}")
                logger.debug(f"【{self.cookie_id}】exception traceback:\n{traceback.format_exc()}")
                # Check whether the maximum failure count has been reached
                if self.connection_failures >= self.max_connection_failures:
                    logger.error(f"【{self.cookie_id}】{self.max_connection_failures} consecutive connection failures, about to restart the instance...")
                    self._set_connection_state(ConnectionState.FAILED, f"{self.max_connection_failures} consecutive failures")
                    logger.error(f"【{self.cookie_id}】restarting the instance...")
                    self.connection_failures = 0  # reset the failure counter
                    await self._restart_instance()  # restart the instance
                    return  # leave the connection loop after the restart
                # Choose a handling strategy based on error type and failure count
                if "no close frame received or sent" in error_msg:
                    logger.info(f"【{self.cookie_id}】unexpected WebSocket disconnect detected, reconnecting...")
                    retry_delay = min(3 * self.connection_failures, 15)  # growing retry interval, capped at 15s
                elif "Connection refused" in error_msg or "timeout" in error_msg.lower():
                    logger.warning(f"【{self.cookie_id}】network connectivity problem, extending the retry interval...")
                    retry_delay = min(10 * self.connection_failures, 60)  # growing retry interval, capped at 60s
                else:
                    logger.warning(f"【{self.cookie_id}】unknown WebSocket error, using the default retry interval...")
                    retry_delay = min(5 * self.connection_failures, 30)  # growing retry interval, capped at 30s
                # Compute the retry delay
                retry_delay = self._calculate_retry_delay(error_msg)
                logger.warning(f"【{self.cookie_id}】retrying the connection in {retry_delay} seconds...")
                # Clear the current token so it is fetched afresh on reconnect
                if self.current_token:
                    logger.info(f"【{self.cookie_id}】clearing the current token; it will be re-fetched on reconnect")
                    logger.debug(f"【{self.cookie_id}】clearing the current token; it will be re-fetched on reconnect")
                    self.current_token = None
                # Cancel all tasks and reset them to None
                tasks_to_cancel = []
                if self.heartbeat_task:
                    tasks_to_cancel.append(self.heartbeat_task)
                if self.token_refresh_task:
                    tasks_to_cancel.append(self.token_refresh_task)
                if self.cleanup_task:
                    tasks_to_cancel.append(self.cleanup_task)
                if self.cookie_refresh_task:
                    tasks_to_cancel.append(self.cookie_refresh_task)
                # Use the unified task-cleanup helper
                await self._cancel_background_tasks()
                # Cancel all tasks
                for task in tasks_to_cancel:
                    task.cancel()
                # Wait for all tasks to finish cancelling
                if tasks_to_cancel:
                    try:
                        await asyncio.gather(*tasks_to_cancel, return_exceptions=True)
                    except Exception:
                        pass
                # Reset the tasks to None
                self.heartbeat_task = None
                self.token_refresh_task = None
                self.cleanup_task = None
                self.cookie_refresh_task = None
                logger.info(f"【{self.cookie_id}】waiting {retry_delay} seconds before retrying the connection...")
                # Wait, then retry
                await asyncio.sleep(retry_delay)
                continue
        finally:
            # Move the connection state to CLOSED
            self._set_connection_state(ConnectionState.CLOSED, "shutting down")
            # Clear the current token
            if self.current_token:
                logger.info(f"【{self.cookie_id}】shutting down, clearing the current token")
                self.current_token = None
            # Clean up all tasks
            tasks_to_cancel = []
            if self.heartbeat_task:
                tasks_to_cancel.append(self.heartbeat_task)
            if self.token_refresh_task:
                tasks_to_cancel.append(self.token_refresh_task)
            if self.cleanup_task:
                tasks_to_cancel.append(self.cleanup_task)
            if self.cookie_refresh_task:
                tasks_to_cancel.append(self.cookie_refresh_task)
            # Cancel all tasks
            for task in tasks_to_cancel:
                task.cancel()
            # Wait for all tasks to finish cancelling
            if tasks_to_cancel:
                await asyncio.gather(*tasks_to_cancel, return_exceptions=True)
            # Use the unified task-cleanup helper
            await self._cancel_background_tasks()
            # Clean up all background tasks
            if self.background_tasks:
@@ -5915,7 +6100,8 @@ class XianyuLive:
            except asyncio.TimeoutError:
                logger.warning(f"【{self.cookie_id}】background-task cleanup timed out, continuing anyway")
            await self.close_session()  # make sure the session is closed
            # Make sure the session is closed
            await self.close_session()
            # Unregister this instance from the global instance registry
            self._unregister_instance()

View File

@@ -20,6 +20,7 @@ class AIReplyEngine:
    def __init__(self):
        self.clients = {}  # OpenAI clients per account
        self.agents = {}  # Agent instances per account
        self.client_last_used = {}  # last-used timestamp per client {cookie_id: timestamp}
        self._init_default_prompts()
    def _init_default_prompts(self):
@@ -71,6 +72,8 @@ class AIReplyEngine:
logger.error(f"创建OpenAI客户端失败 {cookie_id}: {e}")
return None
# 记录使用时间
self.client_last_used[cookie_id] = time.time()
return self.clients[cookie_id]
def _is_dashscope_api(self, settings: dict) -> bool:
@@ -374,11 +377,43 @@ class AIReplyEngine:
"""清理客户端缓存"""
if cookie_id:
self.clients.pop(cookie_id, None)
self.client_last_used.pop(cookie_id, None)
logger.info(f"清理账号 {cookie_id} 的客户端缓存")
else:
self.clients.clear()
self.client_last_used.clear()
logger.info("清理所有客户端缓存")
    def cleanup_unused_clients(self, max_idle_hours: int = 24):
        """Clean up clients that have not been used for a long time (prevents memory leaks)
        Args:
            max_idle_hours: maximum idle time in hours, 24 by default
        """
        try:
            current_time = time.time()
            max_idle_seconds = max_idle_hours * 3600
            # Find clients idle for longer than the limit
            expired_clients = [
                cookie_id for cookie_id, last_used in self.client_last_used.items()
                if current_time - last_used > max_idle_seconds
            ]
            # Drop the expired clients
            for cookie_id in expired_clients:
                self.clients.pop(cookie_id, None)
                self.client_last_used.pop(cookie_id, None)
                self.agents.pop(cookie_id, None)
            if expired_clients:
                logger.info(f"AI reply engine dropped {len(expired_clients)} long-idle clients")
                logger.debug(f"Accounts cleaned: {expired_clients}")
                logger.debug(f"Active clients remaining: {len(self.clients)}")
        except Exception as e:
            logger.error(f"AI reply engine failed to clean up unused clients: {e}")
# Global AI reply engine instance
ai_reply_engine = AIReplyEngine()
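A sketch (hypothetical driver, not part of this commit) of how the idle-client sweep could be scheduled on its own, matching the 5-minute cadence that pause_cleanup_loop uses:

import asyncio
from ai_reply_engine import ai_reply_engine

# Hypothetical standalone driver: sweep long-idle clients every 5 minutes.
async def ai_client_cleanup_loop() -> None:
    while True:
        ai_reply_engine.cleanup_unused_clients(max_idle_hours=24)
        await asyncio.sleep(300)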

View File

@@ -100,6 +100,15 @@ class CookieManager:
        task = self.tasks.pop(cookie_id, None)
        if task:
            task.cancel()
            try:
                # Wait for the task to finish cleaning up so its resources are released
                await task
            except asyncio.CancelledError:
                # Cancellation is the expected outcome here
                pass
            except Exception as e:
                logger.error(f"Error while waiting for task cleanup: {cookie_id}, {e}")
        self.cookies.pop(cookie_id, None)
        self.keywords.pop(cookie_id, None)
        # Delete from the database
@@ -166,6 +175,14 @@ class CookieManager:
        task = self.tasks.pop(cookie_id, None)
        if task:
            task.cancel()
            try:
                # Wait for the task to finish cleaning up so its resources are released
                await task
            except asyncio.CancelledError:
                # Cancellation is the expected outcome here
                pass
            except Exception as e:
                logger.error(f"Error while waiting for task cleanup: {cookie_id}, {e}")
        # Update the cookie value
        self.cookies[cookie_id] = new_value
@@ -277,16 +294,41 @@ class CookieManager:
logger.warning(f"Cookie任务不存在跳过停止: {cookie_id}")
return
async def _stop_task_async():
"""异步停止任务并等待清理"""
try:
task = self.tasks[cookie_id]
if not task.done():
task.cancel()
try:
# 等待任务完全清理,确保资源释放
await task
except asyncio.CancelledError:
# 任务被取消是预期行为
pass
except Exception as e:
logger.error(f"等待任务清理时出错: {cookie_id}, {e}")
logger.info(f"已取消Cookie任务: {cookie_id}")
del self.tasks[cookie_id]
logger.info(f"成功停止Cookie任务: {cookie_id}")
except Exception as e:
logger.error(f"停止Cookie任务失败: {cookie_id}, {e}")
try:
# 在事件循环中执行异步停止
if hasattr(self.loop, 'is_running') and self.loop.is_running():
fut = asyncio.run_coroutine_threadsafe(_stop_task_async(), self.loop)
fut.result(timeout=10) # 等待最多10秒
else:
logger.warning(f"事件循环未运行,无法正常等待任务清理: {cookie_id}")
# 直接取消任务(非最佳方案)
task = self.tasks[cookie_id]
if not task.done():
task.cancel()
del self.tasks[cookie_id]
except Exception as e:
logger.error(f"停止Cookie任务失败: {cookie_id}, {e}")
    def update_auto_confirm_setting(self, cookie_id: str, auto_confirm: bool):
        """Update an account's auto-confirm-shipment setting in real time"""
        try:

View File

@@ -8,6 +8,7 @@ import json
import time
import uuid
import threading
import asyncio
from loguru import logger
from typing import Optional, Dict, Any
@@ -68,7 +69,9 @@ class OrderStatusHandler:
        # Used to roll back to the previous status when a refund is cancelled
        self._order_status_history = {}
        # Thread lock protecting concurrent access
        # Use threading.RLock to protect concurrent access
        # Note: although asyncio.Lock would be preferable in an async environment, every method of this
        # class is synchronous and is called from synchronous code, so threading.RLock remains the right fit
        self._lock = threading.RLock()
        # Set the log level
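A minimal sketch (illustrative names only) of why the comment above picks threading.RLock over threading.Lock: the owning thread may re-acquire an RLock, so one locked method can call another without deadlocking:

import threading

# Illustrative only: with a plain threading.Lock, set_status() calling
# _log_locked() would deadlock; an RLock allows re-entrant acquisition.
class StatusStore:
    def __init__(self):
        self._lock = threading.RLock()
        self._status = {}

    def set_status(self, order_id: str, status: str) -> None:
        with self._lock:
            self._status[order_id] = status
            self._log_locked(order_id)   # re-enters the same RLock safely

    def _log_locked(self, order_id: str) -> None:
        with self._lock:
            print(order_id, self._status[order_id])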