This commit is contained in:
zhinianboke 2025-10-23 21:47:03 +08:00
parent f829727ab5
commit ef6cc7ffaa
3 changed files with 235 additions and 19 deletions

View File

@ -275,8 +275,8 @@ class XianyuLive:
current_time = time.time()
cleaned_total = 0
# Clean up expired notification records (keep those from the last hour)
max_notification_age = 3600 # 1 hour
# Clean up expired notification records (keep those from the last 30 minutes; reduced from 1 hour)
max_notification_age = 1800 # 30 minutes (reduced from 3600)
expired_notifications = [
key for key, last_time in self.last_notification_time.items()
if current_time - last_time > max_notification_age
@ -287,8 +287,8 @@ class XianyuLive:
cleaned_total += len(expired_notifications)
logger.debug(f"{self.cookie_id}】清理了 {len(expired_notifications)} 个过期通知记录")
# 清理过期的发货记录(保留1小时内的)
max_delivery_age = 3600 # 1小时
# 清理过期的发货记录(保留30分钟内的)
max_delivery_age = 1800 # 30分钟
expired_deliveries = [
order_id for order_id, last_time in self.last_delivery_time.items()
if current_time - last_time > max_delivery_age
@ -299,8 +299,8 @@ class XianyuLive:
cleaned_total += len(expired_deliveries)
logger.debug(f"{self.cookie_id}】清理了 {len(expired_deliveries)} 个过期发货记录")
# 清理过期的订单确认记录(保留1小时内的)
max_confirm_age = 3600 # 1小时
# 清理过期的订单确认记录(保留30分钟内的)
max_confirm_age = 1800 # 30分钟
expired_confirms = [
order_id for order_id, last_time in self.confirmed_orders.items()
if current_time - last_time > max_confirm_age
@ -318,6 +318,57 @@ class XianyuLive:
except Exception as e:
logger.error(f"{self.cookie_id}】清理实例缓存时出错: {self._safe_str(e)}")
async def _cleanup_playwright_cache(self):
"""清理Playwright浏览器临时文件和缓存Docker环境专用"""
try:
import shutil
import glob
# Temporary directory paths that should be cleaned up
temp_paths = [
'/tmp/playwright-*', # Playwright temporary sessions
'/tmp/chromium-*', # Chromium temporary files
'/ms-playwright/chromium-*/Default/Cache', # Browser cache
'/ms-playwright/chromium-*/Default/Code Cache', # Code cache
'/ms-playwright/chromium-*/Default/GPUCache', # GPU cache
]
total_cleaned = 0
total_size_mb = 0
for pattern in temp_paths:
try:
matching_paths = glob.glob(pattern)
for path in matching_paths:
try:
if os.path.exists(path):
# Calculate the size before deleting
if os.path.isdir(path):
size = sum(
os.path.getsize(os.path.join(dirpath, filename))
for dirpath, _, filenames in os.walk(path)
for filename in filenames
)
shutil.rmtree(path, ignore_errors=True)
else:
size = os.path.getsize(path)
os.remove(path)
total_size_mb += size / (1024 * 1024)
total_cleaned += 1
except Exception as e:
logger.debug(f"清理路径 {path} 时出错: {e}")
except Exception as e:
logger.debug(f"匹配路径 {pattern} 时出错: {e}")
if total_cleaned > 0:
logger.info(f"{self.cookie_id}】Playwright缓存清理完成: 删除了 {total_cleaned} 个文件/目录,释放 {total_size_mb:.2f} MB")
else:
logger.debug(f"{self.cookie_id}】Playwright缓存清理: 没有需要清理的临时文件")
except Exception as e:
logger.debug(f"{self.cookie_id}】清理Playwright缓存时出错: {self._safe_str(e)}")
def __init__(self, cookies_str=None, cookie_id: str = "default", user_id: int = None):
"""初始化闲鱼直播类"""
@ -414,6 +465,10 @@ class XianyuLive:
# Background task tracking (used to clean up un-awaited tasks)
self.background_tasks = set() # Track all background tasks
# Message-handling concurrency control (prevents memory leaks)
self.message_semaphore = asyncio.Semaphore(100) # At most 100 concurrent message-handling tasks
self.active_message_tasks = 0 # Number of currently active message-handling tasks
# Initialize the order status handler
self._init_order_status_handler()
@ -1392,7 +1447,7 @@ class XianyuLive:
# user_id=f"{self.cookie_id}_{int(time.time() * 1000)}", # 使用唯一ID避免冲突
user_id=f"{self.cookie_id}", # 使用唯一ID避免冲突
enable_learning=True, # 启用学习功能
headless=True # 使用无头模式
headless=False # 使用有头模式(可视化浏览器)
)
# Run the slider verification in a thread pool
@ -1496,11 +1551,22 @@ class XianyuLive:
log_captcha_event(self.cookie_id, "Slider verification failed", False,
f"XianyuSliderStealth execution failed, environment: {'Docker' if os.getenv('DOCKER_ENV') else 'local'}")
# Send notification
await self.send_token_refresh_notification(
f"Slider verification failed and needs manual handling. Verification URL: {verification_url}",
"captcha_verification_failed"
# Send notification only after checking the WebSocket connection state
# Only notify when the WebSocket is disconnected; if it is still connected, the failure is likely transient
is_ws_connected = (
self.connection_state == ConnectionState.CONNECTED and
self.ws and
not self.ws.closed
)
if is_ws_connected:
logger.info(f"{self.cookie_id}】WebSocket连接正常滑块验证失败可能是暂时的跳过通知")
else:
logger.warning(f"{self.cookie_id}】WebSocket未连接发送滑块验证失败通知")
await self.send_token_refresh_notification(
f"滑块验证失败需要手动处理。验证URL: {verification_url}",
"captcha_verification_failed"
)
return None
except ImportError as import_e:
@ -1525,11 +1591,22 @@ class XianyuLive:
log_captcha_event(self.cookie_id, "Slider verification exception", False,
f"Execution exception, error: {self._safe_str(stealth_e)[:100]}")
# Send notification
await self.send_token_refresh_notification(
f"Slider verification raised an exception and needs manual handling. Verification URL: {verification_url}",
"captcha_execution_error"
# Send notification only after checking the WebSocket connection state
# Only notify when the WebSocket is disconnected; if it is still connected, the failure is likely transient
is_ws_connected = (
self.connection_state == ConnectionState.CONNECTED and
self.ws and
not self.ws.closed
)
if is_ws_connected:
logger.info(f"{self.cookie_id}】WebSocket连接正常滑块验证执行异常可能是暂时的跳过通知")
else:
logger.warning(f"{self.cookie_id}】WebSocket未连接发送滑块验证执行异常通知")
await self.send_token_refresh_notification(
f"滑块验证执行异常需要手动处理。验证URL: {verification_url}",
"captcha_execution_error"
)
return None
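The WebSocket connectivity check above is duplicated in both the failure and the exception path; it could be factored into a small helper. A minimal sketch, reusing the attribute names from this diff (the helper name `_is_ws_connected` is hypothetical, not part of this commit):

def _is_ws_connected(self) -> bool:
    """Return True when the WebSocket is established and still open (hypothetical helper)."""
    return (
        self.connection_state == ConnectionState.CONNECTED
        and self.ws is not None
        and not self.ws.closed
    )

Both branches could then simply test `if self._is_ws_connected():`.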
@ -4452,6 +4529,34 @@ class XianyuLive:
qr_login_manager.cleanup_expired_sessions()
except Exception as qr_clean_e:
logger.debug(f"{self.cookie_id}】清理QR登录会话时出错: {qr_clean_e}")
# 清理Playwright浏览器临时文件和缓存每5分钟检查一次
try:
await self._cleanup_playwright_cache()
except Exception as pw_clean_e:
logger.debug(f"{self.cookie_id}】清理Playwright缓存时出错: {pw_clean_e}")
# 清理数据库历史数据每天一次保留90天数据
# 为避免所有实例同时执行,只让第一个实例执行
try:
if hasattr(self.__class__, '_last_db_cleanup_time'):
last_cleanup = self.__class__._last_db_cleanup_time
else:
self.__class__._last_db_cleanup_time = 0
last_cleanup = 0
current_time = time.time()
# Run the cleanup once every 24 hours
if current_time - last_cleanup > 86400:
logger.info(f"【{self.cookie_id}】Starting historical database data cleanup...")
stats = db_manager.cleanup_old_data(days=90)
if 'error' not in stats:
logger.info(f"{self.cookie_id}】数据库清理完成: {stats}")
self.__class__._last_db_cleanup_time = current_time
else:
logger.error(f"{self.cookie_id}】数据库清理失败: {stats['error']}")
except Exception as db_clean_e:
logger.debug(f"{self.cookie_id}】清理数据库历史数据时出错: {db_clean_e}")
# 每5分钟清理一次
await asyncio.sleep(300)
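As a side note, the once-per-24-hours guard above could be written more compactly with `getattr`; an equivalent sketch under the same class-attribute name (not part of this commit):

last_cleanup = getattr(self.__class__, '_last_db_cleanup_time', 0)  # shared across instances, defaults to 0
if time.time() - last_cleanup > 86400:  # 24 hours
    stats = db_manager.cleanup_old_data(days=90)
    if 'error' not in stats:
        self.__class__._last_db_cleanup_time = time.time()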
@ -5480,6 +5585,18 @@ class XianyuLive:
logger.error(f"调用API出错: {self._safe_str(e)}")
return None
async def _handle_message_with_semaphore(self, message_data, websocket):
"""带信号量的消息处理包装器,防止并发任务过多"""
async with self.message_semaphore:
self.active_message_tasks += 1
try:
await self.handle_message(message_data, websocket)
finally:
self.active_message_tasks -= 1
# Periodically log the number of active tasks (once every 100 tasks)
if self.active_message_tasks % 100 == 0 and self.active_message_tasks > 0:
logger.info(f"【{self.cookie_id}】Currently active message-handling tasks: {self.active_message_tasks}")
async def handle_message(self, message_data, websocket):
"""处理所有类型的消息"""
try:
@ -6056,8 +6173,9 @@ class XianyuLive:
continue
# Handle other messages
# Process the message in an async task so it does not block receiving subsequent messages
asyncio.create_task(self.handle_message(message_data, websocket))
# Process the message in a tracked async task so it does not block receiving subsequent messages,
# and cap the concurrency with a semaphore to prevent memory leaks
self._create_tracked_task(self._handle_message_with_semaphore(message_data, websocket))
except Exception as e:
logger.error(f"处理消息出错: {self._safe_str(e)}")

View File

@ -4945,6 +4945,104 @@ class DBManager:
except Exception as e:
logger.error(f"删除风控日志失败: {e}")
return False
def cleanup_old_data(self, days: int = 90) -> dict:
"""清理过期的历史数据,防止数据库无限增长
Args:
days: 保留最近N天的数据默认90天
Returns:
清理统计信息
"""
try:
with self.lock:
cursor = self.conn.cursor()
stats = {}
# Clean up AI conversation history (keep the most recent 90 days)
try:
cursor.execute(
"DELETE FROM ai_conversations WHERE created_at < datetime('now', '-' || ? || ' days')",
(days,)
)
stats['ai_conversations'] = cursor.rowcount
if cursor.rowcount > 0:
logger.info(f"清理了 {cursor.rowcount} 条过期的AI对话记录{days}天前)")
except Exception as e:
logger.warning(f"清理AI对话历史失败: {e}")
stats['ai_conversations'] = 0
# 清理风控日志保留最近90天
try:
cursor.execute(
"DELETE FROM risk_control_logs WHERE created_at < datetime('now', '-' || ? || ' days')",
(days,)
)
stats['risk_control_logs'] = cursor.rowcount
if cursor.rowcount > 0:
logger.info(f"清理了 {cursor.rowcount} 条过期的风控日志({days}天前)")
except Exception as e:
logger.warning(f"清理风控日志失败: {e}")
stats['risk_control_logs'] = 0
# 清理AI商品缓存保留最近30天
cache_days = min(days, 30) # AI商品缓存最多保留30天
try:
cursor.execute(
"DELETE FROM ai_item_cache WHERE last_updated < datetime('now', '-' || ? || ' days')",
(cache_days,)
)
stats['ai_item_cache'] = cursor.rowcount
if cursor.rowcount > 0:
logger.info(f"清理了 {cursor.rowcount} 条过期的AI商品缓存{cache_days}天前)")
except Exception as e:
logger.warning(f"清理AI商品缓存失败: {e}")
stats['ai_item_cache'] = 0
# 清理验证码记录保留最近1天
try:
cursor.execute(
"DELETE FROM captcha_codes WHERE created_at < datetime('now', '-1 day')"
)
stats['captcha_codes'] = cursor.rowcount
if cursor.rowcount > 0:
logger.info(f"清理了 {cursor.rowcount} 条过期的验证码记录")
except Exception as e:
logger.warning(f"清理验证码记录失败: {e}")
stats['captcha_codes'] = 0
# 清理邮箱验证记录保留最近7天
try:
cursor.execute(
"DELETE FROM email_verifications WHERE created_at < datetime('now', '-7 days')"
)
stats['email_verifications'] = cursor.rowcount
if cursor.rowcount > 0:
logger.info(f"清理了 {cursor.rowcount} 条过期的邮箱验证记录")
except Exception as e:
logger.warning(f"清理邮箱验证记录失败: {e}")
stats['email_verifications'] = 0
# 提交更改
self.conn.commit()
# 执行VACUUM以释放磁盘空间仅当清理了大量数据时
total_cleaned = sum(stats.values())
if total_cleaned > 100:
logger.info(f"共清理了 {total_cleaned} 条记录执行VACUUM以释放磁盘空间...")
cursor.execute("VACUUM")
logger.info("VACUUM执行完成")
stats['vacuum_executed'] = True
else:
stats['vacuum_executed'] = False
stats['total_cleaned'] = total_cleaned
return stats
except Exception as e:
logger.error(f"清理历史数据时出错: {e}")
return {'error': str(e)}
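For reference, a usage sketch of the new method through the module-level `db_manager` singleton (the 30-day retention here is an arbitrary example value):

stats = db_manager.cleanup_old_data(days=30)  # keep only the last 30 days
if 'error' in stats:
    logger.error(f"Database cleanup failed: {stats['error']}")
else:
    logger.info(f"Removed {stats['total_cleaned']} rows; VACUUM executed: {stats['vacuum_executed']}")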
# Global singleton

View File

@ -63,9 +63,9 @@ class FileLogCollector:
logger.add(
self.log_file,
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level} | {name}:{function}:{line} - {message}",
level="DEBUG",
level="INFO", # 从DEBUG改为INFO减少日志量
rotation="10 MB",
retention="7 days",
retention="3 days", # 从7天改为3天减少磁盘占用
enqueue=False, # 改为False避免队列延迟
buffering=1 # 行缓冲,立即写入
)