优化滑块逻辑

This commit is contained in:
zhinianboke 2025-11-24 16:24:09 +08:00
parent 2b82e33f1a
commit c69840e9c8
19 changed files with 10086 additions and 1132 deletions

1
.gitignore vendored
View File

@ -705,6 +705,7 @@ playwright-report/**
test-results/**
playwright/.browsers/**
playwright-state/**
browser_data/**
# 统计数据完整覆盖
user_stats.txt

View File

@ -1,8 +1,9 @@
"""项目启动入口:
1. 创建 CookieManager按配置文件 / 环境变量初始化账号任务
2. 在后台线程启动 FastAPI (reply_server) 提供管理与自动回复接口
3. 主协程保持运行
1. 自动初始化必要的目录和数据库
2. 创建 CookieManager按配置文件 / 环境变量初始化账号任务
3. 在后台线程启动 FastAPI (reply_server) 提供管理与自动回复接口
4. 主协程保持运行
"""
import os
@ -10,10 +11,45 @@ import sys
import shutil
from pathlib import Path
# ==================== 在导入任何模块之前先迁移数据库 ====================
# ==================== 初始化目录和配置 ====================
def _init_directories():
"""初始化必要的目录结构"""
print("=" * 50)
print("闲鱼自动回复系统 - 启动中...")
print("=" * 50)
# 创建必要的目录
directories = ['data', 'logs', 'backups']
for dir_name in directories:
dir_path = Path(dir_name)
if not dir_path.exists():
dir_path.mkdir(parents=True, exist_ok=True)
print(f"✓ 创建 {dir_name}/ 目录")
else:
print(f"{dir_name}/ 目录已存在")
return True
def _init_database():
"""初始化数据库(如果不存在)"""
db_path = Path("data/xianyu_data.db")
if not db_path.exists():
print("✓ 首次启动,将自动创建数据库...")
# 数据库会在 db_manager 导入时自动创建
return True
else:
print(f"✓ 数据库已存在: {db_path}")
return False
# 在导入任何模块之前先初始化
_init_directories()
_init_database()
# ==================== 数据库文件迁移(兼容旧版本) ====================
def _migrate_database_files_early():
"""在启动前检查并迁移数据库文件到data目录使用print因为logger还未初始化"""
print("检查数据库文件位置...")
print("\n检查旧版本数据库文件...")
# 确保data目录存在
data_dir = Path("data")
@ -143,13 +179,17 @@ def _start_api_server():
# 在后台线程中创建独立事件循环并直接运行 server.serve()
import uvicorn
try:
config = uvicorn.Config("reply_server:app", host=host, port=port, log_level="info")
# 直接导入 app 对象,避免字符串引用在打包后无法工作
from reply_server import app
config = uvicorn.Config(app, host=host, port=port, log_level="info")
server = uvicorn.Server(config)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(server.serve())
except Exception as e:
logger.error(f"uvicorn服务器启动失败: {e}")
import traceback
logger.error(f"详细错误: {traceback.format_exc()}")
try:
# 确保线程内事件循环被正确关闭
loop = asyncio.get_event_loop()

File diff suppressed because it is too large Load Diff

View File

@ -1,13 +1,20 @@
"""
AI回复引擎模块
集成XianyuAutoAgent的AI回复功能到现有项目中
P0/P1 最小化修改版
- 修复 P1-1 (高成本): detect_intent 改为本地关键词
- 修复 P0-2 (部署陷阱): 移除客户端缓存实现无状态
- 修复 P1-3 (健壮性): 增强 Gemini 消息格式化
- 遵照指示未修复 P0-1 (议价竞争条件)
"""
import os
import json
import time
import sqlite3
import requests
import requests # 确保已导入
import threading
from typing import List, Dict, Optional
from loguru import logger
from openai import OpenAI
@ -18,21 +25,19 @@ class AIReplyEngine:
"""AI回复引擎"""
def __init__(self):
self.clients = {} # 存储不同账号的OpenAI客户端
self.agents = {} # 存储不同账号的Agent实例
self.client_last_used = {} # 记录客户端最后使用时间 {cookie_id: timestamp}
# 修复 P0-2: 移除有状态的缓存,以支持多进程部署
# self.clients = {} # 已移除
# self.agents = {} # 已移除
# self.client_last_used = {} # 已移除
self._init_default_prompts()
# 用于控制同一chat_id消息的串行处理
self._chat_locks = {}
self._chat_locks_lock = threading.Lock()
def _init_default_prompts(self):
"""初始化默认提示词"""
self.default_prompts = {
'classify': '''你是一个意图分类专家,需要判断用户消息的意图类型。
请根据用户消息内容返回以下意图之一
- price: 价格相关议价优惠降价等
- tech: 技术相关产品参数使用方法故障等
- default: 其他一般咨询
只返回意图类型不要其他内容''',
'classify': '''你是一个意图分类专家...(此提示词已不再被 detect_intent 使用)''',
'price': '''你是一位经验丰富的销售专家,擅长议价。
语言要求简短直接每句10总字数40
@ -54,35 +59,32 @@ class AIReplyEngine:
注意结合商品信息给出实用建议'''
}
def get_client(self, cookie_id: str) -> Optional[OpenAI]:
"""获取指定账号的OpenAI客户端"""
if cookie_id not in self.clients:
settings = db_manager.get_ai_reply_settings(cookie_id)
if not settings['ai_enabled'] or not settings['api_key']:
return None
try:
logger.info(f"创建OpenAI客户端 {cookie_id}: base_url={settings['base_url']}, api_key={'***' + settings['api_key'][-4:] if settings['api_key'] else 'None'}")
self.clients[cookie_id] = OpenAI(
api_key=settings['api_key'],
base_url=settings['base_url']
)
logger.info(f"为账号 {cookie_id} 创建OpenAI客户端成功实际base_url: {self.clients[cookie_id].base_url}")
except Exception as e:
logger.error(f"创建OpenAI客户端失败 {cookie_id}: {e}")
return None
def _create_openai_client(self, cookie_id: str) -> Optional[OpenAI]:
"""
( get_client) 创建指定账号的OpenAI客户端
修复 P0-2: 移除了缓存逻辑以支持多进程无状态部署
"""
settings = db_manager.get_ai_reply_settings(cookie_id)
if not settings['ai_enabled'] or not settings['api_key']:
return None
# 记录使用时间
self.client_last_used[cookie_id] = time.time()
return self.clients[cookie_id]
try:
logger.info(f"创建新的OpenAI客户端实例 {cookie_id}: base_url={settings['base_url']}, api_key={'***' + settings['api_key'][-4:] if settings['api_key'] else 'None'}")
client = OpenAI(
api_key=settings['api_key'],
base_url=settings['base_url']
)
logger.info(f"为账号 {cookie_id} 创建OpenAI客户端成功实际base_url: {client.base_url}")
return client
except Exception as e:
logger.error(f"创建OpenAI客户端失败 {cookie_id}: {e}")
return None
def _is_dashscope_api(self, settings: dict) -> bool:
"""判断是否为DashScope API - 只有选择自定义模型时才使用"""
model_name = settings.get('model_name', '')
base_url = settings.get('base_url', '')
# 只有当模型名称为"custom"或"自定义"时才使用DashScope API格式
# 其他情况都使用OpenAI兼容格式
is_custom_model = model_name.lower() in ['custom', '自定义', 'dashscope', 'qwen-custom']
is_dashscope_url = 'dashscope.aliyuncs.com' in base_url
@ -90,29 +92,29 @@ class AIReplyEngine:
return is_custom_model and is_dashscope_url
def _is_gemini_api(self, settings: dict) -> bool:
"""判断是否为Gemini API (通过模型名称)"""
model_name = settings.get('model_name', '').lower()
return 'gemini' in model_name
def _call_dashscope_api(self, settings: dict, messages: list, max_tokens: int = 100, temperature: float = 0.7) -> str:
"""调用DashScope API"""
# 提取app_id从base_url
base_url = settings['base_url']
if '/apps/' in base_url:
app_id = base_url.split('/apps/')[-1].split('/')[0]
else:
raise ValueError("DashScope API URL中未找到app_id")
# 构建请求URL
url = f"https://dashscope.aliyuncs.com/api/v1/apps/{app_id}/completion"
# 构建提示词将messages合并为单个prompt
system_content = ""
user_content = ""
for msg in messages:
if msg['role'] == 'system':
system_content = msg['content']
elif msg['role'] == 'user':
user_content = msg['content']
user_content = msg['content'] # 假设 user prompt 已在 generate_reply 中构建好
# 构建更清晰的prompt格式
if system_content and user_content:
prompt = f"{system_content}\n\n用户问题:{user_content}\n\n请直接回答用户的问题:"
elif user_content:
@ -120,25 +122,18 @@ class AIReplyEngine:
else:
prompt = "\n".join([f"{msg['role']}: {msg['content']}" for msg in messages])
# 构建请求数据
data = {
"input": {
"prompt": prompt
},
"parameters": {
"max_tokens": max_tokens,
"temperature": temperature
},
"input": {"prompt": prompt},
"parameters": {"max_tokens": max_tokens, "temperature": temperature},
"debug": {}
}
headers = {
"Authorization": f"Bearer {settings['api_key']}",
"Content-Type": "application/json"
}
logger.info(f"DashScope API请求: {url}")
logger.info(f"发送的prompt: {prompt}")
logger.info(f"发送的prompt: {prompt[:100]}...") # 避免 prompt 过长
logger.debug(f"请求数据: {json.dumps(data, ensure_ascii=False)}")
response = requests.post(url, headers=headers, json=data, timeout=30)
@ -150,12 +145,80 @@ class AIReplyEngine:
result = response.json()
logger.debug(f"DashScope API响应: {json.dumps(result, ensure_ascii=False)}")
# 提取回复内容
if 'output' in result and 'text' in result['output']:
return result['output']['text'].strip()
else:
raise Exception(f"DashScope API响应格式错误: {result}")
def _call_gemini_api(self, settings: dict, messages: list, max_tokens: int = 100, temperature: float = 0.7) -> str:
"""
调用Google Gemini REST API (v1beta)
"""
api_key = settings['api_key']
model_name = settings['model_name']
url = f"https://generativelanguage.googleapis.com/v1beta/models/{model_name}:generateContent?key={api_key}"
headers = {"Content-Type": "application/json"}
# --- 转换消息格式 (修复 P1-3: 增强健壮性) ---
system_instruction = ""
user_content_parts = []
# 遍历消息,找到 system 和所有的 user parts
for msg in messages:
if msg['role'] == 'system':
system_instruction = msg['content']
elif msg['role'] == 'user':
# 我们只关心 user content
user_content_parts.append(msg['content'])
# 将所有 user parts 合并为最后的 user_content
# 在我们的使用场景中 (generate_reply),只会有一个 user part但这样更安全
user_content = "\n".join(user_content_parts)
if not user_content:
logger.warning(f"Gemini API 调用: 未在消息中找到 'user' 角色内容。Messages: {messages}")
raise ValueError("未在消息中找到用户内容 (user content)")
# --- 消息格式转换结束 ---
payload = {
"contents": [
{
"role": "user",
"parts": [{"text": user_content}]
}
],
"generationConfig": {
"temperature": temperature,
"maxOutputTokens": max_tokens
}
}
if system_instruction:
payload["systemInstruction"] = {
"parts": [{"text": system_instruction}]
}
logger.info(f"Calling Gemini REST API: {url.split('?')[0]}")
logger.debug(f"Gemini Payload: {json.dumps(payload, ensure_ascii=False)}")
response = requests.post(url, headers=headers, json=payload, timeout=30)
if response.status_code != 200:
logger.error(f"Gemini API 请求失败: {response.status_code} - {response.text}")
raise Exception(f"Gemini API 请求失败: {response.status_code} - {response.text}")
result = response.json()
logger.debug(f"Gemini API 响应: {json.dumps(result, ensure_ascii=False)}")
try:
reply_text = result['candidates'][0]['content']['parts'][0]['text']
return reply_text.strip()
except (KeyError, IndexError, TypeError) as e:
logger.error(f"Gemini API 响应格式错误: {result} - {e}")
raise Exception(f"Gemini API 响应格式错误: {result}")
def _call_openai_api(self, client: OpenAI, settings: dict, messages: list, max_tokens: int = 100, temperature: float = 0.7) -> str:
"""调用OpenAI兼容API"""
response = client.chat.completions.create(
@ -172,100 +235,131 @@ class AIReplyEngine:
return settings['ai_enabled']
def detect_intent(self, message: str, cookie_id: str) -> str:
"""检测用户消息意图"""
"""
检测用户消息意图 (基于关键词的本地检测)
修复 P1-1: 移除了AI调用以降低成本和延迟
"""
try:
# 检查AI是否启用如果未启用不应执行任何AI相关逻辑
# 注意:此检查在 generate_reply 的开头已经做过,但保留此处作为第二道防线
settings = db_manager.get_ai_reply_settings(cookie_id)
if not settings['ai_enabled'] or not settings['api_key']:
if not settings['ai_enabled']:
return 'default'
custom_prompts = json.loads(settings['custom_prompts']) if settings['custom_prompts'] else {}
classify_prompt = custom_prompts.get('classify', self.default_prompts['classify'])
msg_lower = message.lower()
# 打印调试信息
logger.info(f"AI设置调试 {cookie_id}: base_url={settings['base_url']}, model={settings['model_name']}")
messages = [
{"role": "system", "content": classify_prompt},
{"role": "user", "content": message}
# 价格相关关键词
price_keywords = [
'便宜', '优惠', '', '降价', '包邮', '价格', '多少钱', '能少', '还能', '最低', '底价',
'实诚价', '到100', '能到', '包个邮', '给个价', '什么价' # <-- 增加这些“口语化”的词
]
# 同样,你也可以通过正则表达式来匹配纯数字,比如 "100" "80"
# 但那可能有点复杂,先加关键词是最小改动
if any(kw in msg_lower for kw in price_keywords):
logger.debug(f"本地意图检测: price ({message})")
return 'price'
# 根据API类型选择调用方式
if self._is_dashscope_api(settings):
logger.info(f"使用DashScope API进行意图检测")
response_text = self._call_dashscope_api(settings, messages, max_tokens=10, temperature=0.1)
else:
logger.info(f"使用OpenAI兼容API进行意图检测")
client = self.get_client(cookie_id)
if not client:
return 'default'
logger.info(f"OpenAI客户端base_url: {client.base_url}")
response_text = self._call_openai_api(client, settings, messages, max_tokens=10, temperature=0.1)
intent = response_text.lower()
if intent in ['price', 'tech', 'default']:
return intent
else:
return 'default'
# 技术相关关键词
tech_keywords = ['怎么用', '参数', '坏了', '故障', '设置', '说明书', '功能', '用法', '教程', '驱动']
if any(kw in msg_lower for kw in tech_keywords):
logger.debug(f"本地意图检测: tech ({message})")
return 'tech'
logger.debug(f"本地意图检测: default ({message})")
return 'default'
except Exception as e:
logger.error(f"意图检测失败 {cookie_id}: {e}")
# 打印更详细的错误信息
if hasattr(e, 'response') and hasattr(e.response, 'url'):
logger.error(f"请求URL: {e.response.url}")
if hasattr(e, 'request') and hasattr(e.request, 'url'):
logger.error(f"请求URL: {e.request.url}")
logger.error(f"本地意图检测失败 {cookie_id}: {e}")
return 'default'
def _get_chat_lock(self, chat_id: str) -> threading.Lock:
"""获取指定chat_id的锁如果不存在则创建"""
with self._chat_locks_lock:
if chat_id not in self._chat_locks:
self._chat_locks[chat_id] = threading.Lock()
return self._chat_locks[chat_id]
def generate_reply(self, message: str, item_info: dict, chat_id: str,
cookie_id: str, user_id: str, item_id: str) -> Optional[str]:
cookie_id: str, user_id: str, item_id: str,
skip_wait: bool = False) -> Optional[str]:
"""生成AI回复"""
if not self.is_ai_enabled(cookie_id):
return None
try:
# 1. 获取AI回复设置
settings = db_manager.get_ai_reply_settings(cookie_id)
# 2. 检测意图
# 先检测意图(用于后续保存)
intent = self.detect_intent(message, cookie_id)
logger.info(f"检测到意图: {intent} (账号: {cookie_id})")
# 在锁外先保存用户消息到数据库,让所有消息都能立即保存
message_created_at = self.save_conversation(chat_id, cookie_id, user_id, item_id, "user", message, intent)
# 如果调用方已经实现了去抖debounce可以通过 skip_wait=True 跳过内部等待
if not skip_wait:
logger.info(f"{cookie_id}】消息已保存等待10秒收集后续消息: {message[:20]}... (时间:{message_created_at})")
# 固定等待10秒等待可能的后续消息在锁外延迟避免阻塞其他消息保存
time.sleep(10)
else:
logger.info(f"{cookie_id}】消息已保存(外部防抖已启用,跳过内部等待): {message[:20]}... (时间:{message_created_at})")
# 获取该chat_id的锁确保同一对话的消息串行处理
chat_lock = self._get_chat_lock(chat_id)
# 使用锁确保同一chat_id的消息串行处理
with chat_lock:
# 获取最近时间窗口内的所有用户消息
# 如果 skip_wait=True外部防抖查询窗口为8秒3秒防抖 + 5秒缓冲
# 如果 skip_wait=False内部等待查询窗口为25秒10秒等待 + 10秒消息间隔 + 5秒缓冲
query_seconds = 8 if skip_wait else 25
recent_messages = self._get_recent_user_messages(chat_id, cookie_id, seconds=query_seconds)
logger.info(f"{cookie_id}】最近{query_seconds}秒内的消息: {[msg['content'][:20] for msg in recent_messages]}")
if recent_messages and len(recent_messages) > 0:
# 只处理最后一条消息(时间戳最新的)
latest_message = recent_messages[-1]
if message_created_at != latest_message['created_at']:
logger.info(f"{cookie_id}】检测到有更新的消息,跳过当前消息: {message[:20]}... (时间:{message_created_at}),最新消息: {latest_message['content'][:20]}... (时间:{latest_message['created_at']})")
return None
else:
logger.info(f"{cookie_id}】当前消息是最新消息,开始处理: {message[:20]}... (时间:{message_created_at})")
# 1. 获取AI回复设置
settings = db_manager.get_ai_reply_settings(cookie_id)
# 3. 获取对话历史
context = self.get_conversation_context(chat_id, cookie_id)
# 3. 获取对话历史
context = self.get_conversation_context(chat_id, cookie_id)
# 4. 获取议价次数
bargain_count = self.get_bargain_count(chat_id, cookie_id)
# 4. 获取议价次数
bargain_count = self.get_bargain_count(chat_id, cookie_id)
# 5. 检查议价轮数限制
if intent == "price":
# 5. 检查议价轮数限制 (P0-1 竞争条件风险点 - 遵照指示未修改)
if intent == "price":
max_bargain_rounds = settings.get('max_bargain_rounds', 3)
if bargain_count >= max_bargain_rounds:
logger.info(f"议价次数已达上限 ({bargain_count}/{max_bargain_rounds}),拒绝继续议价")
refuse_reply = f"抱歉,这个价格已经是最优惠的了,不能再便宜了哦!"
self.save_conversation(chat_id, cookie_id, user_id, item_id, "assistant", refuse_reply, intent)
return refuse_reply
# 6. 构建提示词
custom_prompts = json.loads(settings['custom_prompts']) if settings['custom_prompts'] else {}
system_prompt = custom_prompts.get(intent, self.default_prompts[intent])
# 7. 构建商品信息
item_desc = f"商品标题: {item_info.get('title', '未知')}\n"
item_desc += f"商品价格: {item_info.get('price', '未知')}\n"
item_desc += f"商品描述: {item_info.get('desc', '')}"
# 8. 构建对话历史
context_str = "\n".join([f"{msg['role']}: {msg['content']}" for msg in context[-10:]]) # 最近10条
# 9. 构建用户消息
max_bargain_rounds = settings.get('max_bargain_rounds', 3)
if bargain_count >= max_bargain_rounds:
logger.info(f"议价次数已达上限 ({bargain_count}/{max_bargain_rounds}),拒绝继续议价")
# 返回拒绝议价的回复
refuse_reply = f"抱歉,这个价格已经是最优惠的了,不能再便宜了哦!"
# 保存对话记录
self.save_conversation(chat_id, cookie_id, user_id, item_id, "user", message, intent)
self.save_conversation(chat_id, cookie_id, user_id, item_id, "assistant", refuse_reply, intent)
return refuse_reply
max_discount_percent = settings.get('max_discount_percent', 10)
max_discount_amount = settings.get('max_discount_amount', 100)
# 6. 构建提示词
custom_prompts = json.loads(settings['custom_prompts']) if settings['custom_prompts'] else {}
system_prompt = custom_prompts.get(intent, self.default_prompts[intent])
# 7. 构建商品信息
item_desc = f"商品标题: {item_info.get('title', '未知')}\n"
item_desc += f"商品价格: {item_info.get('price', '未知')}\n"
item_desc += f"商品描述: {item_info.get('desc', '')}"
# 8. 构建对话历史
context_str = "\n".join([f"{msg['role']}: {msg['content']}" for msg in context[-10:]]) # 最近10条
# 9. 构建用户消息
max_bargain_rounds = settings.get('max_bargain_rounds', 3)
max_discount_percent = settings.get('max_discount_percent', 10)
max_discount_amount = settings.get('max_discount_amount', 100)
user_prompt = f"""商品信息:
user_prompt = f"""商品信息:
{item_desc}
对话历史
@ -281,42 +375,63 @@ class AIReplyEngine:
请根据以上信息生成回复"""
# 10. 调用AI生成回复
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
]
# 10. 调用AI生成回复
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
]
# 根据API类型选择调用方式
if self._is_dashscope_api(settings):
logger.info(f"使用DashScope API生成回复")
reply = self._call_dashscope_api(settings, messages, max_tokens=100, temperature=0.7)
else:
logger.info(f"使用OpenAI兼容API生成回复")
client = self.get_client(cookie_id)
if not client:
return None
reply = self._call_openai_api(client, settings, messages, max_tokens=100, temperature=0.7)
reply = None # 初始化 reply 变量
# 11. 保存对话记录
self.save_conversation(chat_id, cookie_id, user_id, item_id, "user", message, intent)
self.save_conversation(chat_id, cookie_id, user_id, item_id, "assistant", reply, intent)
if self._is_dashscope_api(settings):
logger.info(f"使用DashScope API生成回复")
reply = self._call_dashscope_api(settings, messages, max_tokens=100, temperature=0.7)
elif self._is_gemini_api(settings):
logger.info(f"使用Gemini API生成回复")
reply = self._call_gemini_api(settings, messages, max_tokens=100, temperature=0.7)
else:
logger.info(f"使用OpenAI兼容API生成回复")
# 修复 P0-2: 调用已修改的无状态客户端创建方法
client = self._create_openai_client(cookie_id)
if not client:
return None
logger.info(f"messages:{messages}")
reply = self._call_openai_api(client, settings, messages, max_tokens=100, temperature=0.7)
# 12. 更新议价次数
if intent == "price":
self.increment_bargain_count(chat_id, cookie_id)
logger.info(f"AI回复生成成功 (账号: {cookie_id}): {reply}")
return reply
# 11. 保存AI回复到对话记录
self.save_conversation(chat_id, cookie_id, user_id, item_id, "assistant", reply, intent)
# 12. 更新议价次数 (此方法已在 get_bargain_count 中通过 SQL COUNT(*) 隐式实现)
if intent == "price":
# self.increment_bargain_count(chat_id, cookie_id) # 此行原先就没有,保持不变
pass
logger.info(f"AI回复生成成功 (账号: {cookie_id}): {reply}")
return reply
except Exception as e:
logger.error(f"AI回复生成失败 {cookie_id}: {e}")
# 打印更详细的错误信息
if hasattr(e, 'response') and hasattr(e.response, 'url'):
logger.error(f"请求URL: {e.response.url}")
if hasattr(e, 'request') and hasattr(e.request, 'url'):
logger.error(f"请求URL: {e.request.url}")
return None
async def generate_reply_async(self, message: str, item_info: dict, chat_id: str,
cookie_id: str, user_id: str, item_id: str,
skip_wait: bool = False) -> Optional[str]:
"""
异步包装器在独立线程池中执行同步的 `generate_reply`并返回结果
这样可以在异步代码中直接 await而不阻塞事件循环
"""
try:
import asyncio as _asyncio
return await _asyncio.to_thread(self.generate_reply, message, item_info, chat_id, cookie_id, user_id, item_id, skip_wait)
except Exception as e:
logger.error(f"异步生成回复失败: {e}")
return None
def get_conversation_context(self, chat_id: str, cookie_id: str, limit: int = 20) -> List[Dict]:
"""获取对话上下文"""
@ -330,7 +445,6 @@ class AIReplyEngine:
''', (chat_id, cookie_id, limit))
results = cursor.fetchall()
# 反转顺序,使其按时间正序
context = [{"role": row[0], "content": row[1]} for row in reversed(results)]
return context
except Exception as e:
@ -338,8 +452,8 @@ class AIReplyEngine:
return []
def save_conversation(self, chat_id: str, cookie_id: str, user_id: str,
item_id: str, role: str, content: str, intent: str = None):
"""保存对话记录"""
item_id: str, role: str, content: str, intent: str = None) -> Optional[str]:
"""保存对话记录,返回创建时间"""
try:
with db_manager.lock:
cursor = db_manager.conn.cursor()
@ -349,9 +463,17 @@ class AIReplyEngine:
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (cookie_id, chat_id, user_id, item_id, role, content, intent))
db_manager.conn.commit()
# 获取刚插入记录的created_at
cursor.execute('''
SELECT created_at FROM ai_conversations
WHERE rowid = last_insert_rowid()
''')
result = cursor.fetchone()
return result[0] if result else None
except Exception as e:
logger.error(f"保存对话记录失败: {e}")
return None
def get_bargain_count(self, chat_id: str, cookie_id: str) -> int:
"""获取议价次数"""
try:
@ -368,51 +490,53 @@ class AIReplyEngine:
logger.error(f"获取议价次数失败: {e}")
return 0
def _get_recent_user_messages(self, chat_id: str, cookie_id: str, seconds: int = 2) -> List[Dict]:
"""获取最近seconds秒内的所有用户消息包含内容和时间戳"""
try:
with db_manager.lock:
cursor = db_manager.conn.cursor()
# 先查询所有该chat的user消息用于调试
cursor.execute('''
SELECT content, created_at,
julianday('now') - julianday(created_at) as time_diff_days,
(julianday('now') - julianday(created_at)) * 86400.0 as time_diff_seconds
FROM ai_conversations
WHERE chat_id = ? AND cookie_id = ? AND role = 'user'
ORDER BY created_at DESC LIMIT 10
''', (chat_id, cookie_id))
all_messages = cursor.fetchall()
logger.info(f"【调试】chat_id={chat_id} 最近10条user消息: {[(msg[0][:10], msg[1], f'{msg[3]:.2f}秒前') for msg in all_messages]}")
# 正式查询
cursor.execute('''
SELECT content, created_at FROM ai_conversations
WHERE chat_id = ? AND cookie_id = ? AND role = 'user'
AND julianday('now') - julianday(created_at) < (? / 86400.0)
ORDER BY created_at ASC
''', (chat_id, cookie_id, seconds))
results = cursor.fetchall()
return [{"content": row[0], "created_at": row[1]} for row in results]
except Exception as e:
logger.error(f"获取最近用户消息列表失败: {e}")
return []
def increment_bargain_count(self, chat_id: str, cookie_id: str):
"""增加议价次数(通过保存记录自动增加)"""
# 议价次数通过查询price意图的用户消息数量来计算无需单独操作
"""(此方法已废弃,通过 get_bargain_count 的 SQL 查询实现)"""
pass
def clear_client_cache(self, cookie_id: str = None):
"""清理客户端缓存"""
if cookie_id:
self.clients.pop(cookie_id, None)
self.client_last_used.pop(cookie_id, None)
logger.info(f"清理账号 {cookie_id} 的客户端缓存")
else:
self.clients.clear()
self.client_last_used.clear()
logger.info("清理所有客户端缓存")
#
# --- 修复 P0-2: 移除所有有状态的缓存管理方法 ---
#
def cleanup_unused_clients(self, max_idle_hours: int = 24):
"""清理长时间未使用的客户端(防止内存泄漏)
Args:
max_idle_hours: 最大空闲时间小时默认24小时
"""
try:
current_time = time.time()
max_idle_seconds = max_idle_hours * 3600
# 找出超过最大空闲时间的客户端
expired_clients = [
cookie_id for cookie_id, last_used in self.client_last_used.items()
if current_time - last_used > max_idle_seconds
]
# 清理过期客户端
for cookie_id in expired_clients:
self.clients.pop(cookie_id, None)
self.client_last_used.pop(cookie_id, None)
self.agents.pop(cookie_id, None)
if expired_clients:
logger.info(f"AI回复引擎清理了 {len(expired_clients)} 个长时间未使用的客户端")
logger.debug(f"清理的账号: {expired_clients}")
logger.debug(f"当前活跃客户端数量: {len(self.clients)}")
except Exception as e:
logger.error(f"AI回复引擎清理未使用客户端时出错: {e}")
# def clear_client_cache(self, cookie_id: str = None):
# """(已移除) 清理客户端缓存"""
# pass
# def cleanup_unused_clients(self, max_idle_hours: int = 24):
# """(已移除) 清理长时间未使用的客户端"""
# pass
# 全局AI回复引擎实例

318
api_captcha_remote.py Normal file
View File

@ -0,0 +1,318 @@
"""
刮刮乐远程控制 API 路由
提供 WebSocket HTTP 接口用于远程操作滑块验证
"""
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.responses import HTMLResponse, FileResponse
from pydantic import BaseModel
from typing import Optional, List
import asyncio
import os
from loguru import logger
from utils.captcha_remote_control import captcha_controller
# 创建路由器
router = APIRouter(prefix="/api/captcha", tags=["captcha"])
class MouseEvent(BaseModel):
"""鼠标事件模型"""
session_id: str
event_type: str # down, move, up
x: int
y: int
class SessionCheckRequest(BaseModel):
"""会话检查请求"""
session_id: str
# =============================================================================
# WebSocket 端点 - 实时通信
# =============================================================================
@router.websocket("/ws/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: str):
"""
WebSocket 连接用于实时传输截图和接收鼠标事件
"""
await websocket.accept()
logger.info(f"🔌 WebSocket 连接建立: {session_id}")
# 注册 WebSocket 连接
captcha_controller.websocket_connections[session_id] = websocket
try:
# 发送初始会话信息
if session_id in captcha_controller.active_sessions:
session_data = captcha_controller.active_sessions[session_id]
await websocket.send_json({
'type': 'session_info',
'screenshot': session_data['screenshot'],
'captcha_info': session_data['captcha_info'],
'viewport': session_data['viewport']
})
# 不启动自动刷新,改为只在操作时更新(极速优化)
# refresh_task = asyncio.create_task(
# captcha_controller.auto_refresh_screenshot(session_id, interval=1.5)
# )
else:
await websocket.send_json({
'type': 'error',
'message': '会话不存在'
})
await websocket.close()
return
# 持续接收客户端消息
while True:
data = await websocket.receive_json()
msg_type = data.get('type')
if msg_type == 'mouse_event':
# 处理鼠标事件
event_type = data.get('event_type')
x = data.get('x')
y = data.get('y')
success = await captcha_controller.handle_mouse_event(
session_id, event_type, x, y
)
if success:
# 只在鼠标释放后才检查完成状态
if event_type == 'up':
# 等待页面更新(给验证码一些反应时间)
await asyncio.sleep(1.0)
# 多次确认滑块确实消失
completed = await captcha_controller.check_completion(session_id)
if completed:
# 再次确认(避免误判)
await asyncio.sleep(0.5)
completed = await captcha_controller.check_completion(session_id)
if completed:
await websocket.send_json({
'type': 'completed',
'message': '验证成功!'
})
logger.success(f"✅ 验证完成: {session_id}")
break
else:
# 更新截图显示验证结果
screenshot = await captcha_controller.update_screenshot(session_id)
if screenshot:
await websocket.send_json({
'type': 'screenshot_update',
'screenshot': screenshot
})
else:
# 按下或移动时,实时更新截图(截取整个验证码容器)
if event_type in ['down', 'move']:
# 截取整个验证码容器,降低质量换取速度
screenshot = await captcha_controller.update_screenshot(session_id, quality=30)
if screenshot:
await websocket.send_json({
'type': 'screenshot_update',
'screenshot': screenshot
})
elif msg_type == 'check_completion':
# 手动检查完成状态
completed = await captcha_controller.check_completion(session_id)
await websocket.send_json({
'type': 'completion_status',
'completed': completed
})
if completed:
break
elif msg_type == 'ping':
# 心跳
await websocket.send_json({'type': 'pong'})
except WebSocketDisconnect:
logger.info(f"🔌 WebSocket 连接断开: {session_id}")
except Exception as e:
logger.error(f"❌ WebSocket 错误: {e}")
import traceback
logger.error(traceback.format_exc())
finally:
# 清理
if session_id in captcha_controller.websocket_connections:
del captcha_controller.websocket_connections[session_id]
logger.info(f"🔒 WebSocket 会话结束: {session_id}")
# =============================================================================
# HTTP 端点 - REST API
# =============================================================================
@router.get("/sessions")
async def get_active_sessions():
"""获取所有活跃的验证会话"""
sessions = []
for session_id, data in captcha_controller.active_sessions.items():
sessions.append({
'session_id': session_id,
'completed': data.get('completed', False),
'has_websocket': session_id in captcha_controller.websocket_connections
})
return {
'count': len(sessions),
'sessions': sessions
}
@router.get("/session/{session_id}")
async def get_session_info(session_id: str):
"""获取指定会话的信息"""
if session_id not in captcha_controller.active_sessions:
raise HTTPException(status_code=404, detail="会话不存在")
session_data = captcha_controller.active_sessions[session_id]
return {
'session_id': session_id,
'screenshot': session_data['screenshot'],
'captcha_info': session_data['captcha_info'],
'viewport': session_data['viewport'],
'completed': session_data.get('completed', False)
}
@router.get("/screenshot/{session_id}")
async def get_screenshot(session_id: str):
"""获取最新截图"""
screenshot = await captcha_controller.update_screenshot(session_id)
if not screenshot:
raise HTTPException(status_code=404, detail="无法获取截图")
return {'screenshot': screenshot}
@router.post("/mouse_event")
async def handle_mouse_event(event: MouseEvent):
"""处理鼠标事件HTTP方式不推荐建议使用WebSocket"""
success = await captcha_controller.handle_mouse_event(
event.session_id,
event.event_type,
event.x,
event.y
)
if not success:
raise HTTPException(status_code=400, detail="处理失败")
# 检查是否完成
completed = await captcha_controller.check_completion(event.session_id)
return {
'success': True,
'completed': completed
}
@router.post("/check_completion")
async def check_completion(request: SessionCheckRequest):
"""检查验证是否完成"""
completed = await captcha_controller.check_completion(request.session_id)
return {
'session_id': request.session_id,
'completed': completed
}
@router.delete("/session/{session_id}")
async def close_session(session_id: str):
"""关闭会话"""
await captcha_controller.close_session(session_id)
return {'success': True}
# =============================================================================
# 前端页面
# =============================================================================
@router.get("/status/{session_id}")
async def get_captcha_status(session_id: str):
"""
获取验证状态
用于前端轮询检查验证是否完成
"""
try:
is_completed = captcha_controller.is_completed(session_id)
session_exists = captcha_controller.session_exists(session_id)
return {
"success": True,
"completed": is_completed,
"session_exists": session_exists,
"session_id": session_id
}
except Exception as e:
logger.error(f"获取验证状态失败: {e}")
return {
"success": False,
"completed": False,
"session_exists": False,
"session_id": session_id,
"error": str(e)
}
@router.get("/control", response_class=HTMLResponse)
async def captcha_control_page():
"""返回滑块控制页面"""
html_file = "captcha_control.html"
if os.path.exists(html_file):
return FileResponse(html_file, media_type="text/html")
else:
# 返回简单的提示页面
return HTMLResponse(content="""
<!DOCTYPE html>
<html>
<head>
<title>验证码控制面板</title>
</head>
<body>
<h1>验证码控制面板</h1>
<p>前端页面文件 captcha_control.html 不存在</p>
<p>请查看文档了解如何创建前端页面</p>
</body>
</html>
""")
@router.get("/control/{session_id}", response_class=HTMLResponse)
async def captcha_control_page_with_session(session_id: str):
"""返回带会话ID的滑块控制页面"""
html_file = "captcha_control.html"
if os.path.exists(html_file):
with open(html_file, 'r', encoding='utf-8') as f:
html_content = f.read()
# 注入会话ID
html_content = html_content.replace(
'</body>',
f'<script>window.INITIAL_SESSION_ID = "{session_id}";</script></body>'
)
return HTMLResponse(content=html_content)
else:
raise HTTPException(status_code=404, detail="前端页面不存在")

571
captcha_control.html Normal file
View File

@ -0,0 +1,571 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>刮刮乐验证控制面板</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, sans-serif;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
min-height: 100vh;
display: flex;
justify-content: center;
align-items: center;
padding: 20px;
}
.container {
background: white;
border-radius: 16px;
box-shadow: 0 20px 60px rgba(0, 0, 0, 0.3);
padding: 30px;
max-width: 900px;
width: 100%;
}
.header {
text-align: center;
margin-bottom: 30px;
}
.header h1 {
color: #333;
font-size: 28px;
margin-bottom: 10px;
}
.status {
display: flex;
align-items: center;
justify-content: center;
gap: 10px;
padding: 10px;
border-radius: 8px;
margin-bottom: 20px;
}
.status.connected {
background: #d4edda;
color: #155724;
}
.status.disconnected {
background: #f8d7da;
color: #721c24;
}
.status.completed {
background: #d1ecf1;
color: #0c5460;
}
.status-indicator {
width: 12px;
height: 12px;
border-radius: 50%;
animation: pulse 2s infinite;
}
.status.connected .status-indicator {
background: #28a745;
}
.status.disconnected .status-indicator {
background: #dc3545;
}
.status.completed .status-indicator {
background: #17a2b8;
}
@keyframes pulse {
0%, 100% { opacity: 1; }
50% { opacity: 0.5; }
}
.canvas-container {
position: relative;
background: #f8f9fa;
border: 2px solid #dee2e6;
border-radius: 8px;
overflow: hidden;
margin-bottom: 20px;
max-height: 600px;
display: flex;
justify-content: center;
align-items: center;
}
#captchaCanvas {
max-width: 100%;
display: block;
cursor: crosshair;
}
.loading {
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%);
text-align: center;
}
.spinner {
border: 4px solid #f3f3f3;
border-top: 4px solid #667eea;
border-radius: 50%;
width: 50px;
height: 50px;
animation: spin 1s linear infinite;
margin: 0 auto 10px;
}
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
.info-panel {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 15px;
margin-bottom: 20px;
}
.info-card {
background: #f8f9fa;
padding: 15px;
border-radius: 8px;
border-left: 4px solid #667eea;
}
.info-card h3 {
color: #666;
font-size: 12px;
text-transform: uppercase;
margin-bottom: 5px;
}
.info-card p {
color: #333;
font-size: 18px;
font-weight: bold;
}
.instructions {
background: #fff3cd;
border: 1px solid #ffc107;
border-radius: 8px;
padding: 15px;
margin-bottom: 20px;
}
.instructions h3 {
color: #856404;
margin-bottom: 10px;
font-size: 16px;
}
.instructions ol {
margin-left: 20px;
color: #856404;
}
.instructions li {
margin-bottom: 5px;
}
.session-input {
display: flex;
gap: 10px;
margin-bottom: 20px;
}
.session-input input {
flex: 1;
padding: 12px;
border: 2px solid #dee2e6;
border-radius: 8px;
font-size: 14px;
}
.session-input button {
padding: 12px 24px;
background: #667eea;
color: white;
border: none;
border-radius: 8px;
font-size: 14px;
font-weight: bold;
cursor: pointer;
transition: background 0.3s;
}
.session-input button:hover {
background: #5568d3;
}
.session-input button:disabled {
background: #ccc;
cursor: not-allowed;
}
.log {
background: #2d3748;
color: #a0aec0;
border-radius: 8px;
padding: 15px;
max-height: 200px;
overflow-y: auto;
font-family: 'Courier New', monospace;
font-size: 12px;
}
.log-entry {
margin-bottom: 5px;
}
.log-entry.success {
color: #48bb78;
}
.log-entry.error {
color: #f56565;
}
.log-entry.info {
color: #4299e1;
}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🎨 刮刮乐验证控制面板</h1>
</div>
<div id="status" class="status disconnected">
<div class="status-indicator"></div>
<span id="statusText">未连接</span>
</div>
<div class="session-input">
<input type="text" id="sessionIdInput" placeholder="输入会话ID通常是用户ID" />
<button id="connectBtn" onclick="connectSession()">连接</button>
</div>
<div class="instructions">
<h3>📋 使用说明</h3>
<ol>
<li>输入会话ID并点击"连接"按钮</li>
<li>等待页面截图加载</li>
<li>使用鼠标在滑块按钮上按下并拖动</li>
<li>释放鼠标完成验证</li>
</ol>
</div>
<div class="info-panel">
<div class="info-card">
<h3>会话ID</h3>
<p id="sessionIdDisplay">-</p>
</div>
<div class="info-card">
<h3>连接状态</h3>
<p id="connectionStatus">未连接</p>
</div>
<div class="info-card">
<h3>验证状态</h3>
<p id="verificationStatus">待处理</p>
</div>
</div>
<div class="canvas-container" id="canvasContainer">
<div class="loading" id="loading">
<div class="spinner"></div>
<p>加载中...</p>
</div>
<canvas id="captchaCanvas"></canvas>
</div>
<div class="log" id="log">
<div class="log-entry info">等待连接...</div>
</div>
</div>
<script>
let ws = null;
let currentSessionId = null;
let canvas = null;
let ctx = null;
let isMouseDown = false;
let lastMoveTime = 0;
let moveThrottle = 5; // 节流每10ms最多发送一次极致流畅
let captchaOffset = {x: 0, y: 0}; // 滑块区域在页面中的偏移量
// 初始化
document.addEventListener('DOMContentLoaded', () => {
canvas = document.getElementById('captchaCanvas');
ctx = canvas.getContext('2d');
// 检查是否为嵌入模式从URL参数或iframe检测
const urlParams = new URLSearchParams(window.location.search);
const isEmbed = urlParams.get('embed') === '1' || window.self !== window.top;
if (isEmbed) {
// 隐藏不必要的元素
document.querySelector('.header').style.display = 'none';
document.querySelector('.session-input').style.display = 'none';
document.querySelector('.instructions').style.display = 'none';
document.querySelector('.info-panel').style.display = 'none';
document.querySelector('.log').style.display = 'none';
document.getElementById('status').style.display = 'none';
// 调整容器样式
document.querySelector('.container').style.padding = '0';
document.querySelector('.container').style.boxShadow = 'none';
document.querySelector('.container').style.background = 'transparent';
document.body.style.padding = '0';
}
// 检查是否有预设的会话ID
if (window.INITIAL_SESSION_ID) {
document.getElementById('sessionIdInput').value = window.INITIAL_SESSION_ID;
connectSession();
}
// 鼠标事件
canvas.addEventListener('mousedown', handleMouseDown);
canvas.addEventListener('mousemove', handleMouseMove);
canvas.addEventListener('mouseup', handleMouseUp);
canvas.addEventListener('mouseleave', handleMouseUp);
});
function log(message, type = 'info') {
const logEl = document.getElementById('log');
const entry = document.createElement('div');
entry.className = `log-entry ${type}`;
const timestamp = new Date().toLocaleTimeString();
entry.textContent = `[${timestamp}] ${message}`;
logEl.appendChild(entry);
logEl.scrollTop = logEl.scrollHeight;
}
function updateStatus(status, text) {
const statusEl = document.getElementById('status');
const statusTextEl = document.getElementById('statusText');
statusEl.className = `status ${status}`;
statusTextEl.textContent = text;
document.getElementById('connectionStatus').textContent = text;
}
async function connectSession() {
const sessionId = document.getElementById('sessionIdInput').value.trim();
if (!sessionId) {
alert('请输入会话ID');
return;
}
currentSessionId = sessionId;
document.getElementById('sessionIdDisplay').textContent = sessionId;
document.getElementById('connectBtn').disabled = true;
log(`正在连接会话: ${sessionId}`, 'info');
try {
// 建立 WebSocket 连接
const wsUrl = `ws://${window.location.host}/api/captcha/ws/${sessionId}`;
log(`WebSocket URL: ${wsUrl}`, 'info');
ws = new WebSocket(wsUrl);
ws.onopen = () => {
log('WebSocket 连接成功', 'success');
updateStatus('connected', '已连接');
};
ws.onmessage = async (event) => {
const data = JSON.parse(event.data);
handleWebSocketMessage(data);
};
ws.onerror = (error) => {
log(`WebSocket 错误: ${error}`, 'error');
updateStatus('disconnected', '连接错误');
document.getElementById('connectBtn').disabled = false;
};
ws.onclose = () => {
log('WebSocket 连接关闭', 'info');
updateStatus('disconnected', '连接已关闭');
document.getElementById('connectBtn').disabled = false;
};
} catch (error) {
log(`连接失败: ${error}`, 'error');
updateStatus('disconnected', '连接失败');
document.getElementById('connectBtn').disabled = false;
}
}
function handleWebSocketMessage(data) {
const type = data.type;
if (type === 'session_info') {
log('收到会话信息', 'success');
// 保存验证码容器偏移量
if (data.captcha_info && data.captcha_info.x !== undefined) {
captchaOffset.x = Math.max(0, data.captcha_info.x - 10);
captchaOffset.y = Math.max(0, data.captcha_info.y - 10);
log(`验证码容器偏移: (${captchaOffset.x}, ${captchaOffset.y})`, 'info');
log(`容器大小: ${data.captcha_info.width}x${data.captcha_info.height}`, 'info');
}
displayScreenshot(data.screenshot);
}
else if (type === 'screenshot_update') {
displayScreenshot(data.screenshot);
// 如果是在等待验证结果时收到截图更新,说明验证失败
const currentStatus = document.getElementById('verificationStatus').textContent;
if (currentStatus === '验证中...') {
log('⚠️ 验证未通过,请重试', 'error');
document.getElementById('verificationStatus').textContent = '待处理';
}
}
else if (type === 'completed') {
log('✅ 验证成功!', 'success');
updateStatus('completed', '验证成功');
document.getElementById('verificationStatus').textContent = '已完成';
// 显示成功提示
setTimeout(() => {
alert('✅ 验证成功!页面即将关闭...');
if (ws) {
ws.close();
}
}, 500);
}
else if (type === 'error') {
log(`错误: ${data.message}`, 'error');
}
}
function displayScreenshot(base64Image) {
const img = new Image();
img.onload = () => {
// 调整 canvas 大小(只在必要时调整)
if (canvas.width !== img.width || canvas.height !== img.height) {
canvas.width = img.width;
canvas.height = img.height;
}
// 使用 imageSmoothingEnabled 优化渲染
ctx.imageSmoothingEnabled = true;
ctx.imageSmoothingQuality = 'low'; // 低质量平滑,提升性能
// 绘制图片
ctx.drawImage(img, 0, 0);
// 隐藏加载提示
document.getElementById('loading').style.display = 'none';
canvas.style.display = 'block';
};
// 使用 JPEG 格式(后端已改为 JPEG
img.src = `data:image/jpeg;base64,${base64Image}`;
}
function handleMouseDown(event) {
if (!ws || ws.readyState !== WebSocket.OPEN) {
return;
}
isMouseDown = true;
const coords = getMouseCoords(event);
log(`鼠标按下: (${coords.x}, ${coords.y})`, 'info');
ws.send(JSON.stringify({
type: 'mouse_event',
event_type: 'down',
x: coords.x,
y: coords.y
}));
}
function handleMouseMove(event) {
if (!ws || ws.readyState !== WebSocket.OPEN || !isMouseDown) {
return;
}
// 节流:限制发送频率,避免卡顿
const now = Date.now();
if (now - lastMoveTime < moveThrottle) {
return; // 跳过这次事件
}
lastMoveTime = now;
const coords = getMouseCoords(event);
ws.send(JSON.stringify({
type: 'mouse_event',
event_type: 'move',
x: coords.x,
y: coords.y
}));
}
function handleMouseUp(event) {
if (!ws || ws.readyState !== WebSocket.OPEN || !isMouseDown) {
return;
}
isMouseDown = false;
const coords = getMouseCoords(event);
log(`鼠标释放: (${coords.x}, ${coords.y})`, 'info');
log(`等待验证结果...`, 'info');
// 更新状态显示
document.getElementById('verificationStatus').textContent = '验证中...';
ws.send(JSON.stringify({
type: 'mouse_event',
event_type: 'up',
x: coords.x,
y: coords.y
}));
}
function getMouseCoords(event) {
const rect = canvas.getBoundingClientRect();
const scaleX = canvas.width / rect.width;
const scaleY = canvas.height / rect.height;
// 计算canvas上的坐标
const canvasX = Math.round((event.clientX - rect.left) * scaleX);
const canvasY = Math.round((event.clientY - rect.top) * scaleY);
// 转换为页面上的实际坐标(加上偏移量)
return {
x: canvasX + captchaOffset.x,
y: canvasY + captchaOffset.y
};
}
</script>
</body>
</html>

View File

@ -91,7 +91,7 @@ config = Config()
# 导出常用配置项
COOKIES_STR = config.get('COOKIES.value', '')
COOKIES_LAST_UPDATE = config.get('COOKIES.last_update_time', '')
WEBSOCKET_URL = config.get('WEBSOCKET_URL', 'wss://wss-goofish.dingtalk.com/')
WEBSOCKET_URL = config.get('WEBSOCKET_URL', '://wss-goofish.dingtalk.com/')
HEARTBEAT_INTERVAL = config.get('HEARTBEAT_INTERVAL', 15)
HEARTBEAT_TIMEOUT = config.get('HEARTBEAT_TIMEOUT', 30)
TOKEN_REFRESH_INTERVAL = config.get('TOKEN_REFRESH_INTERVAL', 72000)

View File

@ -17,6 +17,7 @@ class CookieManager:
self.keywords: Dict[str, List[Tuple[str, str]]] = {}
self.cookie_status: Dict[str, bool] = {} # 账号启用状态
self.auto_confirm_settings: Dict[str, bool] = {} # 自动确认发货设置
self._task_locks: Dict[str, asyncio.Lock] = {} # 每个cookie_id的任务锁防止重复创建
self._load_from_db()
def _load_from_db(self):
@ -69,51 +70,115 @@ class CookieManager:
logger.info(f"{cookie_id}】Cookie值长度: {len(cookie_value)}")
live = XianyuLive(cookie_value, cookie_id=cookie_id, user_id=user_id)
logger.info(f"{cookie_id}】XianyuLive实例创建成功开始调用main()...")
# 强制刷新日志,确保日志被写入
try:
import sys
sys.stdout.flush()
except:
pass
await live.main()
# main() 正常退出不应该发生因为main()内部有无限循环)
logger.warning(f"{cookie_id}】XianyuLive.main() 正常退出(这通常不应该发生)")
except asyncio.CancelledError:
logger.info(f"XianyuLive 任务已取消: {cookie_id}")
logger.info(f"{cookie_id}】XianyuLive 任务已取消")
# 强制刷新日志
try:
import sys
sys.stdout.flush()
except:
pass
except Exception as e:
logger.error(f"XianyuLive 任务异常({cookie_id}): {e}")
logger.error(f"{cookie_id}】XianyuLive 任务异常: {e}")
import traceback
logger.error(f"详细错误信息: {traceback.format_exc()}")
logger.error(f"{cookie_id}】详细错误信息:\n{traceback.format_exc()}")
# 强制刷新日志
try:
import sys
sys.stdout.flush()
except:
pass
finally:
logger.info(f"{cookie_id}】_run_xianyu方法执行结束")
# 确保日志被刷新
try:
import sys
sys.stdout.flush()
except:
pass
async def _add_cookie_async(self, cookie_id: str, cookie_value: str, user_id: int = None):
if cookie_id in self.tasks:
raise ValueError("Cookie ID already exists")
self.cookies[cookie_id] = cookie_value
# 保存到数据库如果没有指定user_id则保持原有绑定关系
db_manager.save_cookie(cookie_id, cookie_value, user_id)
# 获取或创建该cookie_id的锁
if cookie_id not in self._task_locks:
self._task_locks[cookie_id] = asyncio.Lock()
async with self._task_locks[cookie_id]:
# 检查是否已存在任务
if cookie_id in self.tasks:
existing_task = self.tasks[cookie_id]
# 检查任务是否还在运行
if not existing_task.done():
logger.warning(f"{cookie_id}】任务已存在且正在运行,先停止旧任务...")
existing_task.cancel()
try:
await existing_task
except asyncio.CancelledError:
pass
except Exception as e:
logger.error(f"等待旧任务停止时出错: {cookie_id}, {e}")
# 从字典中移除
self.tasks.pop(cookie_id, None)
logger.info(f"{cookie_id}】旧任务已停止")
else:
# 任务已完成,直接移除
self.tasks.pop(cookie_id, None)
logger.info(f"{cookie_id}】旧任务已完成,已移除")
self.cookies[cookie_id] = cookie_value
# 保存到数据库如果没有指定user_id则保持原有绑定关系
db_manager.save_cookie(cookie_id, cookie_value, user_id)
# 获取实际保存的user_id如果没有指定数据库会返回实际的user_id
actual_user_id = user_id
if actual_user_id is None:
# 从数据库获取Cookie对应的user_id
cookie_info = db_manager.get_cookie_details(cookie_id)
if cookie_info:
actual_user_id = cookie_info.get('user_id')
# 获取实际保存的user_id如果没有指定数据库会返回实际的user_id
actual_user_id = user_id
if actual_user_id is None:
# 从数据库获取Cookie对应的user_id
cookie_info = db_manager.get_cookie_details(cookie_id)
if cookie_info:
actual_user_id = cookie_info.get('user_id')
task = self.loop.create_task(self._run_xianyu(cookie_id, cookie_value, actual_user_id))
self.tasks[cookie_id] = task
logger.info(f"已启动账号任务: {cookie_id} (用户ID: {actual_user_id})")
task = self.loop.create_task(self._run_xianyu(cookie_id, cookie_value, actual_user_id))
self.tasks[cookie_id] = task
logger.info(f"已启动账号任务: {cookie_id} (用户ID: {actual_user_id})")
async def _remove_cookie_async(self, cookie_id: str):
task = self.tasks.pop(cookie_id, None)
if task:
task.cancel()
try:
# 等待任务完全清理,确保资源释放
await task
except asyncio.CancelledError:
# 任务被取消是预期行为
pass
except Exception as e:
logger.error(f"等待任务清理时出错: {cookie_id}, {e}")
# 获取或创建该cookie_id的锁
if cookie_id not in self._task_locks:
self._task_locks[cookie_id] = asyncio.Lock()
self.cookies.pop(cookie_id, None)
self.keywords.pop(cookie_id, None)
# 从数据库删除
db_manager.delete_cookie(cookie_id)
logger.info(f"已移除账号: {cookie_id}")
async with self._task_locks[cookie_id]:
task = self.tasks.pop(cookie_id, None)
if task:
task.cancel()
try:
# 等待任务完全清理,确保资源释放
await asyncio.wait_for(task, timeout=10.0)
except asyncio.TimeoutError:
logger.warning(f"{cookie_id}】等待任务停止超时10秒强制继续")
except asyncio.CancelledError:
# 任务被取消是预期行为
pass
except Exception as e:
logger.error(f"等待任务清理时出错: {cookie_id}, {e}")
self.cookies.pop(cookie_id, None)
self.keywords.pop(cookie_id, None)
# 清理锁
self._task_locks.pop(cookie_id, None)
# 从数据库删除
db_manager.delete_cookie(cookie_id)
logger.info(f"已移除账号: {cookie_id}")
# ------------------------ 对外线程安全接口 ------------------------
def add_cookie(self, cookie_id: str, cookie_value: str, kw_list: Optional[List[Tuple[str, str]]] = None, user_id: int = None):
@ -156,50 +221,60 @@ class CookieManager:
save_to_db: 是否保存到数据库默认True当API层已经更新数据库时应设为False避免覆盖其他字段
"""
async def _update():
# 获取原有的user_id和关键词
original_user_id = None
original_keywords = []
original_status = True
cookie_info = db_manager.get_cookie_details(cookie_id)
if cookie_info:
original_user_id = cookie_info.get('user_id')
# 保存原有的关键词和状态
if cookie_id in self.keywords:
original_keywords = self.keywords[cookie_id].copy()
if cookie_id in self.cookie_status:
original_status = self.cookie_status[cookie_id]
# 先移除任务(但不删除数据库记录)
task = self.tasks.pop(cookie_id, None)
if task:
task.cancel()
try:
# 等待任务完全清理,确保资源释放
await task
except asyncio.CancelledError:
# 任务被取消是预期行为
pass
except Exception as e:
logger.error(f"等待任务清理时出错: {cookie_id}, {e}")
# 更新Cookie值
self.cookies[cookie_id] = new_value
# 获取或创建该cookie_id的锁
if cookie_id not in self._task_locks:
self._task_locks[cookie_id] = asyncio.Lock()
# 只有在需要时才保存到数据库避免覆盖其他字段如pause_duration、remark等
if save_to_db:
db_manager.save_cookie(cookie_id, new_value, original_user_id)
async with self._task_locks[cookie_id]:
# 获取原有的user_id和关键词
original_user_id = None
original_keywords = []
original_status = True
# 恢复关键词和状态
self.keywords[cookie_id] = original_keywords
self.cookie_status[cookie_id] = original_status
cookie_info = db_manager.get_cookie_details(cookie_id)
if cookie_info:
original_user_id = cookie_info.get('user_id')
# 重新启动任务
task = self.loop.create_task(self._run_xianyu(cookie_id, new_value, original_user_id))
self.tasks[cookie_id] = task
# 保存原有的关键词和状态
if cookie_id in self.keywords:
original_keywords = self.keywords[cookie_id].copy()
if cookie_id in self.cookie_status:
original_status = self.cookie_status[cookie_id]
logger.info(f"已更新Cookie并重启任务: {cookie_id} (用户ID: {original_user_id}, 关键词: {len(original_keywords)}条)")
# 先移除任务(但不删除数据库记录)
task = self.tasks.pop(cookie_id, None)
if task:
logger.info(f"{cookie_id}】正在停止旧任务...")
task.cancel()
try:
# 等待任务完全清理,确保资源释放
await asyncio.wait_for(task, timeout=10.0)
except asyncio.TimeoutError:
logger.warning(f"{cookie_id}】等待旧任务停止超时10秒强制继续")
except asyncio.CancelledError:
# 任务被取消是预期行为
logger.debug(f"{cookie_id}】旧任务已取消")
pass
except Exception as e:
logger.error(f"等待任务清理时出错: {cookie_id}, {e}")
logger.info(f"{cookie_id}】旧任务已停止")
# 更新Cookie值
self.cookies[cookie_id] = new_value
# 只有在需要时才保存到数据库避免覆盖其他字段如pause_duration、remark等
if save_to_db:
db_manager.save_cookie(cookie_id, new_value, original_user_id)
# 恢复关键词和状态
self.keywords[cookie_id] = original_keywords
self.cookie_status[cookie_id] = original_status
# 重新启动任务
task = self.loop.create_task(self._run_xianyu(cookie_id, new_value, original_user_id))
self.tasks[cookie_id] = task
logger.info(f"已更新Cookie并重启任务: {cookie_id} (用户ID: {original_user_id}, 关键词: {len(original_keywords)}条)")
try:
current_loop = asyncio.get_running_loop()

View File

@ -1350,45 +1350,94 @@ class DBManager:
logger.error(f"获取账号自动回复暂停时间失败: {e}")
return 10
def update_cookie_account_info(self, cookie_id: str, cookie_value: str = None, username: str = None, password: str = None, show_browser: bool = None) -> bool:
"""更新Cookie的账号信息包括cookie值、用户名、密码和显示浏览器设置"""
def update_cookie_account_info(self, cookie_id: str, cookie_value: str = None, username: str = None, password: str = None, show_browser: bool = None, user_id: int = None) -> bool:
"""更新Cookie的账号信息包括cookie值、用户名、密码和显示浏览器设置
如果记录不存在会先创建记录需要提供cookie_value和user_id
"""
with self.lock:
try:
cursor = self.conn.cursor()
# 构建动态SQL更新语句
update_fields = []
params = []
# 检查记录是否存在
self._execute_sql(cursor, "SELECT id FROM cookies WHERE id = ?", (cookie_id,))
exists = cursor.fetchone() is not None
if cookie_value is not None:
update_fields.append("value = ?")
params.append(cookie_value)
if username is not None:
update_fields.append("username = ?")
params.append(username)
if password is not None:
update_fields.append("password = ?")
params.append(password)
if show_browser is not None:
update_fields.append("show_browser = ?")
params.append(1 if show_browser else 0)
if not update_fields:
logger.warning(f"更新账号 {cookie_id} 信息时没有提供任何更新字段")
return False
params.append(cookie_id)
sql = f"UPDATE cookies SET {', '.join(update_fields)} WHERE id = ?"
self._execute_sql(cursor, sql, tuple(params))
self.conn.commit()
logger.info(f"更新账号 {cookie_id} 信息成功: {update_fields}")
return True
if not exists:
# 记录不存在,需要创建新记录
if cookie_value is None:
logger.warning(f"账号 {cookie_id} 不存在且未提供cookie_value无法创建新记录")
return False
# 如果没有提供user_id尝试从现有记录获取否则使用admin用户ID
if user_id is None:
# 获取admin用户ID作为默认值
self._execute_sql(cursor, "SELECT id FROM users WHERE username = 'admin'")
admin_user = cursor.fetchone()
user_id = admin_user[0] if admin_user else 1
# 构建插入语句
insert_fields = ['id', 'value', 'user_id']
insert_values = [cookie_id, cookie_value, user_id]
insert_placeholders = ['?', '?', '?']
if username is not None:
insert_fields.append('username')
insert_values.append(username)
insert_placeholders.append('?')
if password is not None:
insert_fields.append('password')
insert_values.append(password)
insert_placeholders.append('?')
if show_browser is not None:
insert_fields.append('show_browser')
insert_values.append(1 if show_browser else 0)
insert_placeholders.append('?')
sql = f"INSERT INTO cookies ({', '.join(insert_fields)}) VALUES ({', '.join(insert_placeholders)})"
self._execute_sql(cursor, sql, tuple(insert_values))
self.conn.commit()
logger.info(f"创建新账号 {cookie_id} 并保存信息成功: {insert_fields}")
return True
else:
# 记录存在,执行更新
# 构建动态SQL更新语句
update_fields = []
params = []
if cookie_value is not None:
update_fields.append("value = ?")
params.append(cookie_value)
if username is not None:
update_fields.append("username = ?")
params.append(username)
if password is not None:
update_fields.append("password = ?")
params.append(password)
if show_browser is not None:
update_fields.append("show_browser = ?")
params.append(1 if show_browser else 0)
if not update_fields:
logger.warning(f"更新账号 {cookie_id} 信息时没有提供任何更新字段")
return False
params.append(cookie_id)
sql = f"UPDATE cookies SET {', '.join(update_fields)} WHERE id = ?"
self._execute_sql(cursor, sql, tuple(params))
self.conn.commit()
logger.info(f"更新账号 {cookie_id} 信息成功: {update_fields}")
return True
except Exception as e:
logger.error(f"更新账号信息失败: {e}")
import traceback
logger.error(traceback.format_exc())
self.conn.rollback()
return False
def get_auto_confirm(self, cookie_id: str) -> bool:
@ -3739,7 +3788,7 @@ class DBManager:
item_info['item_detail_parsed'] = json.loads(item_info['item_detail'])
except:
item_info['item_detail_parsed'] = {}
logger.info(f"item_info: {item_info}")
return item_info
return None

View File

@ -11,6 +11,7 @@ import secrets
import time
import json
import os
import re
import uvicorn
import pandas as pd
import io
@ -27,6 +28,14 @@ from utils.image_utils import image_manager
from loguru import logger
# 刮刮乐远程控制路由
try:
from api_captcha_remote import router as captcha_router
CAPTCHA_ROUTER_AVAILABLE = True
except ImportError:
logger.warning("⚠️ api_captcha_remote 未找到,刮刮乐远程控制功能不可用")
CAPTCHA_ROUTER_AVAILABLE = False
# 关键字文件路径
KEYWORDS_FILE = Path(__file__).parent / "回复关键字.txt"
@ -43,6 +52,10 @@ security = HTTPBearer(auto_error=False)
qr_check_locks = defaultdict(lambda: asyncio.Lock())
qr_check_processed = {} # 记录已处理的session: {session_id: {'processed': bool, 'timestamp': float}}
# 账号密码登录会话管理
password_login_sessions = {} # {session_id: {'account_id': str, 'account': str, 'password': str, 'show_browser': bool, 'status': str, 'verification_url': str, 'qr_code_url': str, 'slider_instance': object, 'task': asyncio.Task, 'timestamp': float}}
password_login_locks = defaultdict(lambda: asyncio.Lock())
# 不再需要单独的密码初始化,由数据库初始化时处理
@ -300,6 +313,13 @@ app = FastAPI(
redoc_url="/redoc"
)
# 注册刮刮乐远程控制路由
if CAPTCHA_ROUTER_AVAILABLE:
app.include_router(captcha_router)
logger.info("✅ 已注册刮刮乐远程控制路由: /api/captcha")
else:
logger.warning("⚠️ 刮刮乐远程控制路由未注册")
# 初始化文件日志收集器
setup_file_logging()
@ -1274,6 +1294,412 @@ def get_cookie_account_details(cid: str, current_user: Dict[str, Any] = Depends(
raise HTTPException(status_code=400, detail=str(e))
# ========================= 账号密码登录相关接口 =========================
async def _execute_password_login(session_id: str, account_id: str, account: str, password: str, show_browser: bool, user_id: int, current_user: Dict[str, Any]):
"""后台执行账号密码登录任务"""
try:
log_with_user('info', f"开始执行账号密码登录任务: {session_id}, 账号: {account_id}", current_user)
# 导入 XianyuSliderStealth
from utils.xianyu_slider_stealth import XianyuSliderStealth
import base64
import io
# 创建 XianyuSliderStealth 实例
slider_instance = XianyuSliderStealth(
user_id=account_id,
enable_learning=True,
headless=not show_browser
)
# 更新会话信息
password_login_sessions[session_id]['slider_instance'] = slider_instance
# 定义通知回调函数,用于检测到人脸认证时返回验证链接(同步函数)
def notification_callback(message: str, screenshot_path: str = None, verification_url: str = None):
"""人脸认证通知回调(同步)"""
try:
# 优先使用验证链接
if verification_url:
# 更新会话状态,保存验证链接
password_login_sessions[session_id]['status'] = 'verification_required'
password_login_sessions[session_id]['verification_url'] = verification_url
password_login_sessions[session_id]['qr_code_url'] = None # 不再使用截图
log_with_user('info', f"人脸认证验证链接已保存: {session_id}, URL: {verification_url}", current_user)
# 发送通知到用户配置的渠道
def send_face_verification_notification():
"""在后台线程中发送人脸验证通知"""
try:
from XianyuAutoAsync import XianyuLive
log_with_user('info', f"开始尝试发送人脸验证通知: {account_id}", current_user)
# 尝试获取XianyuLive实例如果账号已经存在
live_instance = XianyuLive.get_instance(account_id)
if live_instance:
log_with_user('info', f"找到账号实例,准备发送通知: {account_id}", current_user)
# 创建新的事件循环来运行异步通知
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
try:
new_loop.run_until_complete(
live_instance.send_token_refresh_notification(
error_message=message,
notification_type="face_verification",
verification_url=verification_url
)
)
log_with_user('info', f"✅ 已发送人脸验证通知: {account_id}", current_user)
except Exception as notify_err:
log_with_user('error', f"发送人脸验证通知失败: {str(notify_err)}", current_user)
import traceback
log_with_user('error', f"通知错误详情: {traceback.format_exc()}", current_user)
finally:
new_loop.close()
else:
# 如果账号实例不存在,记录警告并尝试从数据库获取通知配置
log_with_user('warning', f"账号实例不存在: {account_id},尝试从数据库获取通知配置", current_user)
try:
# 尝试从数据库获取通知配置
notifications = db_manager.get_account_notifications(account_id)
if notifications:
log_with_user('info', f"找到 {len(notifications)} 个通知配置,但需要账号实例才能发送", current_user)
log_with_user('warning', f"账号实例不存在,无法发送通知: {account_id}。请确保账号已登录并运行中。", current_user)
else:
log_with_user('warning', f"账号 {account_id} 未配置通知渠道", current_user)
except Exception as db_err:
log_with_user('error', f"获取通知配置失败: {str(db_err)}", current_user)
except Exception as notify_err:
log_with_user('error', f"发送人脸验证通知时出错: {str(notify_err)}", current_user)
import traceback
log_with_user('error', f"通知错误详情: {traceback.format_exc()}", current_user)
# 在后台线程中发送通知,避免阻塞登录流程
import threading
notification_thread = threading.Thread(target=send_face_verification_notification)
notification_thread.daemon = True
notification_thread.start()
log_with_user('info', f"已启动人脸验证通知发送线程: {account_id}", current_user)
elif screenshot_path and os.path.exists(screenshot_path):
# 兼容旧版本:如果有截图路径,仍然处理(但不再使用)
log_with_user('info', f"收到截图路径(已弃用): {session_id}", current_user)
except Exception as e:
log_with_user('error', f"处理人脸认证通知失败: {str(e)}", current_user)
# 调用登录方法(同步方法,需要在后台线程中执行)
import threading
def run_login():
try:
cookies_dict = slider_instance.login_with_password_playwright(
account=account,
password=password,
show_browser=show_browser,
notification_callback=notification_callback
)
if cookies_dict is None:
password_login_sessions[session_id]['status'] = 'failed'
password_login_sessions[session_id]['error'] = '登录失败,请检查账号密码是否正确'
log_with_user('error', f"账号密码登录失败: {account_id}", current_user)
return
# 将cookie字典转换为字符串格式
cookies_str = '; '.join([f"{k}={v}" for k, v in cookies_dict.items()])
log_with_user('info', f"账号密码登录成功,获取到 {len(cookies_dict)} 个Cookie字段: {account_id}", current_user)
# 检查是否已存在相同账号ID的Cookie
existing_cookies = db_manager.get_all_cookies(user_id)
is_new_account = account_id not in existing_cookies
# 保存账号密码和Cookie到数据库
# 使用 update_cookie_account_info 来保存,它会自动处理新账号和现有账号的情况
update_success = db_manager.update_cookie_account_info(
account_id,
cookie_value=cookies_str,
username=account,
password=password,
show_browser=show_browser,
user_id=user_id # 新账号时需要提供user_id
)
if update_success:
if is_new_account:
log_with_user('info', f"新账号Cookie和账号密码已保存: {account_id}", current_user)
else:
log_with_user('info', f"现有账号Cookie和账号密码已更新: {account_id}", current_user)
else:
log_with_user('error', f"保存账号信息失败: {account_id}", current_user)
# 添加到或更新cookie_manager注意不要在这里调用add_cookie或update_cookie因为它们会覆盖账号密码
# 账号密码已经在上面通过update_cookie_account_info保存了
# 这里只需要更新内存中的cookie值不保存到数据库避免覆盖账号密码
if cookie_manager.manager:
# 更新内存中的cookie值
cookie_manager.manager.cookies[account_id] = cookies_str
log_with_user('info', f"已更新cookie_manager中的Cookie内存: {account_id}", current_user)
# 如果是新账号,需要启动任务
if is_new_account:
# 使用异步方式启动任务,但不保存到数据库(避免覆盖账号密码)
try:
import asyncio
loop = cookie_manager.manager.loop
if loop:
# 确保关键词列表存在
if account_id not in cookie_manager.manager.keywords:
cookie_manager.manager.keywords[account_id] = []
# 在后台启动任务使用线程安全的方式因为run_login是在后台线程中运行的
try:
# 尝试使用run_coroutine_threadsafe这是线程安全的方式
fut = asyncio.run_coroutine_threadsafe(
cookie_manager.manager._run_xianyu(account_id, cookies_str, user_id),
loop
)
# 不等待结果,让它在后台运行
log_with_user('info', f"已启动新账号任务: {account_id}", current_user)
except RuntimeError as e:
# 如果事件循环未运行,记录警告但不影响登录成功
log_with_user('warning', f"事件循环未运行,无法启动新账号任务: {account_id}, 错误: {str(e)}", current_user)
log_with_user('info', f"账号已保存,将在系统重启后自动启动任务: {account_id}", current_user)
except Exception as task_err:
log_with_user('warning', f"启动新账号任务失败: {account_id}, 错误: {str(task_err)}", current_user)
import traceback
logger.error(traceback.format_exc())
# 登录成功后调用_refresh_cookies_via_browser刷新Cookie
try:
log_with_user('info', f"开始调用_refresh_cookies_via_browser刷新Cookie: {account_id}", current_user)
from XianyuAutoAsync import XianyuLive
# 创建临时的XianyuLive实例来刷新Cookie
temp_xianyu = XianyuLive(
cookies_str=cookies_str,
cookie_id=account_id,
user_id=user_id
)
# 重置扫码登录Cookie刷新标志确保账号密码登录后能立即刷新
try:
temp_xianyu.reset_qr_cookie_refresh_flag()
log_with_user('info', f"已重置扫码登录Cookie刷新标志: {account_id}", current_user)
except Exception as reset_err:
log_with_user('debug', f"重置扫码登录Cookie刷新标志失败不影响刷新: {str(reset_err)}", current_user)
# 在后台异步执行刷新(不阻塞主流程)
async def refresh_cookies_task():
try:
refresh_success = await temp_xianyu._refresh_cookies_via_browser(triggered_by_refresh_token=False)
if refresh_success:
log_with_user('info', f"Cookie刷新成功: {account_id}", current_user)
# 刷新成功后从数据库获取更新后的Cookie
updated_cookie_info = db_manager.get_cookie_details(account_id)
if updated_cookie_info:
refreshed_cookies = updated_cookie_info.get('value', '')
if refreshed_cookies:
# 更新cookie_manager中的Cookie
if cookie_manager.manager:
cookie_manager.manager.update_cookie(account_id, refreshed_cookies, save_to_db=False)
log_with_user('info', f"已更新刷新后的Cookie到cookie_manager: {account_id}", current_user)
else:
log_with_user('warning', f"Cookie刷新失败或跳过: {account_id}", current_user)
except Exception as refresh_e:
log_with_user('error', f"刷新Cookie时出错: {account_id}, 错误: {str(refresh_e)}", current_user)
import traceback
logger.error(traceback.format_exc())
# 在后台线程中运行异步任务
# 由于run_login是在线程中运行的需要创建新的事件循环
def run_async_refresh():
try:
import asyncio
# 创建新的事件循环
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
try:
new_loop.run_until_complete(refresh_cookies_task())
finally:
new_loop.close()
except Exception as e:
log_with_user('error', f"运行异步刷新任务失败: {account_id}, 错误: {str(e)}", current_user)
# 在后台线程中执行刷新任务
refresh_thread = threading.Thread(target=run_async_refresh, daemon=True)
refresh_thread.start()
except Exception as refresh_err:
log_with_user('warning', f"调用_refresh_cookies_via_browser失败: {account_id}, 错误: {str(refresh_err)}", current_user)
# 刷新失败不影响登录成功
# 更新会话状态
password_login_sessions[session_id]['status'] = 'success'
password_login_sessions[session_id]['account_id'] = account_id
password_login_sessions[session_id]['is_new_account'] = is_new_account
password_login_sessions[session_id]['cookie_count'] = len(cookies_dict)
except Exception as e:
error_msg = str(e)
password_login_sessions[session_id]['status'] = 'failed'
password_login_sessions[session_id]['error'] = error_msg
log_with_user('error', f"账号密码登录失败: {account_id}, 错误: {error_msg}", current_user)
logger.info(f"会话 {session_id} 状态已更新为 failed错误消息: {error_msg}") # 添加日志确认状态更新
import traceback
logger.error(traceback.format_exc())
finally:
# 清理实例(释放并发槽位)
try:
from utils.xianyu_slider_stealth import concurrency_manager
concurrency_manager.unregister_instance(account_id)
log_with_user('debug', f"已释放并发槽位: {account_id}", current_user)
except Exception as cleanup_e:
log_with_user('warning', f"清理实例时出错: {str(cleanup_e)}", current_user)
# 在后台线程中执行登录
login_thread = threading.Thread(target=run_login, daemon=True)
login_thread.start()
except Exception as e:
password_login_sessions[session_id]['status'] = 'failed'
password_login_sessions[session_id]['error'] = str(e)
log_with_user('error', f"执行账号密码登录任务异常: {str(e)}", current_user)
import traceback
logger.error(traceback.format_exc())
@app.post("/password-login")
async def password_login(
request: Dict[str, Any],
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""账号密码登录接口(异步,支持人脸认证)"""
try:
account_id = request.get('account_id')
account = request.get('account')
password = request.get('password')
show_browser = request.get('show_browser', False)
if not account_id or not account or not password:
return {'success': False, 'message': '账号ID、登录账号和密码不能为空'}
log_with_user('info', f"开始账号密码登录: {account_id}, 账号: {account}", current_user)
# 生成会话ID
import secrets
session_id = secrets.token_urlsafe(16)
user_id = current_user['user_id']
# 创建登录会话
password_login_sessions[session_id] = {
'account_id': account_id,
'account': account,
'password': password,
'show_browser': show_browser,
'status': 'processing',
'verification_url': None,
'qr_code_url': None,
'slider_instance': None,
'task': None,
'timestamp': time.time(),
'user_id': user_id
}
# 启动后台登录任务
task = asyncio.create_task(_execute_password_login(
session_id, account_id, account, password, show_browser, user_id, current_user
))
password_login_sessions[session_id]['task'] = task
return {
'success': True,
'session_id': session_id,
'status': 'processing',
'message': '登录任务已启动,请等待...'
}
except Exception as e:
log_with_user('error', f"账号密码登录异常: {str(e)}", current_user)
import traceback
logger.error(traceback.format_exc())
return {'success': False, 'message': f'登录失败: {str(e)}'}
@app.get("/password-login/check/{session_id}")
async def check_password_login_status(
session_id: str,
current_user: Dict[str, Any] = Depends(get_current_user)
):
"""检查账号密码登录状态"""
try:
# 清理过期会话超过1小时
current_time = time.time()
expired_sessions = [
sid for sid, session in password_login_sessions.items()
if current_time - session['timestamp'] > 3600
]
for sid in expired_sessions:
if sid in password_login_sessions:
del password_login_sessions[sid]
if session_id not in password_login_sessions:
return {'status': 'not_found', 'message': '会话不存在或已过期'}
session = password_login_sessions[session_id]
# 检查用户权限
if session['user_id'] != current_user['user_id']:
return {'status': 'forbidden', 'message': '无权限访问该会话'}
status = session['status']
if status == 'verification_required':
# 需要人脸认证
return {
'status': 'verification_required',
'verification_url': session.get('verification_url'),
'qr_code_url': session.get('qr_code_url'), # 保留兼容性
'message': '需要人脸验证,请点击验证链接'
}
elif status == 'success':
# 登录成功
result = {
'status': 'success',
'message': f'账号 {session["account_id"]} 登录成功',
'account_id': session['account_id'],
'is_new_account': session.get('is_new_account', False),
'cookie_count': session.get('cookie_count', 0)
}
# 清理会话
del password_login_sessions[session_id]
return result
elif status == 'failed':
# 登录失败
error_msg = session.get('error', '登录失败')
log_with_user('info', f"返回登录失败状态: {session_id}, 错误消息: {error_msg}", current_user) # 添加日志
result = {
'status': 'failed',
'message': error_msg,
'error': error_msg # 也包含error字段确保前端能获取到
}
# 清理会话
del password_login_sessions[session_id]
return result
else:
# 处理中
return {
'status': 'processing',
'message': '登录处理中,请稍候...'
}
except Exception as e:
log_with_user('error', f"检查账号密码登录状态异常: {str(e)}", current_user)
return {'status': 'error', 'message': str(e)}
# ========================= 扫码登录相关接口 =========================
@app.post("/qr-login/generate")
@ -1330,10 +1756,12 @@ async def check_qr_code_status(session_id: str, current_user: Dict[str, Any] = D
# 获取会话状态
status_info = qr_login_manager.get_session_status(session_id)
log_with_user('info', f"获取会话状态1111111: {status_info}", current_user)
if status_info['status'] == 'success':
log_with_user('info', f"获取会话状态22222222: {status_info}", current_user)
# 登录成功处理Cookie现在包含获取真实cookie的逻辑
cookies_info = qr_login_manager.get_session_cookies(session_id)
log_with_user('info', f"获取会话Cookie: {cookies_info}", current_user)
if cookies_info:
account_info = await process_qr_login_cookies(
cookies_info['cookies'],
@ -3633,8 +4061,6 @@ def update_ai_reply_settings(cookie_id: str, settings: AIReplySettings, current_
success = db_manager.save_ai_reply_settings(cookie_id, settings_dict)
if success:
# 清理客户端缓存,强制重新创建
ai_reply_engine.clear_client_cache(cookie_id)
# 如果启用了AI回复记录日志
if settings.ai_enabled:
@ -4186,6 +4612,99 @@ def get_system_logs(admin_user: Dict[str, Any] = Depends(require_admin),
log_with_user('error', f"获取系统日志失败: {str(e)}", admin_user)
return {"logs": [], "message": f"获取系统日志失败: {str(e)}", "success": False}
@app.get('/admin/log-files')
def list_log_files(admin_user: Dict[str, Any] = Depends(require_admin)):
"""列出所有可用的系统日志文件"""
import os
import glob
from datetime import datetime
try:
log_with_user('info', "查询日志文件列表", admin_user)
log_dir = "logs"
if not os.path.exists(log_dir):
logger.warning("日志目录不存在")
return {"success": True, "files": []}
log_pattern = os.path.join(log_dir, "xianyu_*.log")
log_files = glob.glob(log_pattern)
files_info = []
for file_path in log_files:
try:
stat_info = os.stat(file_path)
files_info.append({
"name": os.path.basename(file_path),
"size": stat_info.st_size,
"modified_at": datetime.fromtimestamp(stat_info.st_mtime).isoformat(),
"modified_ts": stat_info.st_mtime
})
except OSError as e:
logger.warning(f"读取日志文件信息失败 {file_path}: {e}")
# 按修改时间倒序排序
files_info.sort(key=lambda item: item.get("modified_ts", 0), reverse=True)
logger.info(f"返回日志文件列表,共 {len(files_info)} 个文件")
return {"success": True, "files": files_info}
except Exception as e:
logger.error(f"获取日志文件列表失败: {str(e)}")
log_with_user('error', f"获取日志文件列表失败: {str(e)}", admin_user)
raise HTTPException(status_code=500, detail=str(e))
@app.get('/admin/logs/export')
def export_log_file(file: str, admin_user: Dict[str, Any] = Depends(require_admin)):
"""导出指定的日志文件"""
import os
from fastapi.responses import StreamingResponse
try:
if not file:
raise HTTPException(status_code=400, detail="缺少文件参数")
safe_name = os.path.basename(file)
log_dir = os.path.abspath("logs")
target_path = os.path.abspath(os.path.join(log_dir, safe_name))
# 防止目录遍历
if not target_path.startswith(log_dir):
log_with_user('warning', f"尝试访问非法日志文件: {file}", admin_user)
raise HTTPException(status_code=400, detail="非法的日志文件路径")
if not os.path.exists(target_path):
log_with_user('warning', f"日志文件不存在: {file}", admin_user)
raise HTTPException(status_code=404, detail="日志文件不存在")
log_with_user('info', f"导出日志文件: {safe_name}", admin_user)
def iter_file(path: str):
file_handle = open(path, 'rb')
try:
while True:
chunk = file_handle.read(8192)
if not chunk:
break
yield chunk
finally:
file_handle.close()
headers = {
"Content-Disposition": f'attachment; filename="{safe_name}"'
}
return StreamingResponse(
iter_file(target_path),
media_type='text/plain; charset=utf-8',
headers=headers
)
except HTTPException:
raise
except Exception as e:
logger.error(f"导出日志文件失败: {str(e)}")
log_with_user('error', f"导出日志文件失败: {str(e)}", admin_user)
raise HTTPException(status_code=500, detail=str(e))
@app.get('/admin/stats')
def get_system_stats(admin_user: Dict[str, Any] = Depends(require_admin)):
"""获取系统统计信息(管理员专用)"""

View File

@ -260,6 +260,12 @@
<br>
<small class="opacity-75">推荐方式</small>
</button>
<button type="button" class="btn btn-outline-primary btn-lg flex-fill password-login-btn" onclick="togglePasswordLogin()" style="max-width: 300px;">
<i class="bi bi-key me-2"></i>
<span class="fw-bold">账号密码登录</span>
<br>
<small class="opacity-75">使用账号和密码登录</small>
</button>
<button type="button" class="btn btn-outline-secondary btn-lg flex-fill manual-input-btn" onclick="toggleManualInput()" style="max-width: 300px;">
<i class="bi bi-keyboard me-2"></i>
<span class="fw-bold">手动输入</span>
@ -270,6 +276,44 @@
</div>
</div>
<!-- 账号密码登录表单(默认隐藏) -->
<div id="passwordLoginForm" style="display: none;">
<div class="alert alert-info">
<i class="bi bi-info-circle me-2"></i>
<strong>提示:</strong>使用账号密码登录系统将自动获取Cookie。登录过程可能需要处理滑块验证请耐心等待。
</div>
<form id="passwordLoginFormElement" class="row g-3">
<div class="col-md-4">
<label for="passwordLoginAccountId" class="form-label">账号ID</label>
<input type="text" class="form-control" id="passwordLoginAccountId" placeholder="唯一标识" required>
</div>
<div class="col-md-4">
<label for="passwordLoginAccount" class="form-label">登录账号</label>
<input type="text" class="form-control" id="passwordLoginAccount" placeholder="手机号或邮箱" required>
</div>
<div class="col-md-4">
<label for="passwordLoginPassword" class="form-label">登录密码</label>
<input type="password" class="form-control" id="passwordLoginPassword" placeholder="请输入密码" required>
</div>
<div class="col-12">
<div class="form-check">
<input class="form-check-input" type="checkbox" id="passwordLoginShowBrowser" value="1">
<label class="form-check-label" for="passwordLoginShowBrowser">
显示浏览器窗口(如果使用服务器,请勿打开该开关,会导致登录失败)
</label>
</div>
</div>
<div class="col-12">
<button type="submit" class="btn btn-primary">
<i class="bi bi-box-arrow-in-right me-1"></i>开始登录
</button>
<button type="button" class="btn btn-secondary ms-2" onclick="togglePasswordLogin()">
<i class="bi bi-x-circle me-1"></i>取消
</button>
</div>
</form>
</div>
<!-- 手动输入表单(默认隐藏) -->
<div id="manualInputForm" style="display: none;">
<div class="alert alert-info">
@ -1273,10 +1317,13 @@
<div class="content-body">
<!-- 日志控制面板 -->
<div class="card mb-4">
<div class="card-header">
<div class="card-header d-flex justify-content-between align-items-center">
<h5 class="mb-0">
<i class="bi bi-sliders"></i> 日志控制
</h5>
<button class="btn btn-sm btn-primary" onclick="openLogExportModal()">
<i class="bi bi-box-arrow-up-right me-1"></i>导出日志
</button>
</div>
<div class="card-body">
<div class="row align-items-center">
@ -1390,6 +1437,51 @@
</div>
</div>
<!-- 导出日志模态框 -->
<div class="modal fade" id="exportLogModal" tabindex="-1" aria-labelledby="exportLogModalLabel" aria-hidden="true">
<div class="modal-dialog modal-lg">
<div class="modal-content">
<div class="modal-header">
<h5 class="modal-title" id="exportLogModalLabel">
<i class="bi bi-box-arrow-down me-2"></i>导出系统日志
</h5>
<button type="button" class="btn-close" data-bs-dismiss="modal" aria-label="Close"></button>
</div>
<div class="modal-body">
<div class="d-flex justify-content-between align-items-center flex-wrap gap-2 mb-3">
<div class="text-muted small">
选择要导出的日志文件,可用于排查问题或备份。
</div>
<button type="button" class="btn btn-sm btn-outline-secondary" onclick="refreshLogFileList()">
<i class="bi bi-arrow-clockwise me-1"></i>刷新列表
</button>
</div>
<div id="logFileLoading" class="text-center py-4">
<div class="spinner-border" role="status">
<span class="visually-hidden">加载中...</span>
</div>
<p class="mt-2 text-muted">正在加载日志文件列表...</p>
</div>
<div id="logFileError" class="alert alert-danger d-none" role="alert"></div>
<div id="logFileEmpty" class="text-center text-muted py-4 d-none">
<i class="bi bi-file-earmark-text mb-2" style="font-size: 2rem;"></i>
<p class="mb-0">暂无日志文件</p>
</div>
<div class="list-group" id="logFileList"></div>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-secondary" data-bs-dismiss="modal">
<i class="bi bi-x-circle me-1"></i>关闭
</button>
</div>
</div>
</div>
</div>
<!-- 风控日志内容 -->
<div id="risk-control-logs-section" class="content-section">
<div class="content-header">
@ -3583,5 +3675,35 @@
</div>
</div>
<!-- 滑块验证模态框 -->
<div class="modal fade" id="captchaVerifyModal" tabindex="-1" data-bs-backdrop="static" data-bs-keyboard="false">
<div class="modal-dialog modal-xl modal-dialog-centered">
<div class="modal-content">
<div class="modal-header bg-warning">
<h5 class="modal-title">
<i class="bi bi-shield-exclamation me-2"></i>
需要滑块验证
</h5>
</div>
<div class="modal-body p-0">
<div class="alert alert-info m-3 mb-0">
<i class="bi bi-info-circle me-2"></i>
<strong>提示:</strong>检测到滑块验证,请完成滑块验证后继续搜索。验证成功后将自动关闭此窗口。
</div>
<iframe id="captchaIframe"
style="width: 100%; height: 600px; border: none;"
sandbox="allow-same-origin allow-scripts allow-forms">
</iframe>
<div id="captchaLoadingIndicator" class="text-center py-5">
<div class="spinner-border text-primary mb-3" role="status">
<span class="visually-hidden">加载中...</span>
</div>
<p class="text-muted">正在加载滑块验证...</p>
</div>
</div>
</div>
</div>
</div>
</body>
</html>

File diff suppressed because it is too large Load Diff

View File

@ -1 +1 @@
v1.0.3
v1.0.4

View File

@ -0,0 +1,368 @@
"""
刮刮乐远程控制模块
通过 WebSocket 实时传输页面截图到前端并接收用户操作
"""
import asyncio
import base64
import json
from typing import Optional, Dict, Any
from loguru import logger
from playwright.async_api import Page
class CaptchaRemoteController:
"""刮刮乐远程控制器"""
def __init__(self):
self.active_sessions: Dict[str, Dict[str, Any]] = {}
self.websocket_connections: Dict[str, Any] = {}
async def create_session(self, session_id: str, page: Page) -> Dict[str, str]:
"""
创建远程控制会话
Args:
session_id: 会话ID通常是用户ID
page: Playwright Page 对象
Returns:
包含会话信息的字典
"""
# 获取滑块元素位置
captcha_info = await self._get_captcha_info(page)
# 只截取滑块区域,不截取整个页面(性能优化)
screenshot_bytes = await self._screenshot_captcha_area(page, captcha_info)
screenshot_base64 = base64.b64encode(screenshot_bytes).decode('utf-8')
# 获取视口大小
try:
viewport = page.viewport_size
if viewport is None:
# 如果没有设置viewport使用默认值或通过JS获取
viewport = await page.evaluate("() => ({width: window.innerWidth, height: window.innerHeight})")
except:
viewport = {'width': 1280, 'height': 720} # 默认值
# 存储会话
self.active_sessions[session_id] = {
'page': page,
'screenshot': screenshot_base64,
'captcha_info': captcha_info,
'completed': False,
'viewport': viewport
}
logger.info(f"✅ 创建远程控制会话: {session_id}")
return {
'session_id': session_id,
'screenshot': screenshot_base64,
'captcha_info': captcha_info,
'viewport': self.active_sessions[session_id]['viewport']
}
async def _screenshot_captcha_area(self, page: Page, captcha_info: Dict[str, Any]) -> bytes:
"""截取整个验证码容器区域"""
try:
if captcha_info and 'x' in captcha_info:
# 直接截取整个容器,稍微留一点边距
x = max(0, captcha_info['x'] - 10)
y = max(0, captcha_info['y'] - 10)
width = captcha_info['width'] + 20
height = captcha_info['height'] + 20
# 截取整个验证码容器
screenshot_bytes = await page.screenshot(
type='jpeg',
quality=80, # 验证码区域用高质量
clip={
'x': x,
'y': y,
'width': width,
'height': height
}
)
logger.info(f"✅ 截取验证码容器: {width}x{height} (包含完整验证码)")
return screenshot_bytes
else:
# 如果没有找到滑块,截取整个页面
logger.warning("未找到滑块位置,截取整个页面")
return await page.screenshot(type='jpeg', quality=75, full_page=False)
except Exception as e:
logger.warning(f"截取滑块区域失败,使用全页面: {e}")
return await page.screenshot(type='jpeg', quality=75, full_page=False)
async def _get_captcha_info(self, page: Page) -> Dict[str, Any]:
"""获取滑块验证码信息(查找整个容器)"""
try:
# 优先查找整个验证码容器(不是按钮)
container_selectors = [
'#nocaptcha', # 完整的验证码容器
'.scratch-captcha-container',
'[id*="captcha"]',
'.nc-container'
]
# 先在主页面查找
for selector in container_selectors:
try:
element = await page.query_selector(selector)
if element:
box = await element.bounding_box()
if box and box['width'] > 100 and box['height'] > 100: # 确保找到的是容器
logger.info(f"✅ 在主页面找到验证码容器: {selector}, 大小: {box['width']}x{box['height']}")
return {
'selector': selector,
'x': box['x'],
'y': box['y'],
'width': box['width'],
'height': box['height'],
'in_iframe': False
}
except Exception as e:
logger.debug(f"检查选择器 {selector} 失败: {e}")
continue
# 在 iframe 中查找
frames = page.frames
for frame in frames:
if frame != page.main_frame:
for selector in container_selectors:
try:
element = await frame.query_selector(selector)
if element:
box = await element.bounding_box()
if box and box['width'] > 100 and box['height'] > 100:
logger.info(f"✅ 在iframe找到验证码容器: {selector}, 大小: {box['width']}x{box['height']}")
return {
'selector': selector,
'x': box['x'],
'y': box['y'],
'width': box['width'],
'height': box['height'],
'in_iframe': True
# 注意:不保存 frame 对象,因为不能被 JSON 序列化
}
except Exception as e:
logger.debug(f"iframe检查选择器 {selector} 失败: {e}")
continue
logger.warning("⚠️ 未找到验证码容器")
return None
except Exception as e:
logger.error(f"获取滑块信息失败: {e}")
return None
async def update_screenshot(self, session_id: str, quality: int = 75) -> Optional[str]:
"""更新会话的截图(截取整个验证码容器)"""
if session_id not in self.active_sessions:
return None
try:
page = self.active_sessions[session_id]['page']
captcha_info = self.active_sessions[session_id].get('captcha_info')
# 截取整个验证码容器
if captcha_info and 'x' in captcha_info:
x = max(0, captcha_info['x'] - 10)
y = max(0, captcha_info['y'] - 10)
width = captcha_info['width'] + 20
height = captcha_info['height'] + 20
screenshot_bytes = await page.screenshot(
type='jpeg',
quality=quality,
clip={'x': x, 'y': y, 'width': width, 'height': height}
)
else:
# 降级方案:截取整个页面
screenshot_bytes = await page.screenshot(
type='jpeg',
quality=quality,
full_page=False
)
screenshot_base64 = base64.b64encode(screenshot_bytes).decode('utf-8')
self.active_sessions[session_id]['screenshot'] = screenshot_base64
return screenshot_base64
except Exception as e:
logger.error(f"更新截图失败: {e}")
return None
async def handle_mouse_event(self, session_id: str, event_type: str, x: int, y: int) -> bool:
"""
处理鼠标事件
Args:
session_id: 会话ID
event_type: 事件类型 (down/move/up)
x: X坐标
y: Y坐标
Returns:
是否成功
"""
if session_id not in self.active_sessions:
logger.warning(f"会话不存在: {session_id}")
return False
try:
page = self.active_sessions[session_id]['page']
if event_type == 'down':
await page.mouse.move(x, y)
await page.mouse.down()
logger.debug(f"鼠标按下: ({x}, {y})")
elif event_type == 'move':
await page.mouse.move(x, y)
logger.debug(f"鼠标移动: ({x}, {y})")
elif event_type == 'up':
await page.mouse.up()
logger.debug(f"鼠标释放: ({x}, {y})")
else:
logger.warning(f"未知事件类型: {event_type}")
return False
return True
except Exception as e:
logger.error(f"处理鼠标事件失败: {e}")
return False
async def check_completion(self, session_id: str) -> bool:
"""检查验证是否完成(更严格的判断)"""
if session_id not in self.active_sessions:
return False
try:
page = self.active_sessions[session_id]['page']
# 多个选择器检查,确保更准确
captcha_selectors = [
'#nocaptcha',
'#scratch-captcha-btn',
'.scratch-captcha-container',
'.scratch-captcha-slider'
]
found_visible_captcha = False
# 检查主页面
for selector in captcha_selectors:
try:
element = await page.query_selector(selector)
if element:
is_visible = await element.is_visible()
if is_visible:
logger.debug(f"主页面发现可见滑块: {selector}")
found_visible_captcha = True
break
except:
continue
if found_visible_captcha:
return False
# 检查所有 iframe
frames = page.frames
for frame in frames:
if frame != page.main_frame:
for selector in captcha_selectors:
try:
element = await frame.query_selector(selector)
if element:
is_visible = await element.is_visible()
if is_visible:
logger.debug(f"iframe中发现可见滑块: {selector}")
found_visible_captcha = True
break
except:
continue
if found_visible_captcha:
break
if found_visible_captcha:
return False
# 额外检查:看页面内容是否还包含滑块相关文字
try:
page_content = await page.content()
captcha_keywords = ['scratch-captcha', 'nocaptcha', 'slider-btn']
# 如果页面中仍然有大量滑块相关内容,可能还未完成
keyword_count = sum(1 for kw in captcha_keywords if kw in page_content)
if keyword_count >= 2:
logger.debug(f"页面中仍有 {keyword_count} 个滑块关键词")
return False
except:
pass
# 所有检查都通过,认为验证完成
logger.success(f"✅ 验证完成(所有滑块元素已消失): {session_id}")
self.active_sessions[session_id]['completed'] = True
return True
except Exception as e:
logger.error(f"检查完成状态失败: {e}")
# 出错时返回 False不要误判为成功
return False
def is_completed(self, session_id: str) -> bool:
"""检查会话是否已完成"""
if session_id not in self.active_sessions:
return False
return self.active_sessions[session_id].get('completed', False)
def session_exists(self, session_id: str) -> bool:
"""检查会话是否存在"""
return session_id in self.active_sessions
async def close_session(self, session_id: str):
"""关闭会话"""
if session_id in self.active_sessions:
del self.active_sessions[session_id]
logger.info(f"🔒 关闭远程控制会话: {session_id}")
async def auto_refresh_screenshot(self, session_id: str, interval: float = 1.0):
"""自动刷新截图(优化版:按需更新)"""
last_update_time = asyncio.get_event_loop().time()
while session_id in self.active_sessions and not self.is_completed(session_id):
try:
current_time = asyncio.get_event_loop().time()
# 使用自适应刷新:空闲时降低频率
if current_time - last_update_time >= interval:
screenshot = await self.update_screenshot(session_id, quality=55) # 降低质量提升性能
if screenshot and session_id in self.websocket_connections:
try:
ws = self.websocket_connections[session_id]
await ws.send_json({
'type': 'screenshot_update',
'screenshot': screenshot
})
last_update_time = current_time
except:
# WebSocket 可能已断开
break
# 降低检查频率,减少 CPU 使用
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"自动刷新截图失败: {e}")
await asyncio.sleep(1) # 出错时等待更长时间
# 全局实例
captcha_controller = CaptchaRemoteController()

View File

@ -47,6 +47,580 @@ class XianyuSearcher:
self.context = None
self.page = None
self.api_responses = []
self.user_id = "default" # 默认用户ID
async def _handle_scratch_captcha_manual(self, page, max_retries=3, wait_for_completion=True):
"""人工处理刮刮乐滑块(远程控制 + 截图备份)
参数:
wait_for_completion: 是否等待用户完成验证
- True: 等待用户完成验证默认用于直接处理
- False: 创建会话后立即返回用于前端处理
"""
import random
logger.warning("=" * 60)
logger.warning("🎨 检测到刮刮乐验证,需要人工处理!")
logger.warning("=" * 60)
# 获取会话ID
session_id = getattr(self, 'user_id', 'default')
# 【新方案】启用远程控制
use_remote_control = getattr(self, 'use_remote_control', True)
if use_remote_control:
try:
from utils.captcha_remote_control import captcha_controller
# 创建远程控制会话
logger.warning(f"🌐 启动远程控制会话: {session_id}")
session_info = await captcha_controller.create_session(session_id, page)
# 获取控制页面URL
import socket
import os
# 尝试多种方式获取IP
local_ip = "localhost"
# 方法1从环境变量获取Docker/配置文件)
local_ip = os.getenv('SERVER_HOST') or os.getenv('PUBLIC_IP')
if not local_ip:
# 方法2尝试获取外网IP
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
local_ip = s.getsockname()[0]
s.close()
# 检查是否是Docker内网IP172.x.x.x 或 10.x.x.x
if local_ip.startswith('172.') or local_ip.startswith('10.'):
logger.warning(f"⚠️ 检测到Docker内网IP: {local_ip}")
local_ip = None # 重置使用localhost
except:
pass
if not local_ip:
local_ip = "localhost"
logger.warning("⚠️ 无法获取外网IP使用 localhost")
logger.warning("💡 如果在Docker中请设置环境变量 SERVER_HOST 为公网IP")
control_url = f"http://{local_ip}:8000/api/captcha/control/{session_id}"
logger.warning("=" * 60)
logger.warning(f"🌐 远程控制已启动!")
logger.warning(f"📱 请访问以下网址进行验证:")
logger.warning(f" {control_url}")
logger.warning("=" * 60)
logger.warning(f"💡 或直接访问: http://{local_ip}:8000/api/captcha/control")
logger.warning(f" 然后输入会话ID: {session_id}")
logger.warning("=" * 60)
# 如果不等待完成,立即返回特殊值给调用者
if not wait_for_completion:
logger.warning("⚠️ 不等待验证完成,立即返回给前端处理")
return 'need_captcha' # 返回特殊值,表示需要前端处理
# 等待用户完成
logger.warning("⏳ 等待用户通过网页完成验证...")
# 循环检查是否完成
max_wait_time = 180 # 3分钟
check_interval = 1 # 每秒检查一次
elapsed_time = 0
while elapsed_time < max_wait_time:
await asyncio.sleep(check_interval)
elapsed_time += check_interval
# 检查是否完成
if captcha_controller.is_completed(session_id):
logger.success("✅ 远程验证成功!")
await captcha_controller.close_session(session_id)
return True
# 每10秒提示一次
if elapsed_time % 10 == 0:
logger.info(f"⏳ 仍在等待...已等待 {elapsed_time}")
logger.error(f"❌ 远程验证超时({max_wait_time}秒)")
await captcha_controller.close_session(session_id)
return False
except Exception as e:
logger.error(f"远程控制启动失败: {e}")
logger.warning("⚠️ 降级使用传统方式")
logger.error("❌ 人工验证超时,已达到最大等待时间")
return False
async def _handle_scratch_captcha_async(self, page, max_retries=15):
"""异步处理刮刮乐类型滑块"""
import random
# 保存原始page对象用于鼠标操作
original_page = page
for attempt in range(1, max_retries + 1):
try:
logger.info(f"🎨 刮刮乐滑块处理尝试 {attempt}/{max_retries}")
# 重置page为原始对象
page = original_page
# 短暂等待(滑块已经存在,无需长时间等待)
if attempt == 1:
await asyncio.sleep(0.3)
else:
await asyncio.sleep(0.5)
# 1. 快速检查刮刮乐容器(不阻塞,极短超时)
try:
await page.wait_for_selector('#nocaptcha', timeout=500, state='attached')
logger.debug("✅ 刮刮乐容器 #nocaptcha 已加载")
await asyncio.sleep(0.2) # 等待容器内部元素加载
except:
# 容器未找到也继续,可能滑块还没出现
logger.debug("刮刮乐容器未立即加载,继续查找按钮...")
# 2. 查找滑块按钮先尝试主页面再尝试iframe
button_selectors = [
'#scratch-captcha-btn',
'.button#scratch-captcha-btn',
'div#scratch-captcha-btn',
'.scratch-captcha-slider .button',
'#nocaptcha .button',
'#nocaptcha .scratch-captcha-slider .button',
'.button'
]
slider_button = None
found_in_iframe = False
search_context = page # 用于查找元素的上下文
# 先在主页面查找(极速查找)
for selector in button_selectors:
try:
# 先尝试等待可见(极短超时)
slider_button = await page.wait_for_selector(selector, timeout=800, state='visible')
if slider_button:
logger.info(f"✅ 在主页面找到刮刮乐滑块按钮(可见): {selector}")
search_context = page
break
except:
# 如果等待可见失败尝试只等待存在attached
try:
slider_button = await page.wait_for_selector(selector, timeout=300, state='attached')
if slider_button:
logger.warning(f"⚠️ 在主页面找到刮刮乐滑块按钮(不可见但存在): {selector}")
search_context = page
break
except:
continue
# 如果主页面没找到尝试在iframe中查找极速查找
if not slider_button:
try:
frames = page.frames
logger.debug(f"检查 {len(frames)} 个frame...")
for frame in frames:
if frame == page.main_frame:
continue
for selector in button_selectors:
try:
slider_button = await frame.wait_for_selector(selector, timeout=500, state='visible')
if slider_button:
logger.info(f"✅ 在iframe中找到刮刮乐滑块按钮: {selector}")
found_in_iframe = True
search_context = frame # iframe上下文用于查找
break
except:
continue
if slider_button:
break
except Exception as e:
logger.debug(f"检查iframe时出错: {e}")
# 最后尝试使用JavaScript直接查找在search_context中
if not slider_button:
try:
logger.debug("尝试使用JavaScript直接查找滑块按钮...")
js_found = await search_context.evaluate("""
() => {
const btn = document.getElementById('scratch-captcha-btn') ||
document.querySelector('#scratch-captcha-btn') ||
document.querySelector('.button#scratch-captcha-btn');
if (btn) {
return {
found: true,
visible: btn.offsetParent !== null,
display: window.getComputedStyle(btn).display,
visibility: window.getComputedStyle(btn).visibility
};
}
return { found: false };
}
""")
if js_found and js_found.get('found'):
logger.warning(f"⚠️ JavaScript找到按钮但Playwright无法访问: visible={js_found.get('visible')}, display={js_found.get('display')}, visibility={js_found.get('visibility')}")
# 尝试通过query_selector获取元素强制操作
slider_button = await search_context.query_selector('#scratch-captcha-btn')
if slider_button:
logger.info("✅ query_selector找到按钮")
except Exception as e:
logger.debug(f"JavaScript查找失败: {e}")
if not slider_button:
logger.error("❌ 未找到刮刮乐滑块按钮(所有方法都已尝试)")
await asyncio.sleep(random.uniform(0.5, 1))
continue
# 2. 获取滑块位置和大小
button_box = await slider_button.bounding_box()
if not button_box:
# 尝试使用JavaScript强制获取位置
try:
logger.warning("⚠️ 尝试使用JavaScript获取按钮位置...")
js_box = await search_context.evaluate("""
() => {
const btn = document.getElementById('scratch-captcha-btn');
if (btn) {
const rect = btn.getBoundingClientRect();
return {
x: rect.x,
y: rect.y,
width: rect.width,
height: rect.height
};
}
return null;
}
""")
if js_box:
logger.info(f"✅ JavaScript获取到按钮位置: {js_box}")
button_box = js_box
else:
logger.error("❌ JavaScript也无法获取滑块按钮位置")
await asyncio.sleep(random.uniform(0.5, 1))
continue
except Exception as e:
logger.error(f"❌ 无法获取滑块按钮位置: {e}")
await asyncio.sleep(random.uniform(0.5, 1))
continue
# 3. 计算滑动距离25-35%
# 假设轨道宽度约为300px可以根据实际调整
estimated_track_width = 300
scratch_ratio = random.uniform(0.25, 0.35)
slide_distance = estimated_track_width * scratch_ratio
logger.warning(f"🎨 刮刮乐模式:计划滑动{scratch_ratio*100:.1f}%距离 ({slide_distance:.2f}px)")
# 4. 执行滑动
start_x = button_box['x'] + button_box['width'] / 2
start_y = button_box['y'] + button_box['height'] / 2
# 移动到滑块(优化等待时间)
await page.mouse.move(start_x, start_y)
await asyncio.sleep(random.uniform(0.1, 0.2))
# 按下鼠标
await page.mouse.down()
await asyncio.sleep(random.uniform(0.05, 0.1))
# 模拟人类化滑动轨迹(加快速度)
steps = random.randint(10, 15)
for i in range(steps):
progress = (i + 1) / steps
current_distance = slide_distance * progress
# 添加Y轴抖动
y_jitter = random.uniform(-2, 2)
await page.mouse.move(
start_x + current_distance,
start_y + y_jitter
)
await asyncio.sleep(random.uniform(0.005, 0.015))
# 5. 在目标位置停顿观察(缩短时间)
pause_duration = random.uniform(0.2, 0.3)
logger.warning(f"🎨 在目标位置停顿{pause_duration:.2f}秒观察...")
await asyncio.sleep(pause_duration)
# 6. 释放鼠标
await page.mouse.up()
await asyncio.sleep(random.uniform(0.3, 0.5))
# 7. 检查是否成功检查滑块frame是否消失
try:
# 等待验证结果
await asyncio.sleep(0.8)
# 检查主页面的滑块容器
captcha_in_main = await page.query_selector('#nocaptcha')
main_visible = False
if captcha_in_main:
try:
main_visible = await captcha_in_main.is_visible()
except:
main_visible = False
# 检查iframe中的滑块
iframe_visible = False
try:
frames = page.frames
for frame in frames:
if frame != page.main_frame:
captcha_in_iframe = await frame.query_selector('#nocaptcha')
if captcha_in_iframe:
try:
if await captcha_in_iframe.is_visible():
iframe_visible = True
break
except:
pass
except:
pass
# 判断成功主页面和iframe都没有可见的滑块
if not main_visible and not iframe_visible:
logger.success(f"✅ 刮刮乐验证成功!滑块已消失(第{attempt}次尝试)")
return True
else:
if main_visible:
logger.warning(f"⚠️ 主页面滑块仍可见,继续重试...")
if iframe_visible:
logger.warning(f"⚠️ iframe滑块仍可见继续重试...")
except Exception as e:
logger.warning(f"⚠️ 检查验证结果时出错: {e},继续重试...")
except Exception as e:
logger.error(f"❌ 刮刮乐处理异常: {str(e)}")
import traceback
logger.error(traceback.format_exc())
await asyncio.sleep(random.uniform(0.5, 1))
continue
logger.error(f"❌ 刮刮乐验证失败,已达到最大重试次数 {max_retries}")
return False
async def handle_slider_verification(self, page, context=None, browser=None, playwright=None, max_retries=5):
"""
通用的滑块验证处理方法
参数:
page: Playwright 页面对象必需
context: Playwright 上下文对象可选如果不传则使用 self.context
browser: Playwright 浏览器对象可选如果不传则使用 self.browser
playwright: Playwright 实例可选如果不传则使用 self.playwright
max_retries: 最大重试次数默认5次
返回:
bool: True表示成功包括没有滑块或滑块验证成功False表示失败
"""
try:
# 等待页面加载滑块元素(优化等待时间)
await asyncio.sleep(1)
logger.info("🔍 开始检测滑块验证...")
# 使用传入的对象或实例属性
context = context or self.context
browser = browser or self.browser
playwright = playwright or getattr(self, 'playwright', None)
# 【调试】打印页面HTML内容查找滑块相关关键词
try:
page_content = await page.content()
has_captcha_keyword = any(keyword in page_content.lower() for keyword in [
'nocaptcha', 'scratch-captcha', 'captcha', 'slider', '滑块', '验证'
])
if has_captcha_keyword:
logger.warning("⚠️ 页面HTML中包含滑块相关关键词")
# 保存页面内容用于调试
if 'nocaptcha' in page_content or 'scratch-captcha' in page_content:
logger.warning("🎯 检测到刮刮乐类型滑块特征词!")
else:
logger.info("✅ 页面HTML中未发现滑块关键词")
except Exception as e:
logger.debug(f"检查页面内容时出错: {e}")
# 检测滑块元素(支持多种类型的滑块)
slider_selectors = [
# 阿里云盾 nc 系列滑块
'#nc_1_n1z',
'.nc-container',
'.nc_scale',
'.nc-wrapper',
'[class*="nc_"]',
'[id*="nc_"]',
# 刮刮乐 (scratch-captcha) 类型滑块
'#nocaptcha',
'.scratch-captcha-container',
'.scratch-captcha-slider',
'#scratch-captcha-btn',
'[class*="scratch-captcha"]',
'div[id="nocaptcha"]',
'div.scratch-captcha-container',
# 其他常见滑块类型
'.captcha-slider',
'.slider-captcha',
'[class*="captcha"]',
'[id*="captcha"]'
]
has_slider = False
detected_selector = None
found_elements = []
for selector in slider_selectors:
try:
element = await page.query_selector(selector)
if element:
found_elements.append(selector)
is_visible = await element.is_visible()
logger.debug(f"找到元素 {selector},可见性: {is_visible}")
if is_visible:
logger.info(f"✅ 检测到滑块验证元素: {selector}")
has_slider = True
detected_selector = selector
break
except Exception as e:
logger.debug(f"选择器 {selector} 检测出错: {e}")
continue
# 输出调试信息
if found_elements:
logger.warning(f"🔍 找到以下滑块元素(但可能不可见): {', '.join(found_elements)}")
# 如果找到了元素但不可见,强制认为有滑块
if not has_slider and any('captcha' in sel.lower() or 'slider' in sel.lower() for sel in found_elements):
logger.warning("⚠️ 检测到滑块元素但不可见,仍然尝试处理")
has_slider = True
detected_selector = found_elements[0]
else:
logger.debug("未找到任何滑块选择器匹配的元素")
# 【额外检测】检查 iframe 中的滑块
if not has_slider:
try:
frames = page.frames
logger.debug(f"检测到 {len(frames)} 个 frame")
for frame in frames:
if frame != page.main_frame:
try:
iframe_content = await frame.content()
# 更精确的刮刮乐检测:必须包含明确特征
has_scratch_features = 'scratch-captcha' in iframe_content or \
('nocaptcha' in iframe_content and 'scratch' in iframe_content)
if has_scratch_features:
logger.warning("🎯 在 iframe 中检测到刮刮乐滑块!")
has_slider = True
detected_selector = "iframe-scratch-captcha"
break
except:
continue
except Exception as e:
logger.debug(f"检查 iframe 时出错: {e}")
# 如果没有检测到滑块,直接返回成功
if not has_slider:
logger.info("✅ 未检测到滑块验证,继续执行")
return True
# 检测到滑块,开始处理
logger.warning(f"⚠️ 检测到滑块验证({detected_selector}),开始处理...")
# 检测是否为刮刮乐类型(更精确的判断)
is_scratch_captcha = False
# 明确的刮刮乐特征
if 'scratch' in detected_selector.lower():
is_scratch_captcha = True
# 如果选择器是 #nocaptcha 但不是 nc 系列的标准滑块,则进一步检查
elif detected_selector in ['#nocaptcha', 'iframe-scratch-captcha']:
try:
page_html = await page.content()
# 检查是否有刮刮乐的明确特征
has_scratch_features = 'scratch-captcha' in page_html or \
('Release the slider' in page_html) or \
('fully appears' in page_html)
is_scratch_captcha = has_scratch_features
except:
is_scratch_captcha = False
if is_scratch_captcha:
logger.warning("🎨 检测到刮刮乐类型滑块")
# 人工处理模式 - 等待用户完成验证
logger.warning("⚠️ 刮刮乐需要人工处理,等待验证完成")
slider_success = await self._handle_scratch_captcha_manual(page, max_retries=3, wait_for_completion=True)
else:
actual_max_retries = max_retries
slider_success = None
try:
# 刮刮乐已经处理过了,直接检查结果
if is_scratch_captcha:
pass # slider_success 已经在上面设置
else:
# 普通滑块:使用 XianyuSliderStealth同步API
from utils.xianyu_slider_stealth import XianyuSliderStealth
# 创建滑块处理实例
slider_handler = XianyuSliderStealth(
user_id=getattr(self, 'user_id', 'default'),
enable_learning=True,
headless=True
)
# 将现有的浏览器对象传递给滑块处理器(复用现有浏览器)
slider_handler.page = page
slider_handler.context = context
slider_handler.browser = browser
slider_handler.playwright = playwright
# 调用滑块处理方法
logger.info(f"🎯 开始处理滑块验证(最多尝试 {actual_max_retries} 次)...")
slider_success = slider_handler.solve_slider(max_retries=actual_max_retries)
# 清除引用,防止 XianyuSliderStealth 尝试关闭我们的浏览器
slider_handler.page = None
slider_handler.context = None
slider_handler.browser = None
slider_handler.playwright = None
if slider_success:
logger.success("✅ 滑块验证成功!")
return True
else:
logger.error("❌ 滑块验证失败")
return False
except Exception as e:
logger.error(f"❌ 滑块验证处理异常: {str(e)}")
import traceback
logger.error(traceback.format_exc())
# 确保清除引用
try:
if 'slider_handler' in locals():
slider_handler.page = None
slider_handler.context = None
slider_handler.browser = None
slider_handler.playwright = None
except:
pass
return False
except Exception as e:
logger.error(f"❌ 滑块检测过程异常: {str(e)}")
import traceback
logger.error(traceback.format_exc())
return False
async def safe_get(self, data, *keys, default="暂无"):
"""安全获取嵌套字典值"""
@ -109,13 +683,19 @@ class XianyuSearcher:
return False
async def init_browser(self):
"""初始化浏览器"""
"""初始化浏览器使用持久化上下文保留缓存和cookies"""
if not PLAYWRIGHT_AVAILABLE:
raise Exception("Playwright 未安装,无法使用真实搜索功能")
if not self.browser:
playwright = await async_playwright().start()
logger.info("正在启动浏览器...")
# 设置持久化数据目录保存缓存、cookies等
import tempfile
user_data_dir = os.path.join(tempfile.gettempdir(), 'xianyu_browser_cache')
os.makedirs(user_data_dir, exist_ok=True)
logger.info(f"使用持久化数据目录(保留缓存): {user_data_dir}")
# 简化的浏览器启动参数,避免冲突
browser_args = [
'--no-sandbox',
@ -124,7 +704,10 @@ class XianyuSearcher:
'--no-first-run',
'--disable-extensions',
'--disable-default-apps',
'--no-default-browser-check'
'--no-default-browser-check',
# 中文语言设置
'--lang=zh-CN',
'--accept-lang=zh-CN,zh,en-US,en'
]
# 只在确实是Docker环境时添加额外参数
@ -135,37 +718,50 @@ class XianyuSearcher:
# '--single-process' # 注释掉,避免崩溃
])
logger.info("正在启动浏览器...")
self.browser = await playwright.chromium.launch(
logger.info("正在启动浏览器(中文模式,持久化缓存)...")
# 使用 launch_persistent_context 实现跨会话的缓存持久化
# 这样通过一次滑块验证后,下次搜索可以复用缓存,避免再次出现滑块
self.context = await playwright.chromium.launch_persistent_context(
user_data_dir, # 第一个参数是用户数据目录,用于持久化
headless=True, # 无头模式,后台运行
args=browser_args
)
logger.info("浏览器启动成功,创建上下文...")
# 简化上下文创建,减少可能的问题
self.context = await self.browser.new_context(
args=browser_args,
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
viewport={'width': 1280, 'height': 720}
viewport={'width': 1280, 'height': 720},
locale='zh-CN', # 设置语言为中文
# 持久化上下文会自动保存和加载:
# - Cookies
# - 缓存
# - LocalStorage
# - SessionStorage
# - 其他浏览器状态
)
# launch_persistent_context 返回的是 context不是 browser
# 需要通过 context.browser 获取 browser 对象
self.browser = self.context.browser
logger.info("浏览器启动成功(持久化上下文已创建)...")
logger.info("创建页面...")
self.page = await self.context.new_page()
logger.info("浏览器初始化完成")
logger.info("浏览器初始化完成(缓存将持久化保存)")
async def close_browser(self):
"""关闭浏览器"""
"""关闭浏览器持久化上下文会自动保存缓存和cookies"""
try:
if self.page:
await self.page.close()
self.page = None
# 注意:使用 persistent_context 时,关闭 context 会自动保存所有数据
if self.context:
await self.context.close()
self.context = None
if self.browser:
await self.browser.close()
self.browser = None
logger.debug("商品搜索器浏览器已关闭")
# persistent_context 的 browser 会在 context 关闭时自动关闭
# 不需要单独关闭 browser
self.browser = None
logger.debug("商品搜索器浏览器已关闭(缓存已保存)")
except Exception as e:
logger.warning(f"关闭商品搜索器浏览器时出错: {e}")
@ -255,6 +851,8 @@ class XianyuSearcher:
# 刷新页面以应用cookies
await self.page.reload()
await asyncio.sleep(2)
await self.page.wait_for_load_state("networkidle", timeout=10000)
@ -265,19 +863,32 @@ class XianyuSearcher:
self.page.on("response", on_response)
await self.page.click('button[type="submit"]')
await self.page.wait_for_load_state("networkidle", timeout=15000)
# 等待第一页API响应
# 等待第一页API响应(缩短等待时间)
logger.info("等待第一页API响应...")
await asyncio.sleep(5)
await asyncio.sleep(2)
# 尝试处理弹窗
try:
await self.page.keyboard.press('Escape')
await asyncio.sleep(1)
await asyncio.sleep(0.5)
except:
pass
# 【核心】检测并处理滑块验证 → 使用公共方法
logger.info(f"检测是否有滑块验证...")
slider_result = await self.handle_slider_verification(
page=self.page,
context=self.context,
browser=self.browser,
playwright=getattr(self, 'playwright', None),
max_retries=5
)
if not slider_result:
logger.error(f"❌ 滑块验证失败,搜索终止")
return None
# 等待更多数据
await asyncio.sleep(3)
@ -709,19 +1320,36 @@ class XianyuSearcher:
logger.info("🖱️ 准备点击搜索按钮...")
await self.page.click('button[type="submit"]')
logger.info("✅ 搜索按钮已点击")
await self.page.wait_for_load_state("networkidle", timeout=15000)
# 等待第一页API响应
# 等待第一页API响应(优化等待时间)
logger.info("等待第一页API响应...")
await asyncio.sleep(10) # 增加等待时间
await asyncio.sleep(3)
# 尝试处理弹窗
try:
await self.page.keyboard.press('Escape')
await asyncio.sleep(1)
await asyncio.sleep(0.5)
except:
pass
# 【核心】检测并处理滑块验证 → 使用公共方法
logger.info(f"检测是否有滑块验证...")
slider_result = await self.handle_slider_verification(
page=self.page,
context=self.context,
browser=self.browser,
playwright=getattr(self, 'playwright', None),
max_retries=5
)
if not slider_result:
logger.error(f"❌ 滑块验证失败,搜索终止")
return {
'items': [],
'total': 0,
'error': '滑块验证失败'
}
# 等待更多数据
await asyncio.sleep(3)
@ -955,7 +1583,7 @@ async def search_multiple_pages_xianyu(keyword: str, total_pages: int = 1) -> Di
Returns:
搜索结果
"""
max_retries = 2
max_retries = 0
retry_delay = 5 # 秒,增加重试间隔
for attempt in range(max_retries + 1):

View File

@ -413,7 +413,7 @@ class QRLoginManager:
'status': session.status,
'session_id': session_id
}
logger.info(f"获取会话状态: {result}")
# 如果需要验证返回验证URL
if session.status == 'verification_required' and session.verification_url:
result['verification_url'] = session.verification_url

2262
utils/slider_patch.py Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,88 +0,0 @@
# This file was generated by Nuitka
# Stubs included by default
from __future__ import annotations
from config import SLIDER_VERIFICATION
from datetime import datetime
from loguru import logger
from playwright.sync_api import ElementHandle, sync_playwright
from typing import Any, Dict, List, Optional, Tuple
from typing_extensions import Self
import json
import os
import random
import shutil
import tempfile
import threading
import time
SLIDER_MAX_CONCURRENT = 3
SLIDER_WAIT_TIMEOUT = 60
class SliderConcurrencyManager:
def __new__(cls: cls) -> Any: ...
def __init__(self: Self) -> None: ...
def can_start_instance(self: Self, user_id: str) -> bool: ...
def wait_for_slot(self: Self, user_id: str, timeout: int) -> bool: ...
def register_instance(self: Self, user_id: str, instance: Any) -> Any: ...
def unregister_instance(self: Self, user_id: str) -> Any: ...
def _extract_pure_user_id(self: Self, user_id: str) -> str: ...
def get_stats(self: Self) -> Any: ...
class XianyuSliderStealth:
def __init__(self: Self, user_id: str, enable_learning: bool, headless: bool) -> None: ...
def _check_date_validity(self: Self) -> bool: ...
def init_browser(self: Self) -> Any: ...
def _cleanup_on_init_failure(self: Self) -> Any: ...
def _load_success_history(self: Self) -> List[Dict[str, Any]]: ...
def _save_success_record(self: Self, trajectory_data: Dict[str, Any]) -> Any: ...
def _optimize_trajectory_params(self: Self) -> Dict[str, Any]: ...
def _get_cookies_after_success(self: Self) -> Any: ...
def _save_cookies_to_file(self: Self, cookies: Any) -> Any: ...
def _get_random_browser_features(self: Self) -> Any: ...
def _get_stealth_script(self: Self, browser_features: Any) -> Any: ...
def generate_human_trajectory(self: Self, distance: float) -> Any: ...
def simulate_slide(self: Self, slider_button: ElementHandle, trajectory: Any) -> Any: ...
def find_slider_elements(self: Self) -> Any: ...
def calculate_slide_distance(self: Self, slider_button: ElementHandle, slider_track: ElementHandle) -> Any: ...
def check_verification_success(self: Self, slider_button: ElementHandle) -> Any: ...
def check_page_changed(self: Self) -> Any: ...
def check_verification_failure(self: Self) -> Any: ...
def solve_slider(self: Self) -> Any: ...
def close_browser(self: Self) -> Any: ...
def __del__(self: Self) -> Any: ...
def login_with_password_headful(self: Self, account: str, password: str, show_browser: bool) -> Any: ...
def run(self: Self, url: str) -> Any: ...
def get_slider_stats() -> Any:
...
__name__ = ...
# Modules used internally, to allow implicit dependencies to be seen:
import time
import random
import json
import os
import threading
import tempfile
import shutil
import datetime
import playwright
import playwright.sync_api
import playwright.sync_api.sync_playwright
import playwright.sync_api.ElementHandle
import typing
import loguru
import loguru.logger
import config
import traceback
import re
import DrissionPage
import DrissionPage.ChromiumPage
import DrissionPage.ChromiumOptions
import subprocess
import platform
import sys