diff --git a/canal-alarm/pom.xml b/canal-alarm/pom.xml new file mode 100644 index 0000000..38799e9 --- /dev/null +++ b/canal-alarm/pom.xml @@ -0,0 +1,77 @@ + + + + 4.0.0 + + cn.axzo.framework + canal-alarm + 1.0.0 + + canal-alarm + + + UTF-8 + 1.8 + 1.8 + + + + + com.alibaba.otter + canal.common + 1.1.5 + provided + + + com.alibaba.otter + canal.instance.manager + 1.1.5 + provided + + + com.google.guava + guava + 20.0 + provided + + + junit + junit + 4.11 + test + + + + + + + org.apache.maven.plugins + maven-source-plugin + + + attach-sources + + jar + + + + + + + + + + + rdc-releases + Nexus Release Repository + https://packages.aliyun.com/maven/repository/2005773-release-XI7cl5/ + + + rdc-snapshots + Nexus Snapshot Repository + https://packages.aliyun.com/maven/repository/2005773-snapshot-V5Gjdf/ + + + + diff --git a/canal-alarm/src/main/java/cn/axzo/framework/canal/alarm/DingCanalAlarmHandler.java b/canal-alarm/src/main/java/cn/axzo/framework/canal/alarm/DingCanalAlarmHandler.java new file mode 100644 index 0000000..2d0ec68 --- /dev/null +++ b/canal-alarm/src/main/java/cn/axzo/framework/canal/alarm/DingCanalAlarmHandler.java @@ -0,0 +1,131 @@ +package cn.axzo.framework.canal.alarm; + +import com.alibaba.fastjson.JSON; +import com.alibaba.otter.canal.common.AbstractCanalLifeCycle; +import com.alibaba.otter.canal.common.alarm.CanalAlarmHandler; +import com.alibaba.otter.canal.common.alarm.LogAlarmHandler; +import com.alibaba.otter.canal.instance.manager.plain.HttpHelper; +import com.google.common.base.Joiner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.EnvironmentAware; +import org.springframework.core.env.Environment; +import org.springframework.util.StringUtils; + +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * Canal 钉钉报警 + * + * @author zhaoyong + * @see DingCanalAlarmHandler + * @since 2022-01-10 18:03 + */ +public class DingCanalAlarmHandler extends AbstractCanalLifeCycle implements CanalAlarmHandler, EnvironmentAware { + + private static final Logger logger = LoggerFactory.getLogger(LogAlarmHandler.class); + + /** + * 通知地址 + */ + private String alarmUrl; + /** + * 密钥 + */ + private String secretKey; + /** + * 接收者 + */ + private String receive; + /** + * HTTP 请求 + */ + private HttpHelper httpHelper; + + private Environment environment; + + @Override + public void sendAlarm(String destination, String msg) { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String text = String.format( + "[警报] %s - Canal异常服务报警 \n\n" + + " --- \n\n " + + "destination:%s \n\n " + + "OWNER:@%s \n\n" + + "异常信息:%s \n\n " + + " --- \n\n " + + "**播报时间:%s**", + // 环境 + getProfile(), + // destination + destination, + // 告警手机号 + getReceives(), + // 响应参数 + msg, + // 当前时间 + sdf.format(new Date()) + ); + MarkdownMessageRequest request = new MarkdownMessageRequest(); + request.setMsgtype("markdown"); + MarkdownMessage markdown = new MarkdownMessage(); + markdown.setTitle("Canal服务报警"); + markdown.setText(text); + request.setMarkdown(markdown); + try { + logger.info("[canal -> ding] DingCanalAlarmHandler.sendAlarm request param is {}", JSON.toJSONString(request)); + String response = httpHelper.post(alarmUrl + secretKey, null, request,5000); + logger.info("[canal <- ding] DingCanalAlarmHandler.sendAlarm response is {}", response); + } catch (Exception e) { + logger.error("[canal <- ding] DingCanalAlarmHandler.sendAlarm response is error", e); + } + } + + @Override + public void setEnvironment(Environment environment) { + this.environment = environment; + } + + public void setAlarmUrl(String alarmUrl) { + this.alarmUrl = alarmUrl; + } + + public void setSecretKey(String secretKey) { + this.secretKey = secretKey; + } + + public void setHttpHelper(HttpHelper httpHelper) { + this.httpHelper = httpHelper; + } + + public void setReceive(String receive) { + this.receive = receive; + } + + private String getProfile() { + String[] activeProfiles = environment.getActiveProfiles(); + if(activeProfiles == null || activeProfiles.length == 0) { + return "dev"; + } + return activeProfiles[0]; + } + + private String getReceives() { + if(StringUtils.isEmpty(receive)) { + return "13585844124"; + } + String[] receives = receive.split(","); + return Joiner.on(", @").join(receives); + } + + public static void main(String[] args) { + DingCanalAlarmHandler handler = new DingCanalAlarmHandler(); + handler.setAlarmUrl("https://oapi.dingtalk.com/robot/send?access_token="); + handler.setSecretKey("4adbec74d777b6052cdae2ab25edb2ec1f32ef2d9b2c46a521a9b8258a1cdf47"); + handler.setReceive("13585844124"); + handler.setHttpHelper(new HttpHelper()); + handler.sendAlarm("test", "test"); + } + +} diff --git a/canal-alarm/src/main/java/cn/axzo/framework/canal/alarm/MarkdownMessage.java b/canal-alarm/src/main/java/cn/axzo/framework/canal/alarm/MarkdownMessage.java new file mode 100644 index 0000000..fb4c2d5 --- /dev/null +++ b/canal-alarm/src/main/java/cn/axzo/framework/canal/alarm/MarkdownMessage.java @@ -0,0 +1,46 @@ +package cn.axzo.framework.canal.alarm; + +/** + * markdown + * + * @author zhaoyong + * @see MarkdownMessage + * @since 2021-11-27 10:47 + */ +public class MarkdownMessage { + + /** + * 首屏会话透出的展示内容。 + */ + private String title; + + /** + * markdown格式的消息。 + */ + private String text; + + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } + + public String getText() { + return text; + } + + public void setText(String text) { + this.text = text; + } + + @Override + public String toString() { + return "MarkdownMessage{" + + "title='" + title + '\'' + + ", text='" + text + '\'' + + '}'; + } +} + diff --git a/canal-alarm/src/main/java/cn/axzo/framework/canal/alarm/MarkdownMessageRequest.java b/canal-alarm/src/main/java/cn/axzo/framework/canal/alarm/MarkdownMessageRequest.java new file mode 100644 index 0000000..3adf972 --- /dev/null +++ b/canal-alarm/src/main/java/cn/axzo/framework/canal/alarm/MarkdownMessageRequest.java @@ -0,0 +1,31 @@ +package cn.axzo.framework.canal.alarm; + +/** + * Markdown 消息 + * + * @author zhaoyong + * @see MarkdownMessageRequest + * @since 2022-01-10 18:06 + */ +public class MarkdownMessageRequest { + + private String msgtype; + + private MarkdownMessage markdown; + + public String getMsgtype() { + return msgtype; + } + + public void setMsgtype(String msgtype) { + this.msgtype = msgtype; + } + + public MarkdownMessage getMarkdown() { + return markdown; + } + + public void setMarkdown(MarkdownMessage markdown) { + this.markdown = markdown; + } +} diff --git a/common-common/pom.xml b/common-common/pom.xml index 878a036..c799b3f 100644 --- a/common-common/pom.xml +++ b/common-common/pom.xml @@ -7,7 +7,7 @@ cn.axzo.framework common-common - 1.1.4 + 1.1.5 diff --git a/common-common/src/main/java/cn/azxo/framework/common/service/SnowflakeIdWorker.java b/common-common/src/main/java/cn/azxo/framework/common/service/SnowflakeIdWorker.java new file mode 100644 index 0000000..2dedc54 --- /dev/null +++ b/common-common/src/main/java/cn/azxo/framework/common/service/SnowflakeIdWorker.java @@ -0,0 +1,113 @@ +package cn.azxo.framework.common.service; + +/** + * 雪花算法 + * + * @author zhaoyong + * @see SnowflakeIdWorker + * @since 2021-12-12 04:25 + */ +public class SnowflakeIdWorker { + /** 开始时间截 (建议用服务第一次上线的时间,到毫秒级的时间戳) */ + private final long twepoch = 687888001020L; + + /** 机器id所占的位数 */ + private final long workerIdBits = 10L; + + /** 支持的最大机器id,结果是1023 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) */ + private final long maxWorkerId = -1L ^ (-1L << workerIdBits); + + /** 序列在id中占的位数 */ + private final long sequenceBits = 12L; + + /** 机器ID向左移12位 */ + private final long workerIdShift = sequenceBits; + + /** 时间截向左移22位(10+12) */ + private final long timestampLeftShift = sequenceBits + workerIdBits; + + /** 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095) + * <<为左移,每左移动1位,则扩大1倍 + * */ + private final long sequenceMask = -1L ^ (-1L << sequenceBits); + + /** 工作机器ID(0~1024) */ + private long workerId; + + /** 毫秒内序列(0~4095) */ + private long sequence = 0L; + + /** 上次生成ID的时间截 */ + private long lastTimestamp = -1L; + + //==============================Constructors===================================== + /** + * 构造函数 + * @param workerId 工作ID (0~1023) + */ + public SnowflakeIdWorker(long workerId) { + if (workerId > maxWorkerId || workerId < 0) { + throw new IllegalArgumentException(String.format("workerId can't be greater than %d or less than 0", maxWorkerId)); + } + this.workerId = workerId; + } + + // ==============================Methods========================================== + /** + * 获得下一个ID (该方法是线程安全的) + * @return SnowflakeId + */ + public synchronized long nextId() { + long timestamp = timeGen(); + //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常 + if (timestamp < lastTimestamp) { + throw new RuntimeException( + String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp)); + } + + //如果是同一时间生成的,则进行毫秒内序列 + if (lastTimestamp == timestamp) { + //如果毫秒相同,则从0递增生成序列号 + sequence = (sequence + 1) & sequenceMask; + //毫秒内序列溢出 + if (sequence == 0) { + //阻塞到下一个毫秒,获得新的时间戳 + timestamp = tilNextMillis(lastTimestamp); + } + } + //时间戳改变,毫秒内序列重置 + else { + sequence = 0L; + } + + //上次生成ID的时间截 + lastTimestamp = timestamp; + + //移位并通过或运算拼到一起组成64位的ID + return ((timestamp - twepoch) << timestampLeftShift) // + | (workerId << workerIdShift) // + | sequence; + } + + /** + * 阻塞到下一个毫秒,直到获得新的时间戳 + * @param lastTimestamp 上次生成ID的时间截 + * @return 当前时间戳 + */ + protected long tilNextMillis(long lastTimestamp) { + long timestamp = timeGen(); + while (timestamp <= lastTimestamp) { + timestamp = timeGen(); + } + return timestamp; + } + + /** + * 返回以毫秒为单位的当前时间,从1970-01-01 08:00:00算起 + * @return 当前时间(毫秒) + */ + protected long timeGen() { + return System.currentTimeMillis(); + } + +} diff --git a/pom.xml b/pom.xml index 703fe48..0af8b5e 100644 --- a/pom.xml +++ b/pom.xml @@ -16,6 +16,7 @@ smart-datasource asyncTool alarm-spring-boot-starter + canal-alarm