add log sink elk api

This commit is contained in:
pepsi 2022-10-31 17:39:53 +08:00
parent d6f3606ee2
commit 3c8fed2553
7 changed files with 80 additions and 94 deletions

View File

@ -1,5 +1,6 @@
package cn.axzo.log.platform.server.controller.api;
import cn.axzo.log.platform.server.dto.LogSinkELKReqDTO;
import cn.axzo.log.platform.server.exception.BizException;
import cn.axzo.log.platform.server.service.LogSinkELKService;
import cn.azxo.framework.common.model.CommonResponse;
@ -9,11 +10,14 @@ import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.StringUtils;
import org.springframework.validation.BindingResult;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import javax.validation.Valid;
/***
* @author: pepsi
* @description: 通过接口方式上报的日志转到kafka中
@ -32,12 +36,14 @@ public class LogSinkELKController {
@RequestMapping(value = "/log/sink/elk", method = RequestMethod.POST)
@ApiOperation(value = "日志中转接入 ELK")
public CommonResponse<?> operateLogDetail(String val) {
if (!StringUtils.hasText(val)) {
return CommonResponse.fail("illegal param");
public CommonResponse<?> operateLogDetail(@RequestBody @Valid LogSinkELKReqDTO req,
BindingResult bindingResult) {
if (bindingResult.hasErrors()) {
logger.warn("illegal param for log sink elk.");
return CommonResponse.fail(bindingResult.getAllErrors().get(0).getDefaultMessage());
}
try {
sinkELKService.sinkELk(val);
sinkELKService.sinkELk(req);
return CommonResponse.success();
} catch (BizException bizException) {
logger.warn("log sink elk biz exception,biz errMsg={}.", bizException.getMessage());

View File

@ -48,7 +48,7 @@ public class LogSinkELKReqDTO {
@ApiModelProperty(value = "工作台Id", position = 8)
private Long workspaceId;
@ApiModelProperty(value = "工作台Id", position = 9)
@ApiModelProperty(value = "设备号", position = 9)
private String sn;
@ApiModelProperty(value = "描述信息", position = 10)
@ -66,4 +66,6 @@ public class LogSinkELKReqDTO {
@ApiModelProperty(value = "扩展字符串", position = 14)
private String logExt;
@ApiModelProperty(value = "签名", position = 14)
private String sign;
}

View File

@ -1,5 +1,6 @@
package cn.axzo.log.platform.server.service;
import cn.axzo.log.platform.server.dto.LogSinkELKReqDTO;
import cn.axzo.log.platform.server.exception.BizException;
import com.fasterxml.jackson.core.JsonProcessingException;
@ -18,5 +19,5 @@ public interface LogSinkELKService {
* @param val
* @return
*/
void sinkELk(String val) throws BizException, ExecutionException, InterruptedException, JsonProcessingException;
void sinkELk(LogSinkELKReqDTO val) throws BizException, ExecutionException, InterruptedException, JsonProcessingException;
}

View File

