Merge branch 'hotfix/fix_over_kafka_max_size_0828' into 'master'
修复超过kafka消息大小忽略处理 See merge request universal/infrastructure/backend/axzo-log-plat!125
This commit is contained in:
commit
3c225daaa5
@ -19,6 +19,7 @@ import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
@ -43,6 +44,9 @@ public class LogSinkELKServiceImpl implements LogSinkELKService {
|
||||
@Value("${log-plat.kafka.send.async:false}")
|
||||
private Boolean isAsync;
|
||||
|
||||
@Value("${log-plat.max.request.size:1040000}")
|
||||
private Long maxSize;
|
||||
|
||||
private Map<String, String> appKeyConfigMap = new HashMap<>();
|
||||
|
||||
@Value("${log-plat.sign.keysecret.config}")
|
||||
@ -57,11 +61,18 @@ public class LogSinkELKServiceImpl implements LogSinkELKService {
|
||||
public void sinkELk(LogSinkELKReqDTO req) throws BizException, ExecutionException, InterruptedException, JsonProcessingException {
|
||||
//解密 && 转换
|
||||
String json = signAndConvert(req);
|
||||
|
||||
long jsonMaxSize = json.getBytes(StandardCharsets.UTF_8).length;
|
||||
if (jsonMaxSize > maxSize) {
|
||||
logger.warn("json size is {} > {}", jsonMaxSize ,maxSize);
|
||||
return;
|
||||
}
|
||||
|
||||
if (isAsync) {
|
||||
kafkaProducer.send(new ProducerRecord<>(topic, json), new KafkaSendCallBack());
|
||||
} else {
|
||||
RecordMetadata recordMetadata = kafkaProducer.send(new ProducerRecord<>(topic, json)).get();
|
||||
logger.info("send msg to kafka success,offset={},partition={}.", recordMetadata.offset(), recordMetadata.partition());
|
||||
logger.info("send msg to kafka success,size:{},offset={},partition={}.", jsonMaxSize, recordMetadata.offset(), recordMetadata.partition());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user