369 lines
15 KiB
Python
369 lines
15 KiB
Python
"""
|
||
刮刮乐远程控制模块
|
||
通过 WebSocket 实时传输页面截图到前端,并接收用户操作
|
||
"""
|
||
|
||
import asyncio
|
||
import base64
|
||
import json
|
||
from typing import Optional, Dict, Any
|
||
from loguru import logger
|
||
from playwright.async_api import Page
|
||
|
||
|
||
class CaptchaRemoteController:
|
||
"""刮刮乐远程控制器"""
|
||
|
||
def __init__(self):
|
||
self.active_sessions: Dict[str, Dict[str, Any]] = {}
|
||
self.websocket_connections: Dict[str, Any] = {}
|
||
|
||
async def create_session(self, session_id: str, page: Page) -> Dict[str, str]:
|
||
"""
|
||
创建远程控制会话
|
||
|
||
Args:
|
||
session_id: 会话ID(通常是用户ID)
|
||
page: Playwright Page 对象
|
||
|
||
Returns:
|
||
包含会话信息的字典
|
||
"""
|
||
# 获取滑块元素位置
|
||
captcha_info = await self._get_captcha_info(page)
|
||
|
||
# 只截取滑块区域,不截取整个页面(性能优化)
|
||
screenshot_bytes = await self._screenshot_captcha_area(page, captcha_info)
|
||
screenshot_base64 = base64.b64encode(screenshot_bytes).decode('utf-8')
|
||
|
||
# 获取视口大小
|
||
try:
|
||
viewport = page.viewport_size
|
||
if viewport is None:
|
||
# 如果没有设置viewport,使用默认值或通过JS获取
|
||
viewport = await page.evaluate("() => ({width: window.innerWidth, height: window.innerHeight})")
|
||
except:
|
||
viewport = {'width': 1280, 'height': 720} # 默认值
|
||
|
||
# 存储会话
|
||
self.active_sessions[session_id] = {
|
||
'page': page,
|
||
'screenshot': screenshot_base64,
|
||
'captcha_info': captcha_info,
|
||
'completed': False,
|
||
'viewport': viewport
|
||
}
|
||
|
||
logger.info(f"✅ 创建远程控制会话: {session_id}")
|
||
|
||
return {
|
||
'session_id': session_id,
|
||
'screenshot': screenshot_base64,
|
||
'captcha_info': captcha_info,
|
||
'viewport': self.active_sessions[session_id]['viewport']
|
||
}
|
||
|
||
async def _screenshot_captcha_area(self, page: Page, captcha_info: Dict[str, Any]) -> bytes:
|
||
"""截取整个验证码容器区域"""
|
||
try:
|
||
if captcha_info and 'x' in captcha_info:
|
||
# 直接截取整个容器,稍微留一点边距
|
||
x = max(0, captcha_info['x'] - 10)
|
||
y = max(0, captcha_info['y'] - 10)
|
||
width = captcha_info['width'] + 20
|
||
height = captcha_info['height'] + 20
|
||
|
||
# 截取整个验证码容器
|
||
screenshot_bytes = await page.screenshot(
|
||
type='jpeg',
|
||
quality=80, # 验证码区域用高质量
|
||
clip={
|
||
'x': x,
|
||
'y': y,
|
||
'width': width,
|
||
'height': height
|
||
}
|
||
)
|
||
logger.info(f"✅ 截取验证码容器: {width}x{height} (包含完整验证码)")
|
||
return screenshot_bytes
|
||
else:
|
||
# 如果没有找到滑块,截取整个页面
|
||
logger.warning("未找到滑块位置,截取整个页面")
|
||
return await page.screenshot(type='jpeg', quality=75, full_page=False)
|
||
|
||
except Exception as e:
|
||
logger.warning(f"截取滑块区域失败,使用全页面: {e}")
|
||
return await page.screenshot(type='jpeg', quality=75, full_page=False)
|
||
|
||
async def _get_captcha_info(self, page: Page) -> Dict[str, Any]:
|
||
"""获取滑块验证码信息(查找整个容器)"""
|
||
try:
|
||
# 优先查找整个验证码容器(不是按钮)
|
||
container_selectors = [
|
||
'#nocaptcha', # 完整的验证码容器
|
||
'.scratch-captcha-container',
|
||
'[id*="captcha"]',
|
||
'.nc-container'
|
||
]
|
||
|
||
# 先在主页面查找
|
||
for selector in container_selectors:
|
||
try:
|
||
element = await page.query_selector(selector)
|
||
if element:
|
||
box = await element.bounding_box()
|
||
if box and box['width'] > 100 and box['height'] > 100: # 确保找到的是容器
|
||
logger.info(f"✅ 在主页面找到验证码容器: {selector}, 大小: {box['width']}x{box['height']}")
|
||
return {
|
||
'selector': selector,
|
||
'x': box['x'],
|
||
'y': box['y'],
|
||
'width': box['width'],
|
||
'height': box['height'],
|
||
'in_iframe': False
|
||
}
|
||
except Exception as e:
|
||
logger.debug(f"检查选择器 {selector} 失败: {e}")
|
||
continue
|
||
|
||
# 在 iframe 中查找
|
||
frames = page.frames
|
||
for frame in frames:
|
||
if frame != page.main_frame:
|
||
for selector in container_selectors:
|
||
try:
|
||
element = await frame.query_selector(selector)
|
||
if element:
|
||
box = await element.bounding_box()
|
||
if box and box['width'] > 100 and box['height'] > 100:
|
||
logger.info(f"✅ 在iframe找到验证码容器: {selector}, 大小: {box['width']}x{box['height']}")
|
||
return {
|
||
'selector': selector,
|
||
'x': box['x'],
|
||
'y': box['y'],
|
||
'width': box['width'],
|
||
'height': box['height'],
|
||
'in_iframe': True
|
||
# 注意:不保存 frame 对象,因为不能被 JSON 序列化
|
||
}
|
||
except Exception as e:
|
||
logger.debug(f"iframe检查选择器 {selector} 失败: {e}")
|
||
continue
|
||
|
||
logger.warning("⚠️ 未找到验证码容器")
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.error(f"获取滑块信息失败: {e}")
|
||
return None
|
||
|
||
async def update_screenshot(self, session_id: str, quality: int = 75) -> Optional[str]:
|
||
"""更新会话的截图(截取整个验证码容器)"""
|
||
if session_id not in self.active_sessions:
|
||
return None
|
||
|
||
try:
|
||
page = self.active_sessions[session_id]['page']
|
||
captcha_info = self.active_sessions[session_id].get('captcha_info')
|
||
|
||
# 截取整个验证码容器
|
||
if captcha_info and 'x' in captcha_info:
|
||
x = max(0, captcha_info['x'] - 10)
|
||
y = max(0, captcha_info['y'] - 10)
|
||
width = captcha_info['width'] + 20
|
||
height = captcha_info['height'] + 20
|
||
|
||
screenshot_bytes = await page.screenshot(
|
||
type='jpeg',
|
||
quality=quality,
|
||
clip={'x': x, 'y': y, 'width': width, 'height': height}
|
||
)
|
||
else:
|
||
# 降级方案:截取整个页面
|
||
screenshot_bytes = await page.screenshot(
|
||
type='jpeg',
|
||
quality=quality,
|
||
full_page=False
|
||
)
|
||
|
||
screenshot_base64 = base64.b64encode(screenshot_bytes).decode('utf-8')
|
||
self.active_sessions[session_id]['screenshot'] = screenshot_base64
|
||
return screenshot_base64
|
||
|
||
except Exception as e:
|
||
logger.error(f"更新截图失败: {e}")
|
||
return None
|
||
|
||
async def handle_mouse_event(self, session_id: str, event_type: str, x: int, y: int) -> bool:
|
||
"""
|
||
处理鼠标事件
|
||
|
||
Args:
|
||
session_id: 会话ID
|
||
event_type: 事件类型 (down/move/up)
|
||
x: X坐标
|
||
y: Y坐标
|
||
|
||
Returns:
|
||
是否成功
|
||
"""
|
||
if session_id not in self.active_sessions:
|
||
logger.warning(f"会话不存在: {session_id}")
|
||
return False
|
||
|
||
try:
|
||
page = self.active_sessions[session_id]['page']
|
||
|
||
if event_type == 'down':
|
||
await page.mouse.move(x, y)
|
||
await page.mouse.down()
|
||
logger.debug(f"鼠标按下: ({x}, {y})")
|
||
|
||
elif event_type == 'move':
|
||
await page.mouse.move(x, y)
|
||
logger.debug(f"鼠标移动: ({x}, {y})")
|
||
|
||
elif event_type == 'up':
|
||
await page.mouse.up()
|
||
logger.debug(f"鼠标释放: ({x}, {y})")
|
||
|
||
else:
|
||
logger.warning(f"未知事件类型: {event_type}")
|
||
return False
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"处理鼠标事件失败: {e}")
|
||
return False
|
||
|
||
async def check_completion(self, session_id: str) -> bool:
|
||
"""检查验证是否完成(更严格的判断)"""
|
||
if session_id not in self.active_sessions:
|
||
return False
|
||
|
||
try:
|
||
page = self.active_sessions[session_id]['page']
|
||
|
||
# 多个选择器检查,确保更准确
|
||
captcha_selectors = [
|
||
'#nocaptcha',
|
||
'#scratch-captcha-btn',
|
||
'.scratch-captcha-container',
|
||
'.scratch-captcha-slider'
|
||
]
|
||
|
||
found_visible_captcha = False
|
||
|
||
# 检查主页面
|
||
for selector in captcha_selectors:
|
||
try:
|
||
element = await page.query_selector(selector)
|
||
if element:
|
||
is_visible = await element.is_visible()
|
||
if is_visible:
|
||
logger.debug(f"主页面发现可见滑块: {selector}")
|
||
found_visible_captcha = True
|
||
break
|
||
except:
|
||
continue
|
||
|
||
if found_visible_captcha:
|
||
return False
|
||
|
||
# 检查所有 iframe
|
||
frames = page.frames
|
||
for frame in frames:
|
||
if frame != page.main_frame:
|
||
for selector in captcha_selectors:
|
||
try:
|
||
element = await frame.query_selector(selector)
|
||
if element:
|
||
is_visible = await element.is_visible()
|
||
if is_visible:
|
||
logger.debug(f"iframe中发现可见滑块: {selector}")
|
||
found_visible_captcha = True
|
||
break
|
||
except:
|
||
continue
|
||
if found_visible_captcha:
|
||
break
|
||
|
||
if found_visible_captcha:
|
||
return False
|
||
|
||
# 额外检查:看页面内容是否还包含滑块相关文字
|
||
try:
|
||
page_content = await page.content()
|
||
captcha_keywords = ['scratch-captcha', 'nocaptcha', 'slider-btn']
|
||
|
||
# 如果页面中仍然有大量滑块相关内容,可能还未完成
|
||
keyword_count = sum(1 for kw in captcha_keywords if kw in page_content)
|
||
if keyword_count >= 2:
|
||
logger.debug(f"页面中仍有 {keyword_count} 个滑块关键词")
|
||
return False
|
||
except:
|
||
pass
|
||
|
||
# 所有检查都通过,认为验证完成
|
||
logger.success(f"✅ 验证完成(所有滑块元素已消失): {session_id}")
|
||
self.active_sessions[session_id]['completed'] = True
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"检查完成状态失败: {e}")
|
||
# 出错时返回 False,不要误判为成功
|
||
return False
|
||
|
||
def is_completed(self, session_id: str) -> bool:
|
||
"""检查会话是否已完成"""
|
||
if session_id not in self.active_sessions:
|
||
return False
|
||
return self.active_sessions[session_id].get('completed', False)
|
||
|
||
def session_exists(self, session_id: str) -> bool:
|
||
"""检查会话是否存在"""
|
||
return session_id in self.active_sessions
|
||
|
||
async def close_session(self, session_id: str):
|
||
"""关闭会话"""
|
||
if session_id in self.active_sessions:
|
||
del self.active_sessions[session_id]
|
||
logger.info(f"🔒 关闭远程控制会话: {session_id}")
|
||
|
||
async def auto_refresh_screenshot(self, session_id: str, interval: float = 1.0):
|
||
"""自动刷新截图(优化版:按需更新)"""
|
||
last_update_time = asyncio.get_event_loop().time()
|
||
|
||
while session_id in self.active_sessions and not self.is_completed(session_id):
|
||
try:
|
||
current_time = asyncio.get_event_loop().time()
|
||
|
||
# 使用自适应刷新:空闲时降低频率
|
||
if current_time - last_update_time >= interval:
|
||
screenshot = await self.update_screenshot(session_id, quality=55) # 降低质量提升性能
|
||
|
||
if screenshot and session_id in self.websocket_connections:
|
||
try:
|
||
ws = self.websocket_connections[session_id]
|
||
await ws.send_json({
|
||
'type': 'screenshot_update',
|
||
'screenshot': screenshot
|
||
})
|
||
last_update_time = current_time
|
||
except:
|
||
# WebSocket 可能已断开
|
||
break
|
||
|
||
# 降低检查频率,减少 CPU 使用
|
||
await asyncio.sleep(0.5)
|
||
|
||
except Exception as e:
|
||
logger.error(f"自动刷新截图失败: {e}")
|
||
await asyncio.sleep(1) # 出错时等待更长时间
|
||
|
||
|
||
# 全局实例
|
||
captcha_controller = CaptchaRemoteController()
|
||
|