This commit is contained in:
wangli 2026-02-01 11:09:28 +08:00
parent e77f642628
commit 0b75cc6ac2
11 changed files with 581 additions and 14 deletions

View File

@ -47,6 +47,7 @@ public class GetAccountApi extends GoofishAbstractApi<String> {
.header("Cookie", cookieStr)
.header("content-type", "application/x-www-form-urlencoded")
.header("priority", "u=1, i")
.header("user-agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0")
.body("data=" + URLEncoder.encode(dataStr, StandardCharsets.UTF_8))
.execute()) {
String body = response.body();

View File

@ -47,6 +47,7 @@ public class GetDisplayNameApi extends GoofishAbstractApi<String> {
.header("Cookie", cookieStr)
.header("content-type", "application/x-www-form-urlencoded")
.header("priority", "u=1, i")
.header("user-agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0")
.body("data=" + URLEncoder.encode(dataStr, StandardCharsets.UTF_8))
.execute()) {
String body = response.body();

View File

@ -47,6 +47,7 @@ public class GetUserIdApi extends GoofishAbstractApi<Long> {
.header("Cookie", cookieStr)
.header("content-type", "application/x-www-form-urlencoded")
.header("priority", "u=1, i")
.header("user-agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0")
.body("data=" + URLEncoder.encode(dataStr, StandardCharsets.UTF_8))
.execute()) {
String body = response.body();

View File

@ -0,0 +1,110 @@
package top.biwin.xianyu.goofish.api.impl;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import cn.hutool.http.HttpUtil;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import top.biwin.xianyu.goofish.api.GoofishAbstractApi;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
/**
* TODO
*
* @author wangli
* @since 2026-01-29 21:41
*/
@Component
@Slf4j
public class GetWsTokenApi extends GoofishAbstractApi<String> {
@Override
public String getName() {
return "getWsToken";
}
@Override
public String getApi() {
return "mtop.taobao.idlemessage.pc.login.token";
}
@Override
public String getVersion() {
return "1.0";
}
@Override
public String call(String goofishId, String cookieStr, String dataStr) {
String apiUrl = buildApiUrl() + HttpUtil.toParams(buildQueryParams(cookieStr, dataStr));
log.debug("【{}】获取闲鱼 WebSocket Token ApiUrl: {}", goofishId, apiUrl);
log.debug("【{}】获取闲鱼 WebSocket Token 时使用的 Cookie 为: {}", goofishId, cookieStr);
try (HttpResponse response = HttpRequest.post(apiUrl)
.header("Cookie", cookieStr)
.header("content-type", "application/x-www-form-urlencoded")
.header("priority", "u=1, i")
.header("user-agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0")
.body("data=" + URLEncoder.encode(dataStr, StandardCharsets.UTF_8))
.execute()) {
String body = response.body();
log.info("【{}】获取闲鱼 WebSocket Token 时,服务端返回的完整响应为: {}", goofishId, body);
JSONObject resJson = JSONUtil.parseObj(body);
// 检查是否需要滑块验证
if (needsCaptchaVerification(resJson)) {
log.warn("【{}】检测到滑块验证要求需要刷新Cookie", goofishId);
return null;
}
// 检查响应
if (resJson.containsKey("ret")) {
JSONArray retArray = resJson.getJSONArray("ret");
for (int i = 0; i < retArray.size(); i++) {
String ret = retArray.getStr(i);
if (ret.contains("SUCCESS::调用成功")) {
if (resJson.containsKey("data")) {
JSONObject data = resJson.getJSONObject("data");
if (data.containsKey("accessToken")) {
String newToken = data.getStr("accessToken");
log.info("【{}】获取到accessToken: {}", goofishId, newToken);
return newToken;
}
}
}
}
}
} catch (Exception e) {
log.error("获取闲鱼 WebSocket Token 异常: {}", e.getMessage(), e);
}
return null;
}
/**
* 检查是否需要滑块验证
*/
private boolean needsCaptchaVerification(JSONObject resJson) {
try {
JSONArray ret = resJson.getJSONArray("ret");
if (ret == null || ret.isEmpty()) {
return false;
}
String errorMsg = ret.getStr(0);
// 检查是否包含滑块验证关键词
return errorMsg.contains("FAIL_SYS_USER_VALIDATE") ||
errorMsg.contains("RGV587_ERROR") ||
errorMsg.contains("哎哟喂,被挤爆啦") ||
errorMsg.contains("哎哟喂,被挤爆啦") ||
errorMsg.contains("captcha") ||
errorMsg.contains("punish");
} catch (Exception e) {
return false;
}
}
}

View File

@ -7,6 +7,7 @@ import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import top.biwin.xianyu.core.repository.GoofishAccountRepository;
import top.biwin.xianyu.goofish.api.GoofishApi;
import top.biwin.xianyu.goofish.websocket.WebSocketConfiguration;
import javax.annotation.Nullable;
import java.util.List;
@ -21,6 +22,8 @@ import java.util.Objects;
@Component
@Data
public class GoofishApiService {
@Autowired
private WebSocketConfiguration webSocketConfiguration;
@Autowired
private List<GoofishApi<?>> apis;
@Autowired
@ -51,7 +54,7 @@ public class GoofishApiService {
public Long getUserId(String goofishId, @Nullable String cookieStr) {
GoofishApi<?> goofishApi = getApi("getUserId");
if (!StringUtils.hasText(cookieStr)) {
cookieStr = goofishAccountRepository.findByUsername(goofishId)
cookieStr = goofishAccountRepository.findById(goofishId)
.orElseThrow(() -> new IllegalArgumentException("无法获取闲鱼用户 ID缺少 Cookie 信息"))
.getCookie();
}
@ -61,13 +64,23 @@ public class GoofishApiService {
public String getNickName(String goofishId, @Nullable String cookieStr) {
GoofishApi<?> goofishApi = getApi("getDisplayName");
if (!StringUtils.hasText(cookieStr)) {
cookieStr = goofishAccountRepository.findByUsername(goofishId)
cookieStr = goofishAccountRepository.findById(goofishId)
.orElseThrow(() -> new IllegalArgumentException("无法获取闲鱼用户 ID缺少 Cookie 信息"))
.getCookie();
}
return (String) goofishApi.call(goofishId, cookieStr, "{}");
}
public String getWsToken(String goofishId, @Nullable String cookieStr, String deviceId) {
GoofishApi<?> goofishApi = getApi("getWsToken");
if (!StringUtils.hasText(cookieStr)) {
cookieStr = goofishAccountRepository.findById(goofishId)
.orElseThrow(() -> new IllegalArgumentException("无法获取闲鱼用户 ID缺少 Cookie 信息"))
.getCookie();
}
return (String) goofishApi.call(goofishId, cookieStr, "{\"appKey\":\"" + webSocketConfiguration.getAppKey() + "\", \"deviceId\":\"" + deviceId + "\"}");
}
private GoofishApi getApi(String apiName) {
if (CollectionUtils.isEmpty(apis)) {
throw new IllegalStateException("未初始化闲鱼 API");

View File

@ -18,6 +18,7 @@ import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import static top.biwin.xianyu.goofish.BrowserConstant.STEALTH_SCRIPT;
import static top.biwin.xianyu.goofish.util.CookieUtils.buildCookieStr;
import static top.biwin.xianyu.goofish.util.SliderUtils.attemptSolveSlider;
/**

View File

@ -34,4 +34,11 @@ public class XianyuUtils {
}
return result.toString() + "-" + userId;
}
public static String generateMid() {
Random random = new Random();
int randomPart = (int) (1000 * random.nextDouble());
long timestamp = System.currentTimeMillis();
return randomPart + String.valueOf(timestamp) + " 0";
}
}

View File

@ -1,6 +1,10 @@
package top.biwin.xianyu.goofish.websocket;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSON;
import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.microsoft.playwright.BrowserContext;
import jakarta.websocket.ContainerProvider;
import jakarta.websocket.WebSocketContainer;
@ -9,6 +13,7 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHttpHeaders;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.client.WebSocketClient;
@ -28,6 +33,9 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@ -42,13 +50,13 @@ import java.util.concurrent.atomic.AtomicLong;
*/
@Slf4j
public class GoofishAccountWebsocket extends TextWebSocketHandler {
private static final String WEBSOCKET_URL = "wss://wss-goofish.dingtalk.com/";
private final AtomicBoolean connected = new AtomicBoolean(false);
private final AtomicBoolean running = new AtomicBoolean(false);
private volatile WebSocketConnectionState connectionState = WebSocketConnectionState.DISCONNECTED;
private final AtomicLong lastStateChangeTime = new AtomicLong(System.currentTimeMillis());
private final AtomicInteger connectionFailures = new AtomicInteger(0);
private static final int MAX_CONNECTION_FAILURES = 3;
private static final int MESSAGE_COOLDOWN = 300; // 消息冷却时间5分钟
private WebSocketSession webSocketSession;
private String cookiesStr;
private Long userId;
@ -56,23 +64,42 @@ public class GoofishAccountWebsocket extends TextWebSocketHandler {
private String myId;
private String deviceId;
private String currentToken;
private final AtomicLong lastTokenRefreshTime = new AtomicLong(0);
private volatile String lastTokenRefreshStatus = "none"; // Token刷新状态
// ============== 心跳相关 ==============
private final AtomicLong lastHeartbeatTime = new AtomicLong(0);
private final AtomicLong lastHeartbeatResponse = new AtomicLong(0);
private final AtomicLong lastMessageReceivedTime = new AtomicLong(0); // 上次收到消息时间
private final AtomicLong lastCookieRefreshTime = new AtomicLong(0);
private final Semaphore messageSemaphore = new Semaphore(100); // 最多100个并发消息
private final AtomicInteger activeMessageTasks = new AtomicInteger(0);
private final String goofishId;
private final GoofishAccountRepository goofishAccountRepository;
private final BrowserService browserService;
private final GoofishApiService goofishApiService;
private final ExecutorService scheduledExecutor;
private final WebSocketConfiguration webSocketConfiguration;
private final ScheduledExecutorService scheduledExecutorService;
private ScheduledFuture<?> heartbeatTask;
private ScheduledFuture<?> tokenRefreshTask;
private ScheduledFuture<?> cleanupTask;
private ScheduledFuture<?> cookieRefreshTask;
public GoofishAccountWebsocket(String goofishId,
GoofishAccountRepository goofishAccountRepository,
BrowserService browserService,
GoofishApiService goofishApiService,
@Qualifier("goofishAccountWebSocketExecutor") ExecutorService scheduledExecutor) {
WebSocketConfiguration webSocketConfiguration,
// WebSocketContainer webSocketContainer,
@Qualifier("goofishAccountWebSocketExecutor") ScheduledExecutorService scheduledExecutorService) {
super();
this.goofishId = goofishId;
this.goofishAccountRepository = goofishAccountRepository;
this.browserService = browserService;
this.goofishApiService = goofishApiService;
this.scheduledExecutor = scheduledExecutor;
this.webSocketConfiguration = webSocketConfiguration;
this.scheduledExecutorService = scheduledExecutorService;
}
public void start() {
@ -92,7 +119,7 @@ public class GoofishAccountWebsocket extends TextWebSocketHandler {
}
// 启动WebSocket连接循环
CompletableFuture.runAsync(this::connecting, scheduledExecutor);
CompletableFuture.runAsync(this::connecting, scheduledExecutorService);
}
private boolean loadGoofishAccount() {
@ -114,7 +141,7 @@ public class GoofishAccountWebsocket extends TextWebSocketHandler {
}
String accountName = goofishApiService.getAccount(account.getId(), cookiesStr);
if(!StringUtils.hasText(accountName)) {
if (!StringUtils.hasText(accountName)) {
// 说明 cookie 失效尝试重新登录
this.cookiesStr = browserService.refreshGoofishAccountCookie(account.getId(), account.getShowBrowser() == 1, 1000D, cookiesStr);
}
@ -166,7 +193,7 @@ public class GoofishAccountWebsocket extends TextWebSocketHandler {
}
}
/**
/**
* 等待WebSocket连接断开
*/
private void waitForDisconnection() {
@ -302,7 +329,7 @@ public class GoofishAccountWebsocket extends TextWebSocketHandler {
try {
// 发起WebSocket握手
ListenableFuture<WebSocketSession> future =
client.doHandshake(this, headers, URI.create(WEBSOCKET_URL));
client.doHandshake(this, headers, URI.create(webSocketConfiguration.getWsUrl()));
// 等待连接完成超时30秒
// 注意由于 afterConnectionEstablished 已异步化这个超时仅用于 WebSocket 握手本身
@ -330,7 +357,7 @@ public class GoofishAccountWebsocket extends TextWebSocketHandler {
// 更新连接状态
setConnectionState(WebSocketConnectionState.INIT, "准备建立WebSocket连接");
log.info("【{}】WebSocket目标地址: {}", goofishId, WEBSOCKET_URL);
log.info("【{}】WebSocket目标地址: {}", goofishId, webSocketConfiguration.getWsUrl());
// 单次连接尝试
connectWebSocket();
@ -356,10 +383,390 @@ public class GoofishAccountWebsocket extends TextWebSocketHandler {
log.info("【{}】WebSocket 连接循环已退出", goofishId);
}
/**
* 初始化连接 - 对应Python的init()方法
*/
private void init(WebSocketSession session) throws Exception {
log.info("【{}】========== 开始初始化WebSocket连接 ==========", goofishId);
log.info("【{}】检查Token状态... currentToken={}, lastRefresh={}",
goofishId, currentToken != null ? "存在" : "不存在", lastTokenRefreshTime.get());
// 刷新Token
long currentTime = System.currentTimeMillis();
if (currentToken == null || (currentTime - lastTokenRefreshTime.get()) >= webSocketConfiguration.getTokenRefreshInterval() * 1000L) {
log.info("【{}】需要刷新token开始调用refreshToken()...", goofishId);
try {
currentToken = goofishApiService.getWsToken(goofishId, cookiesStr, deviceId);
log.info("【{}】Token刷新调用完成currentToken={}", goofishId, currentToken != null ? "已获取" : "未获取");
} catch (Exception e) {
log.error("【{}】Token刷新过程出错: {}", goofishId, e.getMessage(), e);
throw e;
}
} else {
log.info("【{}】Token有效跳过刷新", goofishId);
}
if (currentToken == null) {
log.error("【{}】❌ 无法获取有效token初始化失败", goofishId);
throw new Exception("Token获取失败");
}
log.info("【{}】✅ Token验证通过: {}", goofishId, currentToken.substring(0, Math.min(20, currentToken.length())) + "...");
// 发送 /reg 消息
log.info("【{}】准备发送 /reg 消息...", goofishId);
JSONObject regMsg = new JSONObject();
regMsg.put("lwp", "/reg");
JSONObject regHeaders = new JSONObject();
regHeaders.put("cache-header", "app-key token ua wv");
regHeaders.put("app-key", webSocketConfiguration.getAppKey());
regHeaders.put("token", currentToken);
regHeaders.put("ua", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0");
regHeaders.put("dt", "j");
regHeaders.put("wv", "im:3,au:3,sy:6");
regHeaders.put("sync", "0,0;0;0;");
regHeaders.put("did", deviceId);
regHeaders.put("mid", XianyuUtils.generateMid());
regMsg.put("headers", regHeaders);
try {
session.sendMessage(new TextMessage(regMsg.toString()));
log.info("【{}】✅ /reg 消息已发送", goofishId);
} catch (Exception e) {
log.error("【{}】❌ 发送 /reg 消息失败: {}", goofishId, e.getMessage(), e);
throw e;
}
// 等待1秒
log.info("【{}】等待1秒...", goofishId);
Thread.sleep(1000);
// 发送 /ackDiff 消息
log.info("【{}】准备发送 /ackDiff 消息...", goofishId);
long timestamp = System.currentTimeMillis();
JSONObject ackMsg = new JSONObject();
ackMsg.put("lwp", "/r/SyncStatus/ackDiff");
JSONObject ackHeaders = new JSONObject();
ackHeaders.put("mid", XianyuUtils.generateMid());
ackMsg.put("headers", ackHeaders);
JSONArray bodyArray = new JSONArray();
JSONObject bodyItem = new JSONObject();
bodyItem.put("pipeline", "sync");
bodyItem.put("tooLong2Tag", "PNM,1");
bodyItem.put("channel", "sync");
bodyItem.put("topic", "sync");
bodyItem.put("highPts", 0);
bodyItem.put("pts", timestamp * 1000);
bodyItem.put("seq", 0);
bodyItem.put("timestamp", timestamp);
bodyArray.add(bodyItem);
ackMsg.put("body", bodyArray);
try {
session.sendMessage(new TextMessage(ackMsg.toString()));
log.info("【{}】✅ /ackDiff 消息已发送", goofishId);
} catch (Exception e) {
log.error("【{}】❌ 发送 /ackDiff 消息失败: {}", goofishId, e.getMessage(), e);
throw e;
}
log.info("【{}】========== WebSocket初始化完成 ==========", goofishId);
}
/**
* 发送心跳 - 对应Python的send_heartbeat()方法
*/
private void sendHeartbeat() throws Exception {
if (webSocketSession == null || !webSocketSession.isOpen()) {
throw new Exception("WebSocket连接已关闭无法发送心跳");
}
JSONObject msg = new JSONObject();
msg.put("lwp", "/!");
JSONObject headers = new JSONObject();
headers.put("mid", XianyuUtils.generateMid());
msg.put("headers", headers);
webSocketSession.sendMessage(new TextMessage(msg.toString()));
lastHeartbeatTime.set(System.currentTimeMillis());
log.debug("【{}】心跳包已发送", goofishId);
}
/**
* 心跳循环 - 对应Python的heartbeat_loop()方法
*/
private void heartbeatLoop() {
if (!connected.get() || webSocketSession == null || !webSocketSession.isOpen()) {
return;
}
try {
sendHeartbeat();
} catch (Exception e) {
log.error("【{}】心跳发送失败: {}", goofishId, e.getMessage());
}
}
/**
* 关闭WebSocket连接
*/
private void closeWebSocket() {
if (webSocketSession != null) {
try {
if (webSocketSession.isOpen()) {
webSocketSession.close();
log.info("【{}】WebSocket连接已关闭", goofishId);
}
} catch (Exception e) {
log.error("【{}】关闭WebSocket时出错", goofishId, e);
} finally {
webSocketSession = null;
connected.set(false);
}
}
}
/**
* Token刷新循环 - 对应Python的token_refresh_loop()方法
*/
private void tokenRefreshLoop() {
try {
// 检查账号是否启用
if (!isAccountEnabled()) {
log.info("【{}】账号已禁用停止Token刷新循环", goofishId);
return;
}
long currentTime = System.currentTimeMillis();
if (currentTime - lastTokenRefreshTime.get() >= webSocketConfiguration.getTokenRefreshInterval() * 1000L) {
log.info("【{}】Token即将过期准备刷新...", goofishId);
String newToken = goofishApiService.getWsToken(goofishId, cookiesStr, deviceId);
if (newToken != null) {
log.info("【{}】Token刷新成功将关闭WebSocket以使用新Token重连", goofishId);
// Token刷新成功后关闭WebSocket连接让它用新Token重新连接
closeWebSocket();
}
}
} catch (Exception e) {
log.error("【{}】Token刷新循环出错", goofishId, e);
}
}
/**
* 暂停清理循环 - 对应Python的pause_cleanup_loop()方法
*/
private void pauseCleanupLoop() {
}
/**
* Cookie刷新循环 - 对应Python的cookie_refresh_loop()方法
*/
private void cookieRefreshLoop() {
try {
// 检查账号是否启用
if (!isAccountEnabled()) {
log.info("【{}】账号已禁用停止Cookie刷新循环", goofishId);
return;
}
long currentTime = System.currentTimeMillis();
// 检查是否在消息接收后的冷却时间内
long timeSinceLastMessage = currentTime - lastMessageReceivedTime.get();
if (lastMessageReceivedTime.get() > 0 && timeSinceLastMessage < MESSAGE_COOLDOWN * 1000L) {
log.info("【{}】收到消息后冷却中跳过本次Cookie刷新", goofishId);
return;
}
// 从数据库重新加载Cookie
if (currentTime - lastCookieRefreshTime.get() >= webSocketConfiguration.getCookieRefreshInterval() * 1000L) {
log.info("【{}】开始Cookie刷新...", goofishId);
if (!loadGoofishAccount()) {
lastCookieRefreshTime.set(currentTime);
log.info("【{}】Cookie刷新成功", goofishId);
}
}
} catch (Exception e) {
log.error("【{}】Cookie刷新循环出错", goofishId, e);
}
}
/**
* 启动所有后台任务
*/
private void startBackgroundTasks() {
log.info("【{}】准备启动后台任务...", goofishId);
// 启动心跳任务依赖WebSocket每次重连都需要重启
if (heartbeatTask == null || heartbeatTask.isDone()) {
log.info("【{}】启动心跳任务...", goofishId);
heartbeatTask = scheduledExecutorService.scheduleWithFixedDelay(
this::heartbeatLoop,
0,
webSocketConfiguration.getHeartbeatInterval(),
TimeUnit.SECONDS
);
}
// 启动Token刷新任务
if (tokenRefreshTask == null || tokenRefreshTask.isDone()) {
log.info("【{}】启动Token刷新任务...", goofishId);
tokenRefreshTask = scheduledExecutorService.scheduleWithFixedDelay(
this::tokenRefreshLoop,
60,
60,
TimeUnit.SECONDS
);
}
// 启动清理任务
if (cleanupTask == null || cleanupTask.isDone()) {
log.info("【{}】启动暂停记录清理任务...", goofishId);
cleanupTask = scheduledExecutorService.scheduleWithFixedDelay(
this::pauseCleanupLoop,
webSocketConfiguration.getCleanupInterval(),
webSocketConfiguration.getCleanupInterval(),
TimeUnit.SECONDS
);
}
// 启动Cookie刷新任务
if (cookieRefreshTask == null || cookieRefreshTask.isDone()) {
log.info("【{}】启动Cookie刷新任务...", goofishId);
cookieRefreshTask = scheduledExecutorService.scheduleWithFixedDelay(
this::cookieRefreshLoop,
webSocketConfiguration.getCookieRefreshInterval(),
webSocketConfiguration.getCookieRefreshInterval(),
TimeUnit.SECONDS
);
}
log.info("【{}】✅ 所有后台任务已启动", goofishId);
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
super.afterConnectionEstablished(session);
log.info("【{}】WebSocket连接已建立", goofishId);
this.webSocketSession = session;
// 更新连接状态连接已建立但尚未初始化
setConnectionState(WebSocketConnectionState.CONNECTED, "握手完成,准备初始化");
connected.set(true); // 标记连接已建立
// 异步执行初始化不阻塞连接建立过程
// 这样可以避免 init() 中的耗时操作如refreshToken导致连接超时
CompletableFuture.runAsync(() -> {
try {
log.info("【{}】开始异步初始化...", goofishId);
init(session);
log.info("【{}】WebSocket初始化完成", goofishId);
// 更新连接状态
setConnectionState(WebSocketConnectionState.REGISTER, "初始化完成,连接就绪");
connectionFailures.set(0);
// 启动后台任务
startBackgroundTasks();
log.info("【{}】✅ WebSocket连接和初始化全部完成", goofishId);
} catch (Exception e) {
log.error("【{}】❌ WebSocket初始化失败: {}", goofishId, e.getMessage(), e);
log.error("【{}】异常类型: {}", goofishId, e.getClass().getName());
log.error("【{}】异常堆栈:", goofishId, e);
connected.set(false);
// 关闭连接触发重连
try {
if (session.isOpen()) {
session.close();
}
} catch (Exception closeEx) {
log.error("【{}】关闭连接失败", goofishId, closeEx);
}
}
}, scheduledExecutorService); // 使用已有的线程池执行异步任务
}
/**
* 处理心跳响应 - 对应Python的handle_heartbeat_response()方法
*/
private boolean handleHeartbeatResponse(JSONObject messageData) {
try {
if (messageData.getInt("code") == 200) {
lastHeartbeatResponse.set(System.currentTimeMillis());
log.debug("【{}】收到心跳响应", goofishId);
return true;
}
} catch (Exception e) {
// 忽略
}
return false;
}
private void handleMessage(JSONObject messageData, WebSocketSession session) {
// TODO
}
/**
* 带信号量的消息处理包装器防止并发任务过多
* 对应Python的 _handle_message_with_semaphore()方法
*/
private void handleMessageWithSemaphore(JSONObject messageData, WebSocketSession session) {
try {
messageSemaphore.acquire();
int currentTasks = activeMessageTasks.incrementAndGet();
try {
log.debug("【{}】收到的消息内容: {}", goofishId, JSONUtil.toJsonStr(messageData));
handleMessage(messageData, session);
} finally {
activeMessageTasks.decrementAndGet();
messageSemaphore.release();
// 定期记录活跃任务数每100个任务记录一次
// 对应Python: if self.active_message_tasks % 100 == 0 and self.active_message_tasks > 0
if (currentTasks % 100 == 0 && currentTasks > 0) {
log.info("【{}】当前活跃消息处理任务数: {}", goofishId, currentTasks);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("【{}】消息处理被中断", goofishId, e);
}
}
/**
* 接收WebSocket消息
*/
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String payload = message.getPayload();
log.info("【{}】收到WebSocket消息: {} 字节", goofishId, payload.length());
try {
JSONObject messageData = JSONUtil.parseObj(payload);
// 处理心跳响应
if (handleHeartbeatResponse(messageData)) {
return;
}
// 处理其他消息异步处理避免阻塞
CompletableFuture.runAsync(() -> handleMessageWithSemaphore(messageData, session), scheduledExecutorService);
} catch (Exception e) {
log.error("【{}】处理消息出错", goofishId, e);
}
}
@Override

View File

@ -0,0 +1,21 @@
package top.biwin.xianyu.goofish.websocket;
import lombok.Data;
import org.springframework.context.annotation.Configuration;
/**
* TODO
*
* @author wangli
* @since 2026-02-01 10:01
*/
@Data
@Configuration(proxyBeanMethods = false, value = "goofish.websocket")
public class WebSocketConfiguration {
private String appKey;
private String wsUrl;
private Integer tokenRefreshInterval;
private Integer heartbeatInterval;
private Integer cleanupInterval;
private Integer cookieRefreshInterval;
}

View File

@ -3,8 +3,8 @@ package top.biwin.xinayu.server.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
* TODO
@ -16,7 +16,7 @@ import java.util.concurrent.Executors;
public class ThreadPoolConfig {
@Bean(name = "goofishAccountWebSocketExecutor")
public ExecutorService executor() {
public ScheduledExecutorService executor() {
// 创建定时任务线程池
return Executors.newScheduledThreadPool(5, r -> {
Thread t = new Thread(r);

View File

@ -65,6 +65,11 @@ goofish:
api:
hostUrl: https://h5api.m.goofish.com/h5/
appKey: 34839810
websocket:
appKey: 444e9908a51d1cb236a27862abc769c9
wsUrl: wss://wss-goofish.dingtalk.com/
tokenRefreshInterval: 72000
heartbeatInterval: 30
assistant:
static: