Merge branch 'feature/join_elk/221031' into 'dev'

Feature/join elk/221031

See merge request infra/axzo-log-plat!76
This commit is contained in:
彭健 2022-10-31 03:05:20 +00:00
commit 91ff0ef6cb
11 changed files with 455 additions and 8 deletions

View File

@ -128,14 +128,11 @@
<artifactId>basics-profiles-api</artifactId>
</dependency>
<!-- 测试 client 使用
<dependency>
<groupId>cn.axzo.platform</groupId>
<artifactId>axzo-log-api</artifactId>
<version>1.0.0-SNAPSHOT</version>
<scope>test</scope>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
-->
</dependencies>
<build>

View File

@ -31,12 +31,15 @@ public class LogPlatApplication {
Environment env = context.getEnvironment();
logger.info("Application 【{}】 is running on 【{}】 environment!\n\t" +
"MySQL: \t{}\t username:{}\t\n\t" +
"RabbitMQ: \t{}\t username:{},\t\n",
"RabbitMQ: \t{}\t username:{},\t\n" +
"kafka: \t{}\t topic:{},\t\n",
env.getProperty("spring.application.name"),
env.getProperty("spring.profiles.active"),
env.getProperty("spring.datasource.url"),
env.getProperty("spring.datasource.username"),
env.getProperty("spring.rabbitmq.addresses"),
env.getProperty("spring.rabbitmq.username"));
env.getProperty("spring.rabbitmq.username"),
env.getProperty("log-plat.kafka.servers"),
env.getProperty("log-plat.kafka.topic"));
}
}

View File

@ -0,0 +1,37 @@
package cn.axzo.log.platform.server.config;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.Properties;
/***
* @author: pepsi
* @description: TODO
* @date: 2022/10/30
*/
@Component
public class KafkaProducerConfig {
@Value("${log-plat.kafka.servers:120.46.182.212:9092,120.46.188.248:9092}")
private String servers;
@Value("${log-plat.kafka.topic:axzo.dev.log-plat.sink_elk}")
private String topic;
@Bean
public KafkaProducer<String, String> kafkaProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "log-plat-producer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<>(props);
}
}

View File

@ -0,0 +1,50 @@
package cn.axzo.log.platform.server.controller.api;
import cn.axzo.log.platform.server.exception.BizException;
import cn.axzo.log.platform.server.service.LogSinkELKService;
import cn.azxo.framework.common.model.CommonResponse;
import com.github.xiaoymin.knife4j.annotations.ApiSupport;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
/***
* @author: pepsi
* @description: 通过接口方式上报的日志转到kafka中
* @date: 2022/10/29
*/
@Api(tags = "日志转存 ELK 接口")
@ApiSupport(author = "田立勇")
@RestController
@RequestMapping("/api/v1")
public class LogSinkELKController {
private final Logger logger = LoggerFactory.getLogger(LogSinkELKController.class);
@Autowired
private LogSinkELKService sinkELKService;
@RequestMapping(value = "/log/sink/elk", method = RequestMethod.POST)
@ApiOperation(value = "日志中转接入 ELK")
public CommonResponse<?> operateLogDetail(String val) {
if (!StringUtils.hasText(val)) {
return CommonResponse.fail("illegal param");
}
try {
sinkELKService.sinkELk(val);
return CommonResponse.success();
} catch (BizException bizException) {
logger.warn("log sink elk biz exception,biz errMsg={}.", bizException.getMessage());
return CommonResponse.fail(bizException.getMessage());
} catch (Exception e) {
logger.error("log sink elk exception,errMsg={}.", e.getMessage());
return CommonResponse.fail(e.getMessage());
}
}
}

View File

@ -0,0 +1,69 @@
package cn.axzo.log.platform.server.dto;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotNull;
import java.util.Date;
import java.util.Map;
/***
* @author: pepsi
* @description: TODO
* @date: 2022/10/30
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class LogSinkELKReqDTO {
@ApiModelProperty(value = "服务名称", position = 1)
@NotNull(message = "服务名不可为空")
private String serviceName;
@ApiModelProperty(value = "url信息", position = 2)
private String url;
@ApiModelProperty(value = "内容摘要", position = 3)
private String content;
@ApiModelProperty(value = "操作前数据", position = 4)
private String operateBefore;
@ApiModelProperty(value = "操作后数据", position = 5)
private String operateAfter;
@ApiModelProperty(value = "发生时间", required = true, position = 6, example = "2022-10-20 12:12:12")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
@NotNull(message = "操作时间不能为空")
private Date operateTime;
@ApiModelProperty(value = "结束时间", position = 7, example = "2022-10-20 12:12:12")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8")
private Date operateEndTime;
@ApiModelProperty(value = "工作台Id", position = 8)
private Long workspaceId;
@ApiModelProperty(value = "工作台Id", position = 9)
private String sn;
@ApiModelProperty(value = "描述信息", position = 10)
private String desc;
@ApiModelProperty(value = "用户信息", position = 11)
private String userInfo;
@ApiModelProperty(value = "重试次数", position = 12)
private int retryNum;
@ApiModelProperty(value = "扩展 map", position = 13)
private Map<String, String> extValMap;
@ApiModelProperty(value = "扩展字符串", position = 14)
private String logExt;
}

View File

@ -0,0 +1,18 @@
package cn.axzo.log.platform.server.exception;
/***
* @author: pepsi
* @description: TODO
* @date: 2022/10/30
*/
public class BizException extends Exception {
public BizException() {
super();
}
public BizException(String msg) {
super(msg);
}
}

View File

@ -0,0 +1,22 @@
package cn.axzo.log.platform.server.service;
import cn.axzo.log.platform.server.exception.BizException;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.util.concurrent.ExecutionException;
/***
* @author: pepsi
* @description: TODO
* @date: 2022/10/29
*/
public interface LogSinkELKService {
/***
* 解密& 消息投入kafka
* @param val
* @return
*/
void sinkELk(String val) throws BizException, ExecutionException, InterruptedException, JsonProcessingException;
}

View File

@ -0,0 +1,90 @@
package cn.axzo.log.platform.server.service.impl;
import cn.axzo.log.platform.server.dto.LogSinkELKReqDTO;
import cn.axzo.log.platform.server.exception.BizException;
import cn.axzo.log.platform.server.service.LogSinkELKService;
import cn.axzo.log.platform.server.utils.CryptoUtil;
import cn.axzo.log.platform.server.utils.JacksonCodecUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.concurrent.ExecutionException;
/***
* @author: pepsi
* @description: TODO
* @date: 2022/10/29
*/
@Service
public class LogSinkELKServiceImpl implements LogSinkELKService {
private Logger logger = LoggerFactory.getLogger(LogSinkELKServiceImpl.class);
@Autowired
private KafkaProducer<String, String> kafkaProducer;
@Value("${log-plat.kafka.topic:axzo.dev.log-plat.sink_elk}")
private String topic;
@Value("${log-plat.kafka.send.async:false}")
private Boolean isAsync;
@Override
public void sinkELk(String val) throws BizException, ExecutionException, InterruptedException, JsonProcessingException {
//解密 && 转换
String json = decryptAndConvert(val);
// String json = JacksonCodecUtil.writeValueAsString(sinkElkReq);
if (isAsync) {
kafkaProducer.send(new ProducerRecord<>(topic, json), new KafkaSendCallBack());
} else {
RecordMetadata recordMetadata = kafkaProducer.send(new ProducerRecord<>(topic, json)).get();
logger.info("send msg to kafka success,offset={},partition={}.", recordMetadata.offset(), recordMetadata.partition());
}
}
/**
* 解密和转换对象
* <p>
* 思来想去还是原汤对原食怎么来的怎么走.
*
* @param val
* @return
* @throws BizException
*/
private String decryptAndConvert(String val) throws BizException {
try {
String json = CryptoUtil.decryptStr(val);
LogSinkELKReqDTO sinkElkReq = JacksonCodecUtil.readValue(json, LogSinkELKReqDTO.class);
if (!StringUtils.hasText(sinkElkReq.getServiceName()) || sinkElkReq.getOperateTime() == null) {
logger.warn("illegal param,serviceName or oprTime can not null.");
throw new BizException("illegal param");
}
return json;
} catch (Exception e) {
logger.warn("decrypt&convert occur exception,msg={}.", e.getMessage());
throw new BizException(e.getMessage());
}
}
class KafkaSendCallBack implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (recordMetadata != null) {
logger.info("send msg to kafka success,offset={},partition={}.", recordMetadata.offset(), recordMetadata.partition());
} else {
logger.error("send msg to kafka failed,", e);
}
}
}
}

View File

@ -0,0 +1,50 @@
package cn.axzo.log.platform.server.utils;
import cn.hutool.core.codec.Base64;
import cn.hutool.crypto.SecureUtil;
import cn.hutool.crypto.symmetric.AES;
/***
* @author: pepsi
* @description: TODO
* @date: 2022/10/30
*/
public class CryptoUtil {
/***
* 混淆值
*/
private final static String BASE64_SECRET = "aEsva0zDHECg47P8SuPzmw==";
/**
* 转数组
*/
private final static byte[] SECRET_BYTES = Base64.decode(BASE64_SECRET);
/**
* AES 加解密
*/
private final static AES AES = SecureUtil.aes(SECRET_BYTES);
/**
* default utf-8
*
* @param val
* @return
*/
public static String decryptStr(String val) {
return AES.decryptStr(val);
}
/**
* 加密
* default CharsetUtil.CHARSET_UTF_8
*
* @param val
* @return
*/
public static byte[] encryptStr(String val) {
return AES.encrypt(val);
}
}

View File

