xianyu-backend-java/utils/captcha_remote_control.py
2025-11-24 16:24:09 +08:00

369 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
刮刮乐远程控制模块
通过 WebSocket 实时传输页面截图到前端,并接收用户操作
"""
import asyncio
import base64
import json
from typing import Optional, Dict, Any
from loguru import logger
from playwright.async_api import Page
class CaptchaRemoteController:
"""刮刮乐远程控制器"""
def __init__(self):
self.active_sessions: Dict[str, Dict[str, Any]] = {}
self.websocket_connections: Dict[str, Any] = {}
async def create_session(self, session_id: str, page: Page) -> Dict[str, str]:
"""
创建远程控制会话
Args:
session_id: 会话ID通常是用户ID
page: Playwright Page 对象
Returns:
包含会话信息的字典
"""
# 获取滑块元素位置
captcha_info = await self._get_captcha_info(page)
# 只截取滑块区域,不截取整个页面(性能优化)
screenshot_bytes = await self._screenshot_captcha_area(page, captcha_info)
screenshot_base64 = base64.b64encode(screenshot_bytes).decode('utf-8')
# 获取视口大小
try:
viewport = page.viewport_size
if viewport is None:
# 如果没有设置viewport使用默认值或通过JS获取
viewport = await page.evaluate("() => ({width: window.innerWidth, height: window.innerHeight})")
except:
viewport = {'width': 1280, 'height': 720} # 默认值
# 存储会话
self.active_sessions[session_id] = {
'page': page,
'screenshot': screenshot_base64,
'captcha_info': captcha_info,
'completed': False,
'viewport': viewport
}
logger.info(f"✅ 创建远程控制会话: {session_id}")
return {
'session_id': session_id,
'screenshot': screenshot_base64,
'captcha_info': captcha_info,
'viewport': self.active_sessions[session_id]['viewport']
}
async def _screenshot_captcha_area(self, page: Page, captcha_info: Dict[str, Any]) -> bytes:
"""截取整个验证码容器区域"""
try:
if captcha_info and 'x' in captcha_info:
# 直接截取整个容器,稍微留一点边距
x = max(0, captcha_info['x'] - 10)
y = max(0, captcha_info['y'] - 10)
width = captcha_info['width'] + 20
height = captcha_info['height'] + 20
# 截取整个验证码容器
screenshot_bytes = await page.screenshot(
type='jpeg',
quality=80, # 验证码区域用高质量
clip={
'x': x,
'y': y,
'width': width,
'height': height
}
)
logger.info(f"✅ 截取验证码容器: {width}x{height} (包含完整验证码)")
return screenshot_bytes
else:
# 如果没有找到滑块,截取整个页面
logger.warning("未找到滑块位置,截取整个页面")
return await page.screenshot(type='jpeg', quality=75, full_page=False)
except Exception as e:
logger.warning(f"截取滑块区域失败,使用全页面: {e}")
return await page.screenshot(type='jpeg', quality=75, full_page=False)
async def _get_captcha_info(self, page: Page) -> Dict[str, Any]:
"""获取滑块验证码信息(查找整个容器)"""
try:
# 优先查找整个验证码容器(不是按钮)
container_selectors = [
'#nocaptcha', # 完整的验证码容器
'.scratch-captcha-container',
'[id*="captcha"]',
'.nc-container'
]
# 先在主页面查找
for selector in container_selectors:
try:
element = await page.query_selector(selector)
if element:
box = await element.bounding_box()
if box and box['width'] > 100 and box['height'] > 100: # 确保找到的是容器
logger.info(f"✅ 在主页面找到验证码容器: {selector}, 大小: {box['width']}x{box['height']}")
return {
'selector': selector,
'x': box['x'],
'y': box['y'],
'width': box['width'],
'height': box['height'],
'in_iframe': False
}
except Exception as e:
logger.debug(f"检查选择器 {selector} 失败: {e}")
continue
# 在 iframe 中查找
frames = page.frames
for frame in frames:
if frame != page.main_frame:
for selector in container_selectors:
try:
element = await frame.query_selector(selector)
if element:
box = await element.bounding_box()
if box and box['width'] > 100 and box['height'] > 100:
logger.info(f"✅ 在iframe找到验证码容器: {selector}, 大小: {box['width']}x{box['height']}")
return {
'selector': selector,
'x': box['x'],
'y': box['y'],
'width': box['width'],
'height': box['height'],
'in_iframe': True
# 注意:不保存 frame 对象,因为不能被 JSON 序列化
}
except Exception as e:
logger.debug(f"iframe检查选择器 {selector} 失败: {e}")
continue
logger.warning("⚠️ 未找到验证码容器")
return None
except Exception as e:
logger.error(f"获取滑块信息失败: {e}")
return None
async def update_screenshot(self, session_id: str, quality: int = 75) -> Optional[str]:
"""更新会话的截图(截取整个验证码容器)"""
if session_id not in self.active_sessions:
return None
try:
page = self.active_sessions[session_id]['page']
captcha_info = self.active_sessions[session_id].get('captcha_info')
# 截取整个验证码容器
if captcha_info and 'x' in captcha_info:
x = max(0, captcha_info['x'] - 10)
y = max(0, captcha_info['y'] - 10)
width = captcha_info['width'] + 20
height = captcha_info['height'] + 20
screenshot_bytes = await page.screenshot(
type='jpeg',
quality=quality,
clip={'x': x, 'y': y, 'width': width, 'height': height}
)
else:
# 降级方案:截取整个页面
screenshot_bytes = await page.screenshot(
type='jpeg',
quality=quality,
full_page=False
)
screenshot_base64 = base64.b64encode(screenshot_bytes).decode('utf-8')
self.active_sessions[session_id]['screenshot'] = screenshot_base64
return screenshot_base64
except Exception as e:
logger.error(f"更新截图失败: {e}")
return None
async def handle_mouse_event(self, session_id: str, event_type: str, x: int, y: int) -> bool:
"""
处理鼠标事件
Args:
session_id: 会话ID
event_type: 事件类型 (down/move/up)
x: X坐标
y: Y坐标
Returns:
是否成功
"""
if session_id not in self.active_sessions:
logger.warning(f"会话不存在: {session_id}")
return False
try:
page = self.active_sessions[session_id]['page']
if event_type == 'down':
await page.mouse.move(x, y)
await page.mouse.down()
logger.debug(f"鼠标按下: ({x}, {y})")
elif event_type == 'move':
await page.mouse.move(x, y)
logger.debug(f"鼠标移动: ({x}, {y})")
elif event_type == 'up':
await page.mouse.up()
logger.debug(f"鼠标释放: ({x}, {y})")
else:
logger.warning(f"未知事件类型: {event_type}")
return False
return True
except Exception as e:
logger.error(f"处理鼠标事件失败: {e}")
return False
async def check_completion(self, session_id: str) -> bool:
"""检查验证是否完成(更严格的判断)"""
if session_id not in self.active_sessions:
return False
try:
page = self.active_sessions[session_id]['page']
# 多个选择器检查,确保更准确
captcha_selectors = [
'#nocaptcha',
'#scratch-captcha-btn',
'.scratch-captcha-container',
'.scratch-captcha-slider'
]
found_visible_captcha = False
# 检查主页面
for selector in captcha_selectors:
try:
element = await page.query_selector(selector)
if element:
is_visible = await element.is_visible()
if is_visible:
logger.debug(f"主页面发现可见滑块: {selector}")
found_visible_captcha = True
break
except:
continue
if found_visible_captcha:
return False
# 检查所有 iframe
frames = page.frames
for frame in frames:
if frame != page.main_frame:
for selector in captcha_selectors:
try:
element = await frame.query_selector(selector)
if element:
is_visible = await element.is_visible()
if is_visible:
logger.debug(f"iframe中发现可见滑块: {selector}")
found_visible_captcha = True
break
except:
continue
if found_visible_captcha:
break
if found_visible_captcha:
return False
# 额外检查:看页面内容是否还包含滑块相关文字
try:
page_content = await page.content()
captcha_keywords = ['scratch-captcha', 'nocaptcha', 'slider-btn']
# 如果页面中仍然有大量滑块相关内容,可能还未完成
keyword_count = sum(1 for kw in captcha_keywords if kw in page_content)
if keyword_count >= 2:
logger.debug(f"页面中仍有 {keyword_count} 个滑块关键词")
return False
except:
pass
# 所有检查都通过,认为验证完成
logger.success(f"✅ 验证完成(所有滑块元素已消失): {session_id}")
self.active_sessions[session_id]['completed'] = True
return True
except Exception as e:
logger.error(f"检查完成状态失败: {e}")
# 出错时返回 False不要误判为成功
return False
def is_completed(self, session_id: str) -> bool:
"""检查会话是否已完成"""
if session_id not in self.active_sessions:
return False
return self.active_sessions[session_id].get('completed', False)
def session_exists(self, session_id: str) -> bool:
"""检查会话是否存在"""
return session_id in self.active_sessions
async def close_session(self, session_id: str):
"""关闭会话"""
if session_id in self.active_sessions:
del self.active_sessions[session_id]
logger.info(f"🔒 关闭远程控制会话: {session_id}")
async def auto_refresh_screenshot(self, session_id: str, interval: float = 1.0):
"""自动刷新截图(优化版:按需更新)"""
last_update_time = asyncio.get_event_loop().time()
while session_id in self.active_sessions and not self.is_completed(session_id):
try:
current_time = asyncio.get_event_loop().time()
# 使用自适应刷新:空闲时降低频率
if current_time - last_update_time >= interval:
screenshot = await self.update_screenshot(session_id, quality=55) # 降低质量提升性能
if screenshot and session_id in self.websocket_connections:
try:
ws = self.websocket_connections[session_id]
await ws.send_json({
'type': 'screenshot_update',
'screenshot': screenshot
})
last_update_time = current_time
except:
# WebSocket 可能已断开
break
# 降低检查频率,减少 CPU 使用
await asyncio.sleep(0.5)
except Exception as e:
logger.error(f"自动刷新截图失败: {e}")
await asyncio.sleep(1) # 出错时等待更长时间
# 全局实例
captcha_controller = CaptchaRemoteController()