From 226063bafadbd2707fce985cc2c5f7d42c83493e Mon Sep 17 00:00:00 2001 From: xudawei Date: Wed, 28 Aug 2024 19:38:53 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=B6=85=E8=BF=87kafka?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E5=A4=A7=E5=B0=8F=E5=BF=BD=E7=95=A5=E5=A4=84?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/service/impl/LogSinkELKServiceImpl.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/axzo-log-server/src/main/java/cn/axzo/log/platform/server/service/impl/LogSinkELKServiceImpl.java b/axzo-log-server/src/main/java/cn/axzo/log/platform/server/service/impl/LogSinkELKServiceImpl.java index 589bd43..cba1380 100644 --- a/axzo-log-server/src/main/java/cn/axzo/log/platform/server/service/impl/LogSinkELKServiceImpl.java +++ b/axzo-log-server/src/main/java/cn/axzo/log/platform/server/service/impl/LogSinkELKServiceImpl.java @@ -19,6 +19,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; @@ -43,6 +44,9 @@ public class LogSinkELKServiceImpl implements LogSinkELKService { @Value("${log-plat.kafka.send.async:false}") private Boolean isAsync; + @Value("${log-plat.max.request.size:1040000}") + private Long maxSize; + private Map appKeyConfigMap = new HashMap<>(); @Value("${log-plat.sign.keysecret.config}") @@ -57,11 +61,18 @@ public class LogSinkELKServiceImpl implements LogSinkELKService { public void sinkELk(LogSinkELKReqDTO req) throws BizException, ExecutionException, InterruptedException, JsonProcessingException { //解密 && 转换 String json = signAndConvert(req); + + long jsonMaxSize = json.getBytes(StandardCharsets.UTF_8).length; + if (jsonMaxSize > maxSize) { + logger.warn("json size is {} > {}", jsonMaxSize ,maxSize); + return; + } + if (isAsync) { kafkaProducer.send(new ProducerRecord<>(topic, json), new KafkaSendCallBack()); } else { RecordMetadata recordMetadata = kafkaProducer.send(new ProducerRecord<>(topic, json)).get(); - logger.info("send msg to kafka success,offset={},partition={}.", recordMetadata.offset(), recordMetadata.partition()); + logger.info("send msg to kafka success,size:{},offset={},partition={}.", jsonMaxSize, recordMetadata.offset(), recordMetadata.partition()); } }