diff --git a/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/api/impl/GetAccountApi.java b/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/api/impl/GetAccountApi.java index 5825e6f..3e19be8 100644 --- a/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/api/impl/GetAccountApi.java +++ b/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/api/impl/GetAccountApi.java @@ -47,6 +47,7 @@ public class GetAccountApi extends GoofishAbstractApi { .header("Cookie", cookieStr) .header("content-type", "application/x-www-form-urlencoded") .header("priority", "u=1, i") + .header("user-agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0") .body("data=" + URLEncoder.encode(dataStr, StandardCharsets.UTF_8)) .execute()) { String body = response.body(); diff --git a/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/api/impl/GetDisplayNameApi.java b/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/api/impl/GetDisplayNameApi.java index aeb43b7..1e6acfb 100644 --- a/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/api/impl/GetDisplayNameApi.java +++ b/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/api/impl/GetDisplayNameApi.java @@ -47,6 +47,7 @@ public class GetDisplayNameApi extends GoofishAbstractApi { .header("Cookie", cookieStr) .header("content-type", "application/x-www-form-urlencoded") .header("priority", "u=1, i") + .header("user-agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0") .body("data=" + URLEncoder.encode(dataStr, StandardCharsets.UTF_8)) .execute()) { String body = response.body(); diff --git a/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/api/impl/GetUserIdApi.java b/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/api/impl/GetUserIdApi.java index c48e31f..f964cfa 100644 --- a/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/api/impl/GetUserIdApi.java +++ b/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/api/impl/GetUserIdApi.java @@ -47,6 +47,7 @@ public class GetUserIdApi extends GoofishAbstractApi { .header("Cookie", cookieStr) .header("content-type", "application/x-www-form-urlencoded") .header("priority", "u=1, i") + .header("user-agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0") .body("data=" + URLEncoder.encode(dataStr, StandardCharsets.UTF_8)) .execute()) { String body = response.body(); diff --git a/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/api/impl/GetWsTokenApi.java b/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/api/impl/GetWsTokenApi.java new file mode 100644 index 0000000..fce9eb6 --- /dev/null +++ b/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/api/impl/GetWsTokenApi.java @@ -0,0 +1,110 @@ +package top.biwin.xianyu.goofish.api.impl; + +import cn.hutool.http.HttpRequest; +import cn.hutool.http.HttpResponse; +import cn.hutool.http.HttpUtil; +import cn.hutool.json.JSONArray; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import top.biwin.xianyu.goofish.api.GoofishAbstractApi; + +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; + +/** + * TODO + * + * @author wangli + * @since 2026-01-29 21:41 + */ +@Component +@Slf4j +public class GetWsTokenApi extends GoofishAbstractApi { + @Override + public String getName() { + return "getWsToken"; + } + + @Override + public String getApi() { + return "mtop.taobao.idlemessage.pc.login.token"; + } + + @Override + public String getVersion() { + return "1.0"; + } + + @Override + public String call(String goofishId, String cookieStr, String dataStr) { + String apiUrl = buildApiUrl() + HttpUtil.toParams(buildQueryParams(cookieStr, dataStr)); + log.debug("【{}】获取闲鱼 WebSocket Token ApiUrl: {}", goofishId, apiUrl); + log.debug("【{}】获取闲鱼 WebSocket Token 时使用的 Cookie 为: {}", goofishId, cookieStr); + + try (HttpResponse response = HttpRequest.post(apiUrl) + .header("Cookie", cookieStr) + .header("content-type", "application/x-www-form-urlencoded") + .header("priority", "u=1, i") + .header("user-agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0") + .body("data=" + URLEncoder.encode(dataStr, StandardCharsets.UTF_8)) + .execute()) { + String body = response.body(); + log.info("【{}】获取闲鱼 WebSocket Token 时,服务端返回的完整响应为: {}", goofishId, body); + + JSONObject resJson = JSONUtil.parseObj(body); + // 检查是否需要滑块验证 + if (needsCaptchaVerification(resJson)) { + log.warn("【{}】检测到滑块验证要求,需要刷新Cookie", goofishId); + return null; + } + + // 检查响应 + if (resJson.containsKey("ret")) { + JSONArray retArray = resJson.getJSONArray("ret"); + for (int i = 0; i < retArray.size(); i++) { + String ret = retArray.getStr(i); + if (ret.contains("SUCCESS::调用成功")) { + if (resJson.containsKey("data")) { + JSONObject data = resJson.getJSONObject("data"); + if (data.containsKey("accessToken")) { + String newToken = data.getStr("accessToken"); + log.info("【{}】获取到accessToken: {}", goofishId, newToken); + return newToken; + } + } + } + } + } + } catch (Exception e) { + log.error("获取闲鱼 WebSocket Token 异常: {}", e.getMessage(), e); + } + + return null; + } + + /** + * 检查是否需要滑块验证 + */ + private boolean needsCaptchaVerification(JSONObject resJson) { + try { + JSONArray ret = resJson.getJSONArray("ret"); + if (ret == null || ret.isEmpty()) { + return false; + } + + String errorMsg = ret.getStr(0); + + // 检查是否包含滑块验证关键词 + return errorMsg.contains("FAIL_SYS_USER_VALIDATE") || + errorMsg.contains("RGV587_ERROR") || + errorMsg.contains("哎哟喂,被挤爆啦") || + errorMsg.contains("哎哟喂,被挤爆啦") || + errorMsg.contains("captcha") || + errorMsg.contains("punish"); + } catch (Exception e) { + return false; + } + } +} diff --git a/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/service/GoofishApiService.java b/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/service/GoofishApiService.java index 8543388..41ce95c 100644 --- a/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/service/GoofishApiService.java +++ b/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/service/GoofishApiService.java @@ -7,6 +7,7 @@ import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import top.biwin.xianyu.core.repository.GoofishAccountRepository; import top.biwin.xianyu.goofish.api.GoofishApi; +import top.biwin.xianyu.goofish.websocket.WebSocketConfiguration; import javax.annotation.Nullable; import java.util.List; @@ -21,6 +22,8 @@ import java.util.Objects; @Component @Data public class GoofishApiService { + @Autowired + private WebSocketConfiguration webSocketConfiguration; @Autowired private List> apis; @Autowired @@ -51,7 +54,7 @@ public class GoofishApiService { public Long getUserId(String goofishId, @Nullable String cookieStr) { GoofishApi goofishApi = getApi("getUserId"); if (!StringUtils.hasText(cookieStr)) { - cookieStr = goofishAccountRepository.findByUsername(goofishId) + cookieStr = goofishAccountRepository.findById(goofishId) .orElseThrow(() -> new IllegalArgumentException("无法获取闲鱼用户 ID,缺少 Cookie 信息")) .getCookie(); } @@ -61,13 +64,23 @@ public class GoofishApiService { public String getNickName(String goofishId, @Nullable String cookieStr) { GoofishApi goofishApi = getApi("getDisplayName"); if (!StringUtils.hasText(cookieStr)) { - cookieStr = goofishAccountRepository.findByUsername(goofishId) + cookieStr = goofishAccountRepository.findById(goofishId) .orElseThrow(() -> new IllegalArgumentException("无法获取闲鱼用户 ID,缺少 Cookie 信息")) .getCookie(); } return (String) goofishApi.call(goofishId, cookieStr, "{}"); } + public String getWsToken(String goofishId, @Nullable String cookieStr, String deviceId) { + GoofishApi goofishApi = getApi("getWsToken"); + if (!StringUtils.hasText(cookieStr)) { + cookieStr = goofishAccountRepository.findById(goofishId) + .orElseThrow(() -> new IllegalArgumentException("无法获取闲鱼用户 ID,缺少 Cookie 信息")) + .getCookie(); + } + return (String) goofishApi.call(goofishId, cookieStr, "{\"appKey\":\"" + webSocketConfiguration.getAppKey() + "\", \"deviceId\":\"" + deviceId + "\"}"); + } + private GoofishApi getApi(String apiName) { if (CollectionUtils.isEmpty(apis)) { throw new IllegalStateException("未初始化闲鱼 API"); diff --git a/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/service/GoofishPwdLoginService.java b/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/service/GoofishPwdLoginService.java index 4a2b8d1..dc015bd 100644 --- a/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/service/GoofishPwdLoginService.java +++ b/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/service/GoofishPwdLoginService.java @@ -18,6 +18,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import static top.biwin.xianyu.goofish.BrowserConstant.STEALTH_SCRIPT; +import static top.biwin.xianyu.goofish.util.CookieUtils.buildCookieStr; import static top.biwin.xianyu.goofish.util.SliderUtils.attemptSolveSlider; /** diff --git a/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/util/XianyuUtils.java b/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/util/XianyuUtils.java index 56e80f3..032f176 100644 --- a/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/util/XianyuUtils.java +++ b/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/util/XianyuUtils.java @@ -34,4 +34,11 @@ public class XianyuUtils { } return result.toString() + "-" + userId; } + + public static String generateMid() { + Random random = new Random(); + int randomPart = (int) (1000 * random.nextDouble()); + long timestamp = System.currentTimeMillis(); + return randomPart + String.valueOf(timestamp) + " 0"; + } } diff --git a/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/websocket/GoofishAccountWebsocket.java b/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/websocket/GoofishAccountWebsocket.java index a1c1d64..3046c02 100644 --- a/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/websocket/GoofishAccountWebsocket.java +++ b/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/websocket/GoofishAccountWebsocket.java @@ -1,6 +1,10 @@ package top.biwin.xianyu.goofish.websocket; import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSON; +import cn.hutool.json.JSONArray; +import cn.hutool.json.JSONObject; +import cn.hutool.json.JSONUtil; import com.microsoft.playwright.BrowserContext; import jakarta.websocket.ContainerProvider; import jakarta.websocket.WebSocketContainer; @@ -9,6 +13,7 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.util.StringUtils; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketHttpHeaders; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.client.WebSocketClient; @@ -28,6 +33,9 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -42,13 +50,13 @@ import java.util.concurrent.atomic.AtomicLong; */ @Slf4j public class GoofishAccountWebsocket extends TextWebSocketHandler { - private static final String WEBSOCKET_URL = "wss://wss-goofish.dingtalk.com/"; private final AtomicBoolean connected = new AtomicBoolean(false); private final AtomicBoolean running = new AtomicBoolean(false); private volatile WebSocketConnectionState connectionState = WebSocketConnectionState.DISCONNECTED; private final AtomicLong lastStateChangeTime = new AtomicLong(System.currentTimeMillis()); private final AtomicInteger connectionFailures = new AtomicInteger(0); private static final int MAX_CONNECTION_FAILURES = 3; + private static final int MESSAGE_COOLDOWN = 300; // 消息冷却时间(秒),5分钟 private WebSocketSession webSocketSession; private String cookiesStr; private Long userId; @@ -56,23 +64,42 @@ public class GoofishAccountWebsocket extends TextWebSocketHandler { private String myId; private String deviceId; + private String currentToken; + private final AtomicLong lastTokenRefreshTime = new AtomicLong(0); + private volatile String lastTokenRefreshStatus = "none"; // Token刷新状态 + // ============== 心跳相关 ============== + private final AtomicLong lastHeartbeatTime = new AtomicLong(0); + private final AtomicLong lastHeartbeatResponse = new AtomicLong(0); + private final AtomicLong lastMessageReceivedTime = new AtomicLong(0); // 上次收到消息时间 + private final AtomicLong lastCookieRefreshTime = new AtomicLong(0); + private final Semaphore messageSemaphore = new Semaphore(100); // 最多100个并发消息 + private final AtomicInteger activeMessageTasks = new AtomicInteger(0); + private final String goofishId; private final GoofishAccountRepository goofishAccountRepository; private final BrowserService browserService; private final GoofishApiService goofishApiService; - private final ExecutorService scheduledExecutor; + private final WebSocketConfiguration webSocketConfiguration; + private final ScheduledExecutorService scheduledExecutorService; + private ScheduledFuture heartbeatTask; + private ScheduledFuture tokenRefreshTask; + private ScheduledFuture cleanupTask; + private ScheduledFuture cookieRefreshTask; public GoofishAccountWebsocket(String goofishId, GoofishAccountRepository goofishAccountRepository, BrowserService browserService, GoofishApiService goofishApiService, - @Qualifier("goofishAccountWebSocketExecutor") ExecutorService scheduledExecutor) { + WebSocketConfiguration webSocketConfiguration, +// WebSocketContainer webSocketContainer, + @Qualifier("goofishAccountWebSocketExecutor") ScheduledExecutorService scheduledExecutorService) { super(); this.goofishId = goofishId; this.goofishAccountRepository = goofishAccountRepository; this.browserService = browserService; this.goofishApiService = goofishApiService; - this.scheduledExecutor = scheduledExecutor; + this.webSocketConfiguration = webSocketConfiguration; + this.scheduledExecutorService = scheduledExecutorService; } public void start() { @@ -92,7 +119,7 @@ public class GoofishAccountWebsocket extends TextWebSocketHandler { } // 启动WebSocket连接循环 - CompletableFuture.runAsync(this::connecting, scheduledExecutor); + CompletableFuture.runAsync(this::connecting, scheduledExecutorService); } private boolean loadGoofishAccount() { @@ -114,7 +141,7 @@ public class GoofishAccountWebsocket extends TextWebSocketHandler { } String accountName = goofishApiService.getAccount(account.getId(), cookiesStr); - if(!StringUtils.hasText(accountName)) { + if (!StringUtils.hasText(accountName)) { // 说明 cookie 失效,尝试重新登录 this.cookiesStr = browserService.refreshGoofishAccountCookie(account.getId(), account.getShowBrowser() == 1, 1000D, cookiesStr); } @@ -166,7 +193,7 @@ public class GoofishAccountWebsocket extends TextWebSocketHandler { } } - /** + /** * 等待WebSocket连接断开 */ private void waitForDisconnection() { @@ -302,7 +329,7 @@ public class GoofishAccountWebsocket extends TextWebSocketHandler { try { // 发起WebSocket握手 ListenableFuture future = - client.doHandshake(this, headers, URI.create(WEBSOCKET_URL)); + client.doHandshake(this, headers, URI.create(webSocketConfiguration.getWsUrl())); // 等待连接完成(超时30秒) // 注意:由于 afterConnectionEstablished 已异步化,这个超时仅用于 WebSocket 握手本身 @@ -330,7 +357,7 @@ public class GoofishAccountWebsocket extends TextWebSocketHandler { // 更新连接状态 setConnectionState(WebSocketConnectionState.INIT, "准备建立WebSocket连接"); - log.info("【{}】WebSocket目标地址: {}", goofishId, WEBSOCKET_URL); + log.info("【{}】WebSocket目标地址: {}", goofishId, webSocketConfiguration.getWsUrl()); // 单次连接尝试 connectWebSocket(); @@ -356,10 +383,390 @@ public class GoofishAccountWebsocket extends TextWebSocketHandler { log.info("【{}】WebSocket 连接循环已退出", goofishId); } + /** + * 初始化连接 - 对应Python的init()方法 + */ + private void init(WebSocketSession session) throws Exception { + log.info("【{}】========== 开始初始化WebSocket连接 ==========", goofishId); + log.info("【{}】检查Token状态... currentToken={}, lastRefresh={}", + goofishId, currentToken != null ? "存在" : "不存在", lastTokenRefreshTime.get()); + + // 刷新Token + long currentTime = System.currentTimeMillis(); + if (currentToken == null || (currentTime - lastTokenRefreshTime.get()) >= webSocketConfiguration.getTokenRefreshInterval() * 1000L) { + log.info("【{}】需要刷新token,开始调用refreshToken()...", goofishId); + + try { + currentToken = goofishApiService.getWsToken(goofishId, cookiesStr, deviceId); + log.info("【{}】Token刷新调用完成,currentToken={}", goofishId, currentToken != null ? "已获取" : "未获取"); + } catch (Exception e) { + log.error("【{}】Token刷新过程出错: {}", goofishId, e.getMessage(), e); + throw e; + } + } else { + log.info("【{}】Token有效,跳过刷新", goofishId); + } + + if (currentToken == null) { + log.error("【{}】❌ 无法获取有效token,初始化失败", goofishId); + throw new Exception("Token获取失败"); + } + + log.info("【{}】✅ Token验证通过: {}", goofishId, currentToken.substring(0, Math.min(20, currentToken.length())) + "..."); + + // 发送 /reg 消息 + log.info("【{}】准备发送 /reg 消息...", goofishId); + JSONObject regMsg = new JSONObject(); + regMsg.put("lwp", "/reg"); + + JSONObject regHeaders = new JSONObject(); + regHeaders.put("cache-header", "app-key token ua wv"); + regHeaders.put("app-key", webSocketConfiguration.getAppKey()); + regHeaders.put("token", currentToken); + regHeaders.put("ua", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0"); + regHeaders.put("dt", "j"); + regHeaders.put("wv", "im:3,au:3,sy:6"); + regHeaders.put("sync", "0,0;0;0;"); + regHeaders.put("did", deviceId); + regHeaders.put("mid", XianyuUtils.generateMid()); + regMsg.put("headers", regHeaders); + + try { + session.sendMessage(new TextMessage(regMsg.toString())); + log.info("【{}】✅ /reg 消息已发送", goofishId); + } catch (Exception e) { + log.error("【{}】❌ 发送 /reg 消息失败: {}", goofishId, e.getMessage(), e); + throw e; + } + + // 等待1秒 + log.info("【{}】等待1秒...", goofishId); + Thread.sleep(1000); + + + // 发送 /ackDiff 消息 + log.info("【{}】准备发送 /ackDiff 消息...", goofishId); + long timestamp = System.currentTimeMillis(); + JSONObject ackMsg = new JSONObject(); + ackMsg.put("lwp", "/r/SyncStatus/ackDiff"); + + JSONObject ackHeaders = new JSONObject(); + ackHeaders.put("mid", XianyuUtils.generateMid()); + ackMsg.put("headers", ackHeaders); + + JSONArray bodyArray = new JSONArray(); + JSONObject bodyItem = new JSONObject(); + bodyItem.put("pipeline", "sync"); + bodyItem.put("tooLong2Tag", "PNM,1"); + bodyItem.put("channel", "sync"); + bodyItem.put("topic", "sync"); + bodyItem.put("highPts", 0); + bodyItem.put("pts", timestamp * 1000); + + bodyItem.put("seq", 0); + bodyItem.put("timestamp", timestamp); + bodyArray.add(bodyItem); + ackMsg.put("body", bodyArray); + + try { + session.sendMessage(new TextMessage(ackMsg.toString())); + log.info("【{}】✅ /ackDiff 消息已发送", goofishId); + } catch (Exception e) { + log.error("【{}】❌ 发送 /ackDiff 消息失败: {}", goofishId, e.getMessage(), e); + throw e; + } + + log.info("【{}】========== WebSocket初始化完成 ==========", goofishId); + } + + /** + * 发送心跳 - 对应Python的send_heartbeat()方法 + */ + private void sendHeartbeat() throws Exception { + if (webSocketSession == null || !webSocketSession.isOpen()) { + throw new Exception("WebSocket连接已关闭,无法发送心跳"); + } + + JSONObject msg = new JSONObject(); + msg.put("lwp", "/!"); + + JSONObject headers = new JSONObject(); + headers.put("mid", XianyuUtils.generateMid()); + msg.put("headers", headers); + + webSocketSession.sendMessage(new TextMessage(msg.toString())); + lastHeartbeatTime.set(System.currentTimeMillis()); + log.debug("【{}】心跳包已发送", goofishId); + } + + /** + * 心跳循环 - 对应Python的heartbeat_loop()方法 + */ + private void heartbeatLoop() { + if (!connected.get() || webSocketSession == null || !webSocketSession.isOpen()) { + return; + } + + try { + sendHeartbeat(); + } catch (Exception e) { + log.error("【{}】心跳发送失败: {}", goofishId, e.getMessage()); + } + } + + /** + * 关闭WebSocket连接 + */ + private void closeWebSocket() { + if (webSocketSession != null) { + try { + if (webSocketSession.isOpen()) { + webSocketSession.close(); + log.info("【{}】WebSocket连接已关闭", goofishId); + } + } catch (Exception e) { + log.error("【{}】关闭WebSocket时出错", goofishId, e); + } finally { + webSocketSession = null; + connected.set(false); + } + } + } + + /** + * Token刷新循环 - 对应Python的token_refresh_loop()方法 + */ + private void tokenRefreshLoop() { + try { + // 检查账号是否启用 + if (!isAccountEnabled()) { + log.info("【{}】账号已禁用,停止Token刷新循环", goofishId); + return; + } + + long currentTime = System.currentTimeMillis(); + if (currentTime - lastTokenRefreshTime.get() >= webSocketConfiguration.getTokenRefreshInterval() * 1000L) { + log.info("【{}】Token即将过期,准备刷新...", goofishId); + String newToken = goofishApiService.getWsToken(goofishId, cookiesStr, deviceId); + if (newToken != null) { + log.info("【{}】Token刷新成功,将关闭WebSocket以使用新Token重连", goofishId); + // Token刷新成功后,关闭WebSocket连接,让它用新Token重新连接 + closeWebSocket(); + } + } + + } catch (Exception e) { + log.error("【{}】Token刷新循环出错", goofishId, e); + } + } + + /** + * 暂停清理循环 - 对应Python的pause_cleanup_loop()方法 + */ + private void pauseCleanupLoop() { + } + + /** + * Cookie刷新循环 - 对应Python的cookie_refresh_loop()方法 + */ + private void cookieRefreshLoop() { + try { + // 检查账号是否启用 + if (!isAccountEnabled()) { + log.info("【{}】账号已禁用,停止Cookie刷新循环", goofishId); + return; + } + + long currentTime = System.currentTimeMillis(); + + // 检查是否在消息接收后的冷却时间内 + long timeSinceLastMessage = currentTime - lastMessageReceivedTime.get(); + if (lastMessageReceivedTime.get() > 0 && timeSinceLastMessage < MESSAGE_COOLDOWN * 1000L) { + log.info("【{}】收到消息后冷却中,跳过本次Cookie刷新", goofishId); + return; + } + + // 从数据库重新加载Cookie + if (currentTime - lastCookieRefreshTime.get() >= webSocketConfiguration.getCookieRefreshInterval() * 1000L) { + log.info("【{}】开始Cookie刷新...", goofishId); + if (!loadGoofishAccount()) { + lastCookieRefreshTime.set(currentTime); + log.info("【{}】Cookie刷新成功", goofishId); + } + } + + } catch (Exception e) { + log.error("【{}】Cookie刷新循环出错", goofishId, e); + } + } + + /** + * 启动所有后台任务 + */ + private void startBackgroundTasks() { + log.info("【{}】准备启动后台任务...", goofishId); + + // 启动心跳任务(依赖WebSocket,每次重连都需要重启) + if (heartbeatTask == null || heartbeatTask.isDone()) { + log.info("【{}】启动心跳任务...", goofishId); + heartbeatTask = scheduledExecutorService.scheduleWithFixedDelay( + this::heartbeatLoop, + 0, + webSocketConfiguration.getHeartbeatInterval(), + TimeUnit.SECONDS + ); + } + + // 启动Token刷新任务 + if (tokenRefreshTask == null || tokenRefreshTask.isDone()) { + log.info("【{}】启动Token刷新任务...", goofishId); + tokenRefreshTask = scheduledExecutorService.scheduleWithFixedDelay( + this::tokenRefreshLoop, + 60, + 60, + TimeUnit.SECONDS + ); + } + + // 启动清理任务 + if (cleanupTask == null || cleanupTask.isDone()) { + log.info("【{}】启动暂停记录清理任务...", goofishId); + cleanupTask = scheduledExecutorService.scheduleWithFixedDelay( + this::pauseCleanupLoop, + webSocketConfiguration.getCleanupInterval(), + webSocketConfiguration.getCleanupInterval(), + TimeUnit.SECONDS + ); + } + + // 启动Cookie刷新任务 + if (cookieRefreshTask == null || cookieRefreshTask.isDone()) { + log.info("【{}】启动Cookie刷新任务...", goofishId); + cookieRefreshTask = scheduledExecutorService.scheduleWithFixedDelay( + this::cookieRefreshLoop, + webSocketConfiguration.getCookieRefreshInterval(), + webSocketConfiguration.getCookieRefreshInterval(), + TimeUnit.SECONDS + ); + } + + log.info("【{}】✅ 所有后台任务已启动", goofishId); + } @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { - super.afterConnectionEstablished(session); + log.info("【{}】WebSocket连接已建立", goofishId); + this.webSocketSession = session; + + // 更新连接状态(连接已建立,但尚未初始化) + setConnectionState(WebSocketConnectionState.CONNECTED, "握手完成,准备初始化"); + connected.set(true); // 标记连接已建立 + + // 异步执行初始化(不阻塞连接建立过程) + // 这样可以避免 init() 中的耗时操作(如refreshToken)导致连接超时 + CompletableFuture.runAsync(() -> { + try { + log.info("【{}】开始异步初始化...", goofishId); + init(session); + log.info("【{}】WebSocket初始化完成!", goofishId); + + // 更新连接状态 + setConnectionState(WebSocketConnectionState.REGISTER, "初始化完成,连接就绪"); + connectionFailures.set(0); + + // 启动后台任务 + startBackgroundTasks(); + + log.info("【{}】✅ WebSocket连接和初始化全部完成", goofishId); + + } catch (Exception e) { + log.error("【{}】❌ WebSocket初始化失败: {}", goofishId, e.getMessage(), e); + log.error("【{}】异常类型: {}", goofishId, e.getClass().getName()); + log.error("【{}】异常堆栈:", goofishId, e); + connected.set(false); + + // 关闭连接,触发重连 + try { + if (session.isOpen()) { + session.close(); + } + } catch (Exception closeEx) { + log.error("【{}】关闭连接失败", goofishId, closeEx); + } + } + }, scheduledExecutorService); // 使用已有的线程池执行异步任务 + } + + /** + * 处理心跳响应 - 对应Python的handle_heartbeat_response()方法 + */ + private boolean handleHeartbeatResponse(JSONObject messageData) { + try { + if (messageData.getInt("code") == 200) { + lastHeartbeatResponse.set(System.currentTimeMillis()); + log.debug("【{}】收到心跳响应", goofishId); + return true; + } + } catch (Exception e) { + // 忽略 + } + return false; + } + + private void handleMessage(JSONObject messageData, WebSocketSession session) { + // TODO + } + + /** + * 带信号量的消息处理包装器,防止并发任务过多 + * 对应Python的 _handle_message_with_semaphore()方法 + */ + private void handleMessageWithSemaphore(JSONObject messageData, WebSocketSession session) { + try { + messageSemaphore.acquire(); + int currentTasks = activeMessageTasks.incrementAndGet(); + try { + log.debug("【{}】收到的消息内容: {}", goofishId, JSONUtil.toJsonStr(messageData)); + handleMessage(messageData, session); + } finally { + activeMessageTasks.decrementAndGet(); + messageSemaphore.release(); + + // 定期记录活跃任务数(每100个任务记录一次) + // 对应Python: if self.active_message_tasks % 100 == 0 and self.active_message_tasks > 0 + if (currentTasks % 100 == 0 && currentTasks > 0) { + log.info("【{}】当前活跃消息处理任务数: {}", goofishId, currentTasks); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("【{}】消息处理被中断", goofishId, e); + } + } + + + /** + * 接收WebSocket消息 + */ + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { + String payload = message.getPayload(); + log.info("【{}】收到WebSocket消息: {} 字节", goofishId, payload.length()); + + try { + JSONObject messageData = JSONUtil.parseObj(payload); + + // 处理心跳响应 + if (handleHeartbeatResponse(messageData)) { + return; + } + + // 处理其他消息(异步处理,避免阻塞) + CompletableFuture.runAsync(() -> handleMessageWithSemaphore(messageData, session), scheduledExecutorService); + + } catch (Exception e) { + log.error("【{}】处理消息出错", goofishId, e); + } } @Override diff --git a/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/websocket/WebSocketConfiguration.java b/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/websocket/WebSocketConfiguration.java new file mode 100644 index 0000000..1e6a77c --- /dev/null +++ b/xianyu-goofish/src/main/java/top/biwin/xianyu/goofish/websocket/WebSocketConfiguration.java @@ -0,0 +1,21 @@ +package top.biwin.xianyu.goofish.websocket; + +import lombok.Data; +import org.springframework.context.annotation.Configuration; + +/** + * TODO + * + * @author wangli + * @since 2026-02-01 10:01 + */ +@Data +@Configuration(proxyBeanMethods = false, value = "goofish.websocket") +public class WebSocketConfiguration { + private String appKey; + private String wsUrl; + private Integer tokenRefreshInterval; + private Integer heartbeatInterval; + private Integer cleanupInterval; + private Integer cookieRefreshInterval; +} diff --git a/xianyu-server/src/main/java/top/biwin/xinayu/server/config/ThreadPoolConfig.java b/xianyu-server/src/main/java/top/biwin/xinayu/server/config/ThreadPoolConfig.java index 8a3a0a2..1f5fe00 100644 --- a/xianyu-server/src/main/java/top/biwin/xinayu/server/config/ThreadPoolConfig.java +++ b/xianyu-server/src/main/java/top/biwin/xinayu/server/config/ThreadPoolConfig.java @@ -3,8 +3,8 @@ package top.biwin.xinayu.server.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; /** * TODO @@ -16,7 +16,7 @@ import java.util.concurrent.Executors; public class ThreadPoolConfig { @Bean(name = "goofishAccountWebSocketExecutor") - public ExecutorService executor() { + public ScheduledExecutorService executor() { // 创建定时任务线程池 return Executors.newScheduledThreadPool(5, r -> { Thread t = new Thread(r); diff --git a/xianyu-server/src/main/resources/application.yml b/xianyu-server/src/main/resources/application.yml index cbf6b4c..f29f3ba 100644 --- a/xianyu-server/src/main/resources/application.yml +++ b/xianyu-server/src/main/resources/application.yml @@ -65,6 +65,11 @@ goofish: api: hostUrl: https://h5api.m.goofish.com/h5/ appKey: 34839810 + websocket: + appKey: 444e9908a51d1cb236a27862abc769c9 + wsUrl: wss://wss-goofish.dingtalk.com/ + tokenRefreshInterval: 72000 + heartbeatInterval: 30 assistant: static: