add log sink elk api

This commit is contained in:
pepsi 2022-10-31 10:59:21 +08:00
parent 26fde0d811
commit 3f75fe6c65

View File

@ -41,33 +41,35 @@ public class LogSinkELKServiceImpl implements LogSinkELKService {
@Override
public void sinkELk(String val) throws BizException, ExecutionException, InterruptedException, JsonProcessingException {
//解密 && 转换
LogSinkELKReqDTO sinkElkReq = decryptAndConvert(val);
String json = JacksonCodecUtil.writeValueAsString(sinkElkReq);
String json = decryptAndConvert(val);
// String json = JacksonCodecUtil.writeValueAsString(sinkElkReq);
if (isAsync) {
kafkaProducer.send(new ProducerRecord<>(topic, json), new KafkaSendCallBack());
} else {
RecordMetadata recordMetadata = kafkaProducer.send(new ProducerRecord<>(topic, json)).get();
logger.debug("send msg to kafka success,offset={},partition={}.", recordMetadata.offset(), recordMetadata.partition());
logger.info("send msg to kafka success,offset={},partition={}.", recordMetadata.offset(), recordMetadata.partition());
}
}
/**
* 解密和转换对象
* <p>
* 思来想去还是原汤对原食怎么来的怎么走.
*
* @param val
* @return
* @throws BizException
*/
private LogSinkELKReqDTO decryptAndConvert(String val) throws BizException {
private String decryptAndConvert(String val) throws BizException {
try {
String json = CryptoUtil.decryptStr(val);
LogSinkELKReqDTO sinkElkReq = JacksonCodecUtil.readValue(json, LogSinkELKReqDTO.class);
if (!StringUtils.hasText(sinkElkReq.getServiceName())
|| sinkElkReq.getOperateTime() == null) {
if (!StringUtils.hasText(sinkElkReq.getServiceName()) || sinkElkReq.getOperateTime() == null) {
logger.warn("illegal param,serviceName or oprTime can not null.");
throw new BizException("illegal param");
}
return sinkElkReq;
return json;
} catch (Exception e) {
logger.warn("decrypt&convert occur exception,msg={}.", e.getMessage());
throw new BizException(e.getMessage());
@ -79,7 +81,7 @@ public class LogSinkELKServiceImpl implements LogSinkELKService {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (recordMetadata != null) {
logger.debug("send msg to kafka success,offset={},partition={}.", recordMetadata.offset(), recordMetadata.partition());
logger.info("send msg to kafka success,offset={},partition={}.", recordMetadata.offset(), recordMetadata.partition());
} else {
logger.error("send msg to kafka failed,", e);
}