@ -0,0 +1,56 @@
package cn.axzo.log.platform.server.utils;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
/***
* @author: pepsi
* @description: 单例的 ObjectMapper
* @date: 2022/10/30
*/
public class JacksonCodecUtil {
private static final ObjectMapper mapper;
static {
//创建ObjectMapper对象
mapper = new ObjectMapper();
//configure方法 配置一些需要的参数
mapper.enable(SerializationFeature.INDENT_OUTPUT);
//序列化的时候序列对象的那些属性
//JsonInclude.Include.NON_DEFAULT 属性为默认值不序列化
//JsonInclude.Include.ALWAYS 所有属性
//JsonInclude.Include.NON_EMPTY 属性为 或者为 NULL 都不序列化
//JsonInclude.Include.NON_NULL 属性为NULL 不序列化
mapper.setSerializationInclusion(JsonInclude.Include.ALWAYS);
//反序列化时,遇到未知属性会不会报错
//true - 遇到没有的属性就报错 false - 没有的属性不会管不会报错
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
//如果是空对象的时候,不抛异常
mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
// 忽略 transient 修饰的属性
mapper.configure(MapperFeature.PROPAGATE_TRANSIENT_MARKER, true);
}
public static ObjectMapper getObjectMapper() {
return mapper;
}
public static <T> T readValue(String content, Class<T> clazz) throws JsonProcessingException {
return mapper.readValue(content, clazz);
}
public static String writeValueAsString(Object val) throws JsonProcessingException {
return mapper.writeValueAsString(val);
}
}

View File

@ -0,0 +1,55 @@
package cn.axzo.log.platform.server.service;
import cn.hutool.core.codec.Base64;
import cn.hutool.crypto.SecureUtil;
import cn.hutool.crypto.digest.DigestAlgorithm;
import cn.hutool.crypto.digest.Digester;
import cn.hutool.crypto.symmetric.AES;
/***
* @author: pepsi
* @description: TODO
* @date: 2022/10/29
*/
public class AESServiceTest {
private static final String BASE64_SECRET = "aEsva0zDHECg47P8SuPzmw==";
private final static byte[] SECRET_BYTES = Base64.decode(BASE64_SECRET);
private final static AES aes = SecureUtil.aes(SECRET_BYTES);
// @Test
public static void testMD5() {
Digester md5 = new Digester(DigestAlgorithm.MD5);
String digestHex = md5.digestHex("{\"@timestamp\":\"2022-10-28T18:10:39.179+08:00\",\"app\":\"leaf\",\"level\":\"INFO\",\"traceId\":\"\",\"thread\":\"Thread-IdRule-Update\",\"class\":\"c.a.l.m.impl.IdRuleCacheManagerImpl\",\"message\":\"refresh end! id rule config : {\\\"payment@payment_no\\\":{\\\"bizCode\\\":\\\"001\\\",\\\"bizTag\\\":\\\"payment@payment_no\\\",\\\"createAt\\\":1626147684000,\\\"dateFormat\\\":\\\"yyyyMMddHHmmss\\\",\\\"isDelete\\\":0,\\\"placeholder\\\":\\\"0000\\\",\\\"remark\\\":\\\"支付系统支付单号生成规则\\\",\\\"ruleExpression\\\":\\\"${dateFormat}${bizCode}${placeholder}${sequenceNo}\\\",\\\"sequenceLength\\\":11,\\\"updateAt\\\":1626147684000},\\\"payment@refund_order_no\\\":{\\\"bizCode\\\":\\\"002\\\",\\\"bizTag\\\":\\\"payment@refund_order_no\\\",\\\"createAt\\\":1632985337000,\\\"dateFormat\\\":\\\"yyyyMMddHHmmss\\\",\\\"isDelete\\\":0,\\\"placeholder\\\":\\\"0000\\\",\\\"remark\\\":\\\"支付系统退款单号生成规则\\\",\\\"ruleExpression\\\":\\\"${dateFormat}${bizCode}${placeholder}${sequenceNo}\\\",\\\"sequenceLength\\\":11,\\\"updateAt\\\":1632985337000}}\",\"m\":null,\"error_level\":\"\",\"error_type\":\"\",\"stack_trace\":\"\"}");
System.out.println(digestHex);
}
public static void main(String[] args) {
// testMD5();
testAES();
}
public static void testAES() {
//加密
String encrypt = aes.encryptHex("{\n" +
"\t\"serviceName\": \"serviceName\",\n" +
"\t\"url\": \"url\",\n" +
"\t\"content\": \"content\",\n" +
"\t\"operateBefore\": \"operateBefore\",\n" +
"\t\"operateAfter\": \"operateAfter\",\n" +
"\t\"operateTime\": 1667138755891,\n" +
"\t\"operateEndTime\": 1667138755891,\n" +
"\t\"workspaceId\": 1234,\n" +
"\t\"sn\": \"SN\",\n" +
"\t\"desc\": \"desc\",\n" +
"\t\"userInfo\": \"userInfo\",\n" +
"\t\"retryNum\": 3,\n" +
"\t\"logExt\": \"12321\"\n" +
"}");
System.out.println(encrypt);
System.out.println(new String(aes.decrypt(encrypt)));
}
}