This commit is contained in:
zhinianboke 2025-09-29 14:18:29 +08:00
parent d208e7acc9
commit a3503e0be5
7 changed files with 290 additions and 60 deletions

View File

@ -108,6 +108,9 @@ RUN playwright install chromium && \
RUN mkdir -p /app/logs /app/data /app/backups /app/static/uploads/images && \
chmod 777 /app/logs /app/data /app/backups /app/static/uploads /app/static/uploads/images
# 配置系统限制防止core文件生成
RUN echo "ulimit -c 0" >> /etc/profile
# 注意: 为了简化权限问题使用root用户运行
# 在生产环境中,建议配置适当的用户映射

View File

@ -75,8 +75,10 @@ RUN apt-get update && \
libx11-xcb1 \
libxfixes3 \
xdg-utils \
# DrissionPage需要的Chrome/Chromium浏览器
chromium \
# OpenCV运行时依赖
libgl1 \
libglib2.0-0 \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/* \
&& rm -rf /tmp/* \

View File

@ -161,7 +161,7 @@ class XianyuLive:
# 类级别的实例管理字典用于API调用
_instances = {} # {cookie_id: XianyuLive实例}
_instances_lock = asyncio.Lock()
def _safe_str(self, e):
"""安全地将异常转换为字符串"""
try:
@ -251,6 +251,8 @@ class XianyuLive:
# 浏览器Cookie刷新成功标志
self.browser_cookie_refreshed = False # 标记_refresh_cookies_via_browser是否成功更新过数据库
self.restarted_in_browser_refresh = False # 刷新流程内部是否已触发重启(用于去重)
# 滑块验证相关
self.captcha_verification_count = 0 # 滑块验证次数计数器
@ -428,7 +430,7 @@ class XianyuLive:
except Exception as e:
logger.error(f"{self.cookie_id}】清理过期锁时发生错误: {self._safe_str(e)}")
def _is_auto_delivery_trigger(self, message: str) -> bool:
"""检查消息是否为自动发货触发关键字"""
@ -772,6 +774,8 @@ class XianyuLive:
logger.info(f"{self.cookie_id}】开始刷新token... (滑块验证重试次数: {captcha_retry_count})")
# 标记本次刷新状态
self.last_token_refresh_status = "started"
# 重置“刷新流程内已重启”标记,避免多次重启
self.restarted_in_browser_refresh = False
# 检查滑块验证重试次数,防止无限递归
if captcha_retry_count >= self.max_captcha_verification_count:
@ -1025,11 +1029,21 @@ class XianyuLive:
refresh_success = await self._refresh_cookies_via_browser(triggered_by_refresh_token=True)
if refresh_success:
# 如果在刷新流程内部已经触发过重启,则跳过重复重启
if getattr(self, 'restarted_in_browser_refresh', False):
logger.info(f"{self.cookie_id}】Cookie刷新成功刷新流程内已重启跳过重复重启")
self.last_token_refresh_status = "restarted_after_cookie_refresh"
return None
logger.info(f"{self.cookie_id}】Cookie刷新成功准备重启实例...")
# Cookie刷新成功后重启实例
await self._restart_instance()
logger.info(f"{self.cookie_id}】实例重启完成")
# 标记重启标志无需主动关闭WS重启由管理器处理
self.connection_restart_flag = True
# 标记为已重启(正常情况)
self.last_token_refresh_status = "restarted_after_cookie_refresh"
return None
@ -1134,14 +1148,14 @@ class XianyuLive:
logger.info(f"{self.cookie_id}】验证URL: {verification_url}")
# 优先使用增强反检测滑块验证器
# 使用增强反检测滑块验证器(独立实例,解决并发冲突)
try:
from utils.xianyu_slider_stealth import XianyuSliderStealth
logger.info(f"{self.cookie_id}】XianyuSliderStealth导入成功使用增强反检测滑块验证")
# 创建增强反检测滑块验证器实例
# 创建独立的滑块验证实例(每个用户独立实例,避免并发冲突)
slider_stealth = XianyuSliderStealth(
user_id=self.cookie_id,
user_id=f"{self.cookie_id}_{int(time.time() * 1000)}", # 使用唯一ID避免冲突
enable_learning=True # 启用学习功能
)
@ -1158,9 +1172,6 @@ class XianyuLive:
verification_url
)
# 关闭浏览器
slider_stealth.close_browser()
if success and cookies:
logger.info(f"{self.cookie_id}】增强反检测滑块验证成功获取到新的cookies")
@ -1211,7 +1222,7 @@ class XianyuLive:
await self.update_config_cookies()
logger.info(f"{self.cookie_id}】滑块验证成功后数据库cookies已自动更新")
# 记录成功更新到日志文件包含x5相关的cookie信息
x5sec_cookies_str = "; ".join([f"{k}={v}" for k, v in x5sec_cookies.items()]) if x5sec_cookies else ""
log_captcha_event(self.cookie_id, "滑块验证成功并自动更新数据库", True,
@ -1610,6 +1621,7 @@ class XianyuLive:
# 在Docker环境中添加额外参数移除 --single-process避免崩溃强制使用软件渲染
if os.getenv('DOCKER_ENV'):
browser_args.extend([
'--single-process',
'--disable-background-networking',
'--disable-client-side-phishing-detection',
'--disable-hang-monitor',
@ -1620,8 +1632,7 @@ class XianyuLive:
'--safebrowsing-disable-auto-update',
'--enable-automation',
'--password-store=basic',
'--use-mock-keychain',
'--use-gl=swiftshader'
'--use-mock-keychain'
])
browser = await playwright.chromium.launch(
@ -2544,12 +2555,16 @@ class XianyuLive:
async with session.get(api_url, params=params, timeout=10) as response:
response_text = await response.text()
logger.info(f"📱 QQ通知 - 响应状态: {response.status}")
logger.info(f"📱 QQ通知 - 响应内容: {response_text}")
if response.status == 200 or response.status == 502:
# 需求502 视为成功,且不打印返回内容
if response.status == 502:
logger.info(f"📱 QQ通知发送成功: {qq_number} (状态码: {response.status})")
elif response.status == 200:
logger.info(f"📱 QQ通知发送成功: {qq_number} (状态码: {response.status})")
logger.debug(f"📱 QQ通知 - 响应内容: {response_text}")
else:
logger.warning(f"📱 QQ通知发送失败: HTTP {response.status}, 响应: {response_text}")
logger.warning(f"📱 QQ通知发送失败: HTTP {response.status}")
logger.debug(f"📱 QQ通知 - 响应内容: {response_text}")
except Exception as e:
logger.error(f"📱 发送QQ通知异常: {self._safe_str(e)}")
@ -2955,14 +2970,17 @@ class XianyuLive:
return
# 构造通知消息
notification_msg = f"""🔴 闲鱼账号Token刷新异常
账号ID: {self.cookie_id}
聊天ID: {chat_id or '未知'}
异常时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}
异常信息: {error_message}
请检查账号Cookie是否过期如有需要请及时更新Cookie配置"""
# 判断异常信息中是否包含"滑块验证成功"
if "滑块验证成功" in error_message:
notification_msg = f"{error_message}\n\n" \
f"账号: {self.cookie_id}\n" \
f"时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n"
else:
notification_msg = f"Token刷新异常\n\n" \
f"账号ID: {self.cookie_id}\n" \
f"异常时间: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}\n" \
f"异常信息: {error_message}\n\n" \
f"请检查账号Cookie是否过期如有需要请及时更新Cookie配置。\n"
logger.info(f"准备发送Token刷新异常通知: {self.cookie_id}")
@ -3151,6 +3169,9 @@ class XianyuLive:
case 'telegram':
await self._send_telegram_notification(config_data, notification_message)
logger.info(f"已发送自动发货通知到Telegram")
case 'bark':
await self._send_bark_notification(config_data, notification_message)
logger.info(f"已发送自动发货通知到Bark")
case _:
logger.warning(f"不支持的通知渠道类型: {channel_type}")
@ -3229,7 +3250,7 @@ class XianyuLive:
async with order_detail_lock:
logger.info(f"🔍 【{self.cookie_id}】获取订单详情锁 {order_id},开始处理...")
try:
logger.info(f"{self.cookie_id}】开始获取订单详情: {order_id}")
@ -3862,8 +3883,7 @@ class XianyuLive:
# 注意refresh_token方法中已经调用了_restart_instance()
# 这里只需要关闭当前连接让main循环重新开始
self.connection_restart_flag = True
if self.ws:
await self.ws.close()
await self._restart_instance()
break
else:
# 根据上一次刷新状态决定日志级别(冷却/已重启为正常情况)
@ -4305,6 +4325,7 @@ class XianyuLive:
# 在Docker环境中添加额外参数移除 --single-process避免崩溃强制使用软件渲染
if os.getenv('DOCKER_ENV'):
browser_args.extend([
'--single-process',
'--disable-background-networking',
'--disable-client-side-phishing-detection',
'--disable-hang-monitor',
@ -4315,8 +4336,7 @@ class XianyuLive:
'--safebrowsing-disable-auto-update',
'--enable-automation',
'--password-store=basic',
'--use-mock-keychain',
'--use-gl=swiftshader'
'--use-mock-keychain'
])
# 使用无头浏览器
@ -4648,6 +4668,7 @@ class XianyuLive:
# 在Docker环境中添加额外参数移除 --single-process避免崩溃强制使用软件渲染
if os.getenv('DOCKER_ENV'):
browser_args.extend([
'--single-process',
'--disable-background-networking',
'--disable-client-side-phishing-detection',
'--disable-hang-monitor',
@ -4658,8 +4679,7 @@ class XianyuLive:
'--safebrowsing-disable-auto-update',
'--enable-automation',
'--password-store=basic',
'--use-mock-keychain',
'--use-gl=swiftshader'
'--use-mock-keychain'
])
# Cookie刷新模式使用无头浏览器
@ -4814,6 +4834,20 @@ class XianyuLive:
if triggered_by_refresh_token:
self.browser_cookie_refreshed = True
logger.info(f"{self.cookie_id}】由refresh_token触发浏览器Cookie刷新成功标志已设置为True")
# 兜底:直接在此处触发实例重启,避免外层协程在返回后被取消导致未重启
try:
# 标记“刷新流程内已触发重启”,供外层去重
self.restarted_in_browser_refresh = True
logger.info(f"{self.cookie_id}】Cookie刷新成功准备重启实例...(via _refresh_cookies_via_browser)")
await self._restart_instance()
logger.info(f"{self.cookie_id}】实例重启完成(via _refresh_cookies_via_browser)")
# 标记重启标志无需主动关闭WS重启由管理器处理
self.connection_restart_flag = True
except Exception as e:
logger.error(f"{self.cookie_id}】兜底重启失败: {self._safe_str(e)}")
else:
logger.info(f"{self.cookie_id}】由定时任务触发不设置浏览器Cookie刷新成功标志")
@ -4826,10 +4860,12 @@ class XianyuLive:
finally:
# 确保资源清理
try:
if browser:
if 'browser' in locals() and browser:
await browser.close()
if playwright:
logger.debug(f"{self.cookie_id}】浏览器已关闭")
if 'playwright' in locals() and playwright:
await playwright.stop()
logger.debug(f"{self.cookie_id}】Playwright已停止")
except Exception as cleanup_e:
logger.warning(f"{self.cookie_id}】清理浏览器资源时出错: {self._safe_str(cleanup_e)}")

View File

@ -70,11 +70,11 @@ services:
deploy:
resources:
limits:
memory: ${MEMORY_LIMIT:-512}M
cpus: '${CPU_LIMIT:-0.5}'
memory: ${MEMORY_LIMIT:-2048}M
cpus: '${CPU_LIMIT:-2.0}'
reservations:
memory: ${MEMORY_RESERVATION:-256}M
cpus: '${CPU_RESERVATION:-0.25}'
memory: ${MEMORY_RESERVATION:-512}M
cpus: '${CPU_RESERVATION:-0.5}'
# 可选添加Nginx反向代理
nginx:

View File

@ -70,11 +70,11 @@ services:
deploy:
resources:
limits:
memory: ${MEMORY_LIMIT:-512}M
cpus: '${CPU_LIMIT:-0.5}'
memory: ${MEMORY_LIMIT:-2048}M
cpus: '${CPU_LIMIT:-2.0}'
reservations:
memory: ${MEMORY_RESERVATION:-256}M
cpus: '${CPU_RESERVATION:-0.25}'
memory: ${MEMORY_RESERVATION:-512}M
cpus: '${CPU_RESERVATION:-0.5}'
# 可选添加Nginx反向代理
nginx:

View File

@ -60,6 +60,9 @@ MANUAL_MODE:
MESSAGE_EXPIRE_TIME: 300000
TOKEN_REFRESH_INTERVAL: 600 # 从3600秒(1小时)增加到72000秒(20小时)
TOKEN_RETRY_INTERVAL: 600 # 从300秒(5分钟)增加到7200秒(2小时)
SLIDER_VERIFICATION:
max_concurrent: 3 # 滑块验证最大并发数
wait_timeout: 60 # 等待排队超时时间(秒)
WEBSOCKET_HEADERS:
Accept-Encoding: gzip, deflate, br, zstd
Accept-Language: zh-CN,zh;q=0.9

View File

@ -8,12 +8,24 @@
import time
import random
import logging
import asyncio
import json
import os
import threading
import tempfile
import shutil
from playwright.sync_api import sync_playwright, ElementHandle
from typing import Optional, Tuple, List, Dict, Any
# 导入配置
try:
from config import config
SLIDER_MAX_CONCURRENT = config.get('SLIDER_VERIFICATION.max_concurrent', 3)
SLIDER_WAIT_TIMEOUT = config.get('SLIDER_VERIFICATION.wait_timeout', 60)
except ImportError:
# 如果无法导入配置,使用默认值
SLIDER_MAX_CONCURRENT = 3
SLIDER_WAIT_TIMEOUT = 60
# 配置日志
logging.basicConfig(
level=logging.INFO,
@ -22,6 +34,96 @@ logging.basicConfig(
)
logger = logging.getLogger(__name__)
# 全局并发控制
class SliderConcurrencyManager:
"""滑块验证并发管理器"""
_instance = None
_lock = threading.Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
return cls._instance
def __init__(self):
if not self._initialized:
self.max_concurrent = SLIDER_MAX_CONCURRENT # 从配置文件读取最大并发数
self.wait_timeout = SLIDER_WAIT_TIMEOUT # 从配置文件读取等待超时时间
self.active_instances = {} # 活跃实例
self.waiting_queue = [] # 等待队列
self.instance_lock = threading.Lock()
self._initialized = True
logger.info(f"滑块验证并发管理器初始化: 最大并发数={self.max_concurrent}, 等待超时={self.wait_timeout}")
def can_start_instance(self, user_id: str) -> bool:
"""检查是否可以启动新实例"""
with self.instance_lock:
return len(self.active_instances) < self.max_concurrent
def wait_for_slot(self, user_id: str, timeout: int = None) -> bool:
"""等待可用槽位"""
if timeout is None:
timeout = self.wait_timeout
start_time = time.time()
while time.time() - start_time < timeout:
with self.instance_lock:
if len(self.active_instances) < self.max_concurrent:
return True
# 检查是否在等待队列中
with self.instance_lock:
if user_id not in self.waiting_queue:
self.waiting_queue.append(user_id)
logger.info(f"用户 {user_id} 进入等待队列,当前队列长度: {len(self.waiting_queue)}")
# 等待1秒后重试
time.sleep(1)
# 超时后从队列中移除
with self.instance_lock:
if user_id in self.waiting_queue:
self.waiting_queue.remove(user_id)
logger.warning(f"用户 {user_id} 等待超时,从队列中移除")
return False
def register_instance(self, user_id: str, instance):
"""注册实例"""
with self.instance_lock:
self.active_instances[user_id] = {
'instance': instance,
'start_time': time.time()
}
# 从等待队列中移除
if user_id in self.waiting_queue:
self.waiting_queue.remove(user_id)
def unregister_instance(self, user_id: str):
"""注销实例"""
with self.instance_lock:
if user_id in self.active_instances:
del self.active_instances[user_id]
logger.info(f"用户 {user_id} 实例已注销,当前活跃: {len(self.active_instances)}")
def get_stats(self):
"""获取统计信息"""
with self.instance_lock:
return {
'active_count': len(self.active_instances),
'max_concurrent': self.max_concurrent,
'available_slots': self.max_concurrent - len(self.active_instances),
'queue_length': len(self.waiting_queue),
'waiting_users': self.waiting_queue.copy()
}
# 全局并发管理器实例
concurrency_manager = SliderConcurrencyManager()
class XianyuSliderStealth:
def __init__(self, user_id: str = "default", enable_learning: bool = True):
@ -32,6 +134,22 @@ class XianyuSliderStealth:
self.context = None
self.playwright = None
# 为每个实例创建独立的临时目录
self.temp_dir = tempfile.mkdtemp(prefix=f"slider_{user_id}_")
logger.debug(f"用户 {self.user_id} 创建临时目录: {self.temp_dir}")
# 等待可用槽位(排队机制)
logger.info(f"用户 {self.user_id} 检查并发限制...")
if not concurrency_manager.wait_for_slot(self.user_id):
stats = concurrency_manager.get_stats()
logger.error(f"用户 {self.user_id} 等待槽位超时,当前活跃: {stats['active_count']}/{stats['max_concurrent']}")
raise Exception(f"滑块验证等待槽位超时,请稍后重试")
# 注册实例
concurrency_manager.register_instance(self.user_id, self)
stats = concurrency_manager.get_stats()
logger.info(f"用户 {self.user_id} 实例已注册,当前并发: {stats['active_count']}/{stats['max_concurrent']}")
# 轨迹学习相关属性
self.success_history_file = f"trajectory_history/{user_id}_success.json"
self.trajectory_params = {
@ -68,7 +186,6 @@ class XianyuSliderStealth:
"--no-first-run",
"--no-zygote",
"--disable-gpu",
"--use-gl=swiftshader",
"--disable-web-security",
"--disable-features=VizDisplayCompositor",
f"--window-size={browser_features['window_size']}",
@ -136,8 +253,40 @@ class XianyuSliderStealth:
return self.page
except Exception as e:
logger.error(f"用户 {self.user_id} 初始化浏览器失败: {e}")
# 确保在异常时也清理已创建的资源
self._cleanup_on_init_failure()
raise
def _cleanup_on_init_failure(self):
"""初始化失败时的清理"""
try:
if hasattr(self, 'page') and self.page:
self.page.close()
self.page = None
except Exception as e:
logger.warning(f"用户 {self.user_id} 清理页面时出错: {e}")
try:
if hasattr(self, 'context') and self.context:
self.context.close()
self.context = None
except Exception as e:
logger.warning(f"用户 {self.user_id} 清理上下文时出错: {e}")
try:
if hasattr(self, 'browser') and self.browser:
self.browser.close()
self.browser = None
except Exception as e:
logger.warning(f"用户 {self.user_id} 清理浏览器时出错: {e}")
try:
if hasattr(self, 'playwright') and self.playwright:
self.playwright.stop()
self.playwright = None
except Exception as e:
logger.warning(f"用户 {self.user_id} 清理Playwright时出错: {e}")
def _load_success_history(self) -> List[Dict[str, Any]]:
"""加载历史成功数据"""
try:
@ -309,8 +458,8 @@ class XianyuSliderStealth:
# 记录所有cookie的详细信息
logger.info(f"用户 {self.user_id} 获取到的所有cookie: {list(new_cookies.keys())}")
# 只提取指定的cookie: _m_h5_tk, _m_h5_tk_enc, cookie2, t, sgcookie, unb, uc1, uc3, uc4
target_cookies = ['x5sec', '_m_h5_tk', '_m_h5_tk_enc', 'cookie2', 't', 'sgcookie', 'unb', 'uc1', 'uc3', 'uc4']
# 只提取指定的cookie: x5sec, _m_h5_tk, _m_h5_tk_enc, cookie2, t, sgcookie, unb, uc1, uc3, uc4
target_cookies = ['x5sec', '_m_h5_tk', '_m_h5_tk_enc', 'cookie2', 't']
filtered_cookies = {}
for cookie_name in target_cookies:
@ -334,7 +483,7 @@ class XianyuSliderStealth:
logger.info(f"用户 {self.user_id} 返回过滤后的cookie: {list(filtered_cookies.keys())}")
return filtered_cookies
else:
logger.warning(f"用户 {self.user_id} 未找到目标cookie (_m_h5_tk, _m_h5_tk_enc, cookie2, t, sgcookie, unb, uc1, uc3, uc4)")
logger.warning(f"用户 {self.user_id} 未找到目标cookie (x5sec, _m_h5_tk, _m_h5_tk_enc, cookie2, t)")
return None
else:
logger.warning(f"用户 {self.user_id} 未获取到任何cookie")
@ -1257,7 +1406,28 @@ class XianyuSliderStealth:
return False
def close_browser(self):
"""安全关闭浏览器"""
"""安全关闭浏览器并清理资源"""
logger.info(f"用户 {self.user_id} 开始清理资源...")
# 清理页面
try:
if hasattr(self, 'page') and self.page:
self.page.close()
logger.debug(f"用户 {self.user_id} 页面已关闭")
self.page = None
except Exception as e:
logger.warning(f"用户 {self.user_id} 关闭页面时出错: {e}")
# 清理上下文
try:
if hasattr(self, 'context') and self.context:
self.context.close()
logger.debug(f"用户 {self.user_id} 上下文已关闭")
self.context = None
except Exception as e:
logger.warning(f"用户 {self.user_id} 关闭上下文时出错: {e}")
# 清理浏览器
try:
if hasattr(self, 'browser') and self.browser:
self.browser.close()
@ -1266,6 +1436,7 @@ class XianyuSliderStealth:
except Exception as e:
logger.warning(f"用户 {self.user_id} 关闭浏览器时出错: {e}")
# 清理Playwright
try:
if hasattr(self, 'playwright') and self.playwright:
self.playwright.stop()
@ -1273,6 +1444,24 @@ class XianyuSliderStealth:
self.playwright = None
except Exception as e:
logger.warning(f"用户 {self.user_id} 停止Playwright时出错: {e}")
# 清理临时目录
try:
if hasattr(self, 'temp_dir') and self.temp_dir:
shutil.rmtree(self.temp_dir, ignore_errors=True)
logger.debug(f"用户 {self.user_id} 临时目录已清理: {self.temp_dir}")
except Exception as e:
logger.warning(f"用户 {self.user_id} 清理临时目录时出错: {e}")
# 注销实例(最后执行,确保其他清理完成)
try:
concurrency_manager.unregister_instance(self.user_id)
stats = concurrency_manager.get_stats()
logger.info(f"用户 {self.user_id} 实例已注销,当前并发: {stats['active_count']}/{stats['max_concurrent']},等待队列: {stats['queue_length']}")
except Exception as e:
logger.warning(f"用户 {self.user_id} 注销实例时出错: {e}")
logger.info(f"用户 {self.user_id} 资源清理完成")
def run(self, url: str):
"""运行主流程,返回(成功状态, cookie数据)"""
@ -1355,18 +1544,9 @@ class XianyuSliderStealth:
# 关闭浏览器
self.close_browser()
def process_user_url(user_id: str, url: str, enable_learning: bool = True):
"""处理用户URL的滑块验证 - 增强反检测版本,返回(成功状态, cookie数据)"""
slider = XianyuSliderStealth(user_id, enable_learning)
try:
# run方法已经返回(成功状态, cookie数据)
return slider.run(url)
except Exception as e:
logger.error(f"用户 {user_id} 滑块验证处理异常: {str(e)}")
return False, None
finally:
# 安全关闭浏览器run方法中已经会调用close_browser
slider.close_browser()
def get_slider_stats():
"""获取滑块验证并发统计信息"""
return concurrency_manager.get_stats()
if __name__ == "__main__":
# 简单的命令行示例
@ -1376,5 +1556,11 @@ if __name__ == "__main__":
sys.exit(1)
url = sys.argv[1]
result = process_user_url("test_user", url)
print(f"验证结果: {'成功' if result else '失败'}")
slider = XianyuSliderStealth("test_user", enable_learning=True)
try:
success, cookies = slider.run(url)
print(f"验证结果: {'成功' if success else '失败'}")
if cookies:
print(f"获取到 {len(cookies)} 个cookies")
except Exception as e:
print(f"验证异常: {e}")