@ -3,8 +3,8 @@ package cn.axzo.log.platform.server.service.impl;
import cn.axzo.log.platform.server.dto.LogSinkELKReqDTO;
import cn.axzo.log.platform.server.exception.BizException;
import cn.axzo.log.platform.server.service.LogSinkELKService;
import cn.axzo.log.platform.server.utils.CryptoUtil;
import cn.axzo.log.platform.server.utils.JacksonCodecUtil;
import cn.axzo.log.platform.server.utils.SignUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
@ -15,8 +15,8 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.LinkedHashMap;
import java.util.concurrent.ExecutionException;
/***
@ -27,29 +27,29 @@ import java.util.concurrent.ExecutionException;
@Service
public class LogSinkELKServiceImpl implements LogSinkELKService {
private Logger logger = LoggerFactory.getLogger(LogSinkELKServiceImpl.class);
private final Logger logger = LoggerFactory.getLogger(LogSinkELKServiceImpl.class);
private final String SECRET_KEY = "aEsva0zDHECg47P8SuPzmw==";
@Autowired
private KafkaProducer<String, String> kafkaProducer;
@Value("${log-plat.kafka.topic:axzo.dev.log-plat.sink_elk}")
@Value("${log-plat.kafka.topicccc:axzo.pro.log-plat.sink_elk}")
private String topic;
@Value("${log-plat.kafka.send.async:false}")
private Boolean isAsync;
@Override
public void sinkELk(String val) throws BizException, ExecutionException, InterruptedException, JsonProcessingException {
public void sinkELk(LogSinkELKReqDTO req) throws BizException, ExecutionException, InterruptedException, JsonProcessingException {
//解密 && 转换
String json = decryptAndConvert(val);
// String json = JacksonCodecUtil.writeValueAsString(sinkElkReq);
String json = signAndConvert(req);
if (isAsync) {
kafkaProducer.send(new ProducerRecord<>(topic, json), new KafkaSendCallBack());
} else {
RecordMetadata recordMetadata = kafkaProducer.send(new ProducerRecord<>(topic, json)).get();
logger.info("send msg to kafka success,offset={},partition={}.", recordMetadata.offset(), recordMetadata.partition());
}
}
/**
@ -57,21 +57,24 @@ public class LogSinkELKServiceImpl implements LogSinkELKService {
* <p>
* 思来想去还是原汤对原食怎么来的怎么走.
*
* @param val
* @param req
* @return
* @throws BizException
*/
private String decryptAndConvert(String val) throws BizException {
private String signAndConvert(LogSinkELKReqDTO req) throws BizException {
try {
String json = CryptoUtil.decryptStr(val);
LogSinkELKReqDTO sinkElkReq = JacksonCodecUtil.readValue(json, LogSinkELKReqDTO.class);
if (!StringUtils.hasText(sinkElkReq.getServiceName()) || sinkElkReq.getOperateTime() == null) {
logger.warn("illegal param,serviceName or oprTime can not null.");
throw new BizException("illegal param");
LinkedHashMap<String, String> paramsMap = new LinkedHashMap<>();
paramsMap.put("serviceName", req.getServiceName());
paramsMap.put("operateTime", Long.toString(req.getOperateTime().getTime()));
paramsMap.put("secretKey", SECRET_KEY);
String sign = SignUtil.signParamsMd5(paramsMap);
if (!req.getSign().equals(sign)) {
logger.warn("request sign is illegal,request sign={},server sign={}.", req.getSign(), sign);
throw new BizException("request sign is illegal.");
}
return json;
return JacksonCodecUtil.writeValueAsString(req);
} catch (Exception e) {
logger.warn("decrypt&convert occur exception,msg={}.", e.getMessage());
logger.warn("check sign or convert occur exception,msg={}.", e.getMessage());
throw new BizException(e.getMessage());
}
}

View File

@ -1,50 +0,0 @@
package cn.axzo.log.platform.server.utils;
import cn.hutool.core.codec.Base64;
import cn.hutool.crypto.SecureUtil;
import cn.hutool.crypto.symmetric.AES;
/***
* @author: pepsi
* @description: TODO
* @date: 2022/10/30
*/
public class CryptoUtil {
/***
* 混淆值
*/
private final static String BASE64_SECRET = "aEsva0zDHECg47P8SuPzmw==";
/**
* 转数组
*/
private final static byte[] SECRET_BYTES = Base64.decode(BASE64_SECRET);
/**
* AES 加解密
*/
private final static AES AES = SecureUtil.aes(SECRET_BYTES);
/**
* default utf-8
*
* @param val
* @return
*/
public static String decryptStr(String val) {
return AES.decryptStr(val);
}
/**
* 加密
* default CharsetUtil.CHARSET_UTF_8
*
* @param val
* @return
*/
public static byte[] encryptStr(String val) {
return AES.encrypt(val);
}
}

View File

@ -0,0 +1,28 @@
package cn.axzo.log.platform.server.utils;
import org.springframework.util.DigestUtils;
import org.springframework.util.StringUtils;
import java.util.Map;
/***
* @author: pepsi
* @description: TODO
* @date: 2022/10/31
*/
public class SignUtil {
public static String signParamsMd5(Map<String, String> paramsMap) {
StringBuilder stringBuilder = new StringBuilder(150);
for (Map.Entry<String, String> entry : paramsMap.entrySet()) {
if (StringUtils.hasText(entry.getValue())) {
stringBuilder.append(entry.getKey()).append("=").append(entry.getValue()).append("&");
}
}
String paramsStr = stringBuilder.toString();
if (StringUtils.hasText(paramsStr)) {
paramsStr = paramsStr.substring(0, paramsStr.length() - 1);
}
return DigestUtils.md5DigestAsHex(paramsStr.getBytes());
}
}

View File

@ -1,11 +1,14 @@
package cn.axzo.log.platform.server.service;
import cn.axzo.log.platform.server.utils.SignUtil;
import cn.hutool.core.codec.Base64;
import cn.hutool.crypto.SecureUtil;
import cn.hutool.crypto.digest.DigestAlgorithm;
import cn.hutool.crypto.digest.Digester;
import cn.hutool.crypto.symmetric.AES;
import java.util.LinkedHashMap;
/***
* @author: pepsi
* @description: TODO
@ -28,28 +31,21 @@ public class AESServiceTest {
public static void main(String[] args) {
// testMD5();
testAES();
LinkedHashMap<String, String> allParams = new LinkedHashMap<>();
allParams.put("serviceName", "panel_netlog");
allParams.put("operateTime", "1667138755891");
allParams.put("secretKey", "aEsva0zDHECg47P8SuPzmw==");
System.out.println(SignUtil.signParamsMd5(allParams));
}
public static void testAES() {
//加密
String encrypt = aes.encryptHex("{\n" +
"\t\"serviceName\": \"serviceName\",\n" +
"\t\"url\": \"url\",\n" +
"\t\"content\": \"content\",\n" +
"\t\"operateBefore\": \"operateBefore\",\n" +
"\t\"operateAfter\": \"operateAfter\",\n" +
"\t\"operateTime\": 1667138755891,\n" +
"\t\"operateEndTime\": 1667138755891,\n" +
"\t\"workspaceId\": 1234,\n" +
"\t\"sn\": \"SN\",\n" +
"\t\"desc\": \"desc\",\n" +
"\t\"userInfo\": \"userInfo\",\n" +
"\t\"retryNum\": 3,\n" +
"\t\"logExt\": \"12321\"\n" +
"}");
System.out.println(encrypt);
System.out.println(new String(aes.decrypt(encrypt)));
public static void testSign() {
LinkedHashMap<String, Object> allParams = new LinkedHashMap<>();
allParams.put("serviceName", "panel_netlog");
allParams.put("operateTime", "1667138755891");
allParams.put("secretKey", "aEsva0zDHECg47P8SuPzmw==");
// String sign = SignUtil.signParams(DigestAlgorithm.MD5, allParams, "&", "=", true);
// System.out.println(sign);
}
}