修复超过kafka消息大小忽略处理

This commit is contained in:
xudawei 2024-08-28 19:38:53 +08:00
parent 5528b6f643
commit 226063bafa

View File

@ -19,6 +19,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
@ -43,6 +44,9 @@ public class LogSinkELKServiceImpl implements LogSinkELKService {
// Whether Kafka sends are asynchronous (fire-and-forget with a callback)
// or synchronous (blocking on the broker acknowledgement). Default: false.
@Value("${log-plat.kafka.send.async:false}")
private Boolean isAsync;
// Maximum allowed payload size in bytes; larger JSON messages are skipped
// by sinkELk with a warning. Default: 1040000.
@Value("${log-plat.max.request.size:1040000}")
private Long maxSize;
// NOTE(review): presumably appKey -> secret pairs parsed from the
// log-plat.sign.keysecret.config property below — confirm against the
// initialization code (not visible in this chunk).
private Map<String, String> appKeyConfigMap = new HashMap<>();
@Value("${log-plat.sign.keysecret.config}")
@ -57,11 +61,18 @@ public class LogSinkELKServiceImpl implements LogSinkELKService {
/**
 * Sends the converted log payload to Kafka, skipping any message whose
 * UTF-8 byte size exceeds the configured {@code log-plat.max.request.size}
 * limit (such a message would be rejected by the broker anyway).
 *
 * @param req incoming log sink request; decrypted and converted to JSON first
 * @throws BizException            if decryption/conversion of the request fails
 * @throws ExecutionException      if the synchronous Kafka send fails
 * @throws InterruptedException    if the synchronous send is interrupted
 * @throws JsonProcessingException if the request cannot be serialized to JSON
 */
public void sinkELk(LogSinkELKReqDTO req) throws BizException, ExecutionException, InterruptedException, JsonProcessingException {
    // Decrypt && convert the request into the JSON payload sent to Kafka.
    String json = signAndConvert(req);
    // Guard: drop oversized payloads (best-effort sink) instead of letting
    // the producer fail with RecordTooLargeException. Size is measured in
    // UTF-8 bytes, matching what the producer actually transmits.
    long jsonSize = json.getBytes(StandardCharsets.UTF_8).length;
    if (jsonSize > maxSize) {
        logger.warn("json size is {} > {}", jsonSize, maxSize);
        return;
    }
    // Boolean.TRUE.equals(...) is null-safe for the boxed config flag.
    if (Boolean.TRUE.equals(isAsync)) {
        // Fire-and-forget; the callback logs the eventual outcome.
        kafkaProducer.send(new ProducerRecord<>(topic, json), new KafkaSendCallBack());
    } else {
        // Block until the broker acknowledges the record.
        RecordMetadata recordMetadata = kafkaProducer.send(new ProducerRecord<>(topic, json)).get();
        logger.info("send msg to kafka success,size:{},offset={},partition={}.", jsonSize, recordMetadata.offset(), recordMetadata.partition());
    }
}