feat: 增加redis的支持
This commit is contained in:
parent
bf66aedda5
commit
4df3579a34
@ -11,7 +11,9 @@ public enum ResultCode implements IResultCode {
|
||||
INVALID_PARAMS("002", "请求参数格式错误", 400),
|
||||
NETWORK_FAILURE("003", "内部网络错误", 500),
|
||||
PROCESS_TIMEOUT("004", "内部处理超时", 500),
|
||||
APP_CONFIG_ERROR("005", "服务配置错误", 500);
|
||||
APP_CONFIG_ERROR("005", "服务配置错误", 500),
|
||||
OPERATE_TOO_FREQUENTLY("006", "操作过于频繁", 500),
|
||||
;
|
||||
|
||||
final private String code;
|
||||
|
||||
|
||||
1
pom.xml
1
pom.xml
@ -32,6 +32,7 @@
|
||||
<module>web-support-lib</module>
|
||||
<module>gateway-support-lib</module>
|
||||
<module>event-support-lib</module>
|
||||
<module>redis-support-lib</module>
|
||||
</modules>
|
||||
|
||||
<dependencies>
|
||||
|
||||
26
redis-support-lib/pom.xml
Normal file
26
redis-support-lib/pom.xml
Normal file
@ -0,0 +1,26 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<parent>
|
||||
<groupId>cn.axzo.foundation</groupId>
|
||||
<artifactId>axzo-lib-box</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<artifactId>redis-support-lib</artifactId>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>cn.axzo.foundation</groupId>
|
||||
<artifactId>web-support-lib</artifactId>
|
||||
<version>2.0.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-data-redis</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
@ -0,0 +1,75 @@
|
||||
package cn.axzo.foundation.redis.support;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
/**
|
||||
* 提供简单的广播Client. 通过build获得的BroadcastQueue发送广播. 所有节点都会收到广播并回调consumer
|
||||
* <ul>
|
||||
* <li>通过build获得BroadcastQueue, 调用queue的broadcast方法. 发送广播</li>
|
||||
* <li>收到广播会会主动回调注册时的BiConsumer</li>
|
||||
* <li>目前支持通过redis的pub/sub实现, 因此需要依赖redisTemplate</li>
|
||||
* <li>lettuce来处理回调, 不建议在回调用做比较重的业务</li>
|
||||
* </ul>
|
||||
*/
|
||||
public interface EventBroadcast {
|
||||
|
||||
|
||||
/**
|
||||
* 通过queueName, 回调consumer来构建广播队列
|
||||
*
|
||||
* @param queueName
|
||||
* @param consumer
|
||||
* @return
|
||||
*/
|
||||
BroadcastQueue build(String queueName, BiConsumer<String, BroadcastEvent> consumer);
|
||||
|
||||
/**
|
||||
* 广播队列. 向队列中广播消息. 所有的节点(包含自身)都会收到广播消息
|
||||
*/
|
||||
interface BroadcastQueue {
|
||||
/**
|
||||
* 获得队列名称, 与注册时的queueName相同
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
String getName();
|
||||
|
||||
/**
|
||||
* 广播, data为需要广播的内容
|
||||
*
|
||||
* @param data
|
||||
* @return
|
||||
*/
|
||||
boolean broadcast(JSONObject data);
|
||||
}
|
||||
|
||||
/**
|
||||
* 广播事件
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
class BroadcastEvent {
|
||||
String name;
|
||||
JSONObject data;
|
||||
/**
|
||||
* 发起广播的节点信息
|
||||
*/
|
||||
JSONObject senderRuntime;
|
||||
|
||||
public String toJSONString() {
|
||||
return new JSONObject()
|
||||
.fluentPut("name", name)
|
||||
.fluentPut("data", data)
|
||||
.fluentPut("senderRuntime", senderRuntime)
|
||||
.toJSONString();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,49 @@
|
||||
package cn.axzo.foundation.redis.support;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import lombok.*;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
/**
|
||||
* 简单的本地缓存协调客户端. 主要的目的是通过事件在<s>不同节点</s>间处理缓存dirty的场景.
|
||||
* 提供了2中机制来处理缓存dirty的场景
|
||||
* registerCacheDirtyListener 精确处理key匹配
|
||||
* registerCacheDirtyInterceptor 统一的事件匹配. 可以通过检查 CacheDirtiedEvent.data中的数据是否包含特定的json-path来处理缓存.
|
||||
*/
|
||||
public interface LocalCacheCoordinate {
|
||||
|
||||
|
||||
void notifyCacheDirty(CacheDirtiedEvent event);
|
||||
|
||||
/**
|
||||
* @param key {@link CacheDirtiedEvent#key} 中定义数据
|
||||
* @param consumer
|
||||
*/
|
||||
void registerCacheDirtyListener(String key, Consumer<CacheDirtiedEvent> consumer);
|
||||
|
||||
void registerCacheDirtyInterceptor(CacheDirtiedEventInterceptor interceptor);
|
||||
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Data
|
||||
@Builder
|
||||
final class CacheDirtiedEvent {
|
||||
@NonNull
|
||||
String key;
|
||||
@NonNull
|
||||
JSONObject data;
|
||||
}
|
||||
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Data
|
||||
@Builder
|
||||
final class CacheDirtiedEventInterceptor {
|
||||
@NonNull
|
||||
Predicate<CacheDirtiedEvent> filter;
|
||||
@NonNull
|
||||
Consumer<CacheDirtiedEvent> consumer;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,270 @@
|
||||
package cn.axzo.foundation.redis.support;
|
||||
|
||||
import cn.axzo.foundation.result.ResultCode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* 使用redis实现的分布式锁. 为了不依赖redis, 这里需要调用者实现{@link RedisWrapper}. 如下:
|
||||
* <pre>{@code
|
||||
* @Bean
|
||||
* RedisLock.RedisWrapper redisWrapper(RedisTemplate redisTemplate) {
|
||||
* return new RedisLock.RedisWrapper() {
|
||||
* @Override
|
||||
* public void delete(String key) {
|
||||
* redisTemplate.delete(key);
|
||||
* }
|
||||
*
|
||||
* @Override
|
||||
* public boolean lock(String key, String value, long expireMills) {
|
||||
* return redisTemplate.opsForValue().setIfAbsent(key, value, expireMills, TimeUnit.MILLISECONDS);
|
||||
* }
|
||||
* };
|
||||
* }
|
||||
* }</pre>
|
||||
* <p>
|
||||
* 然后使用{@link #tryAcquireRun(long, long, Supplier)} 来获取lock执行逻辑,明确指定 "等待锁时间" 和 "锁的超时时间".
|
||||
*/
|
||||
@Slf4j
|
||||
public class RedisLock {
|
||||
|
||||
public static final String DEFAULT_LOCK_SUFFIX = ":lock";
|
||||
private RedisWrapper redis;
|
||||
|
||||
/**
|
||||
* 默认超时时间(毫秒)
|
||||
*/
|
||||
private static final long DEFAULT_TIME_OUT_MILLIS = 5 * 1000;
|
||||
private static final Random RANDOM = new Random();
|
||||
/**
|
||||
* 锁的超时时间(豪秒),过期删除
|
||||
*/
|
||||
public static final int EXPIRE_IN_MILLIS = 1 * 60 * 1000;
|
||||
|
||||
private String key;
|
||||
// 锁状态标志
|
||||
private boolean locked = false;
|
||||
|
||||
private RuntimeException lockFailedException;
|
||||
|
||||
public interface RedisWrapper {
|
||||
/**
|
||||
* 删除key
|
||||
*
|
||||
* @param key
|
||||
*/
|
||||
void delete(String key);
|
||||
|
||||
/**
|
||||
* 锁定key. 通常使用SetIfAbsent();
|
||||
*
|
||||
* @param key
|
||||
* @param value
|
||||
* @param expireMills
|
||||
* @return
|
||||
*/
|
||||
boolean lock(String key, String value, long expireMills);
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭锁,该方法不建议外部直接使用,<br>
|
||||
* 对于加锁执行的操作,建议直接使用 {@link RedisLock#tryAcquireRun(long, long, Supplier)},会自动执行close操作。
|
||||
*/
|
||||
private void close() {
|
||||
if (this.locked) {
|
||||
this.redis.delete(this.key);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This creates a RedisLock
|
||||
*
|
||||
* @param key key
|
||||
* @param redis 数据源
|
||||
*/
|
||||
public RedisLock(String key, RedisWrapper redis) {
|
||||
this(key, redis, DEFAULT_LOCK_SUFFIX, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* This creates a RedisLock
|
||||
*
|
||||
* @param key key
|
||||
* @param redis 数据源
|
||||
*/
|
||||
public RedisLock(String key, RedisWrapper redis, RuntimeException lockFailedException) {
|
||||
this(key, redis, DEFAULT_LOCK_SUFFIX, lockFailedException);
|
||||
}
|
||||
|
||||
/**
|
||||
* This creates a RedisLock
|
||||
*
|
||||
* @param key key
|
||||
* @param redis 数据源
|
||||
*/
|
||||
public RedisLock(String key, RedisWrapper redis, String suffix, RuntimeException lockFailedException) {
|
||||
this.key = key + Optional.ofNullable(suffix).orElse(DEFAULT_LOCK_SUFFIX);
|
||||
this.redis = redis;
|
||||
this.lockFailedException = Optional.ofNullable(lockFailedException)
|
||||
.orElseGet(() -> new AcquireLockFailException("获取锁失败 " + key));
|
||||
}
|
||||
|
||||
/**
|
||||
* 尝试在timeoutMillis毫秒内获取锁并设置锁的过期时间为expireMillis毫秒,若获取锁成功,则执行supplier的逻辑,并返回supplier执行结果。然后关闭锁<br>
|
||||
* <pre>
|
||||
* 锁的释放,由2方面保证:
|
||||
* 1、supplier方法执行完成后,会主动释放锁。
|
||||
* 2、设置锁的过期时间
|
||||
* </pre>
|
||||
* 如果只是单纯的尝试获取锁并执行,无需等待锁,可以<b>将timeoutMillis参数设置为0。</b>
|
||||
*
|
||||
* @param timeoutMillis 等待获取锁的时间 单位毫秒(会在等待时间内不停自旋尝试获取锁。)如果超过该时间还没成功获取到锁,则抛出获取锁失败的BizException
|
||||
* <b>timeoutMillis=0,则表示只进行一次获取锁的尝试。获取失败,直接抛获取锁失败的异常</b>
|
||||
* @param expireMillis 锁的过期时间,保证锁最长的持有时间。(如果主动释放锁失败,会有该参数保证锁成功释放)
|
||||
* @param supplier 需要执行的方法
|
||||
* @param <T> 返回参数类型
|
||||
* @return
|
||||
*/
|
||||
public <T> T tryAcquireRun(final long timeoutMillis, final long expireMillis, Supplier<T> supplier) {
|
||||
if (!lock(timeoutMillis, expireMillis)) {
|
||||
throw lockFailedException;
|
||||
}
|
||||
try {
|
||||
return supplier.get();
|
||||
} finally {
|
||||
close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 尝试获取锁,并执行supplier.get()方法,返回结果。<br>
|
||||
* 该方法使用了默认的锁等待时间和过期时间:<br>
|
||||
* 等待锁时间={@link #DEFAULT_TIME_OUT_MILLIS 5秒}<br>
|
||||
* 锁过期时间={@link #EXPIRE_IN_MILLIS 1分钟}<br>
|
||||
* 调用该方法,效果等同于 {@link #tryAcquireRun(long, long, Supplier)}
|
||||
* -> tryAcquireRun(DEFAULT_TIME_OUT_MILLIS, EXPIRE_IN_MILLIS, supplier);
|
||||
*
|
||||
* @param supplier
|
||||
* @param <T>
|
||||
* @return
|
||||
*/
|
||||
public <T> T tryAcquireRun(Supplier<T> supplier) {
|
||||
if (!lock()) {
|
||||
throw lockFailedException;
|
||||
}
|
||||
try {
|
||||
return supplier.get();
|
||||
} finally {
|
||||
close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 尝试获取锁,并执行supplier.get()方法,返回结果。<br>
|
||||
* 该方法使用了默认的锁过期时间:<br>
|
||||
* 锁过期时间={@link #EXPIRE_IN_MILLIS 1分钟}<br>
|
||||
* 调用该方法,效果等同于 {@link #tryAcquireRun(long, long, Supplier)}
|
||||
* -> tryAcquireRun(timeoutMillis, EXPIRE_IN_MILLIS, supplier);
|
||||
*
|
||||
* @param supplier
|
||||
* @param <T>
|
||||
* @return
|
||||
*/
|
||||
public <T> T tryAcquireRun(long timeoutMillis, Supplier<T> supplier) {
|
||||
if (!lock(timeoutMillis)) {
|
||||
throw lockFailedException;
|
||||
}
|
||||
try {
|
||||
return supplier.get();
|
||||
} finally {
|
||||
close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 尝试立即获取锁,并执行supplier.get()方法,返回结果。<br>
|
||||
* timeoutMills = 0, expireMillis = 5分钟
|
||||
*/
|
||||
public <T> T acquireImmediatelyRun(Supplier<T> supplier) {
|
||||
if (!lock(0, TimeUnit.MINUTES.toMillis(5))) {
|
||||
throw ResultCode.OPERATE_TOO_FREQUENTLY.toException();
|
||||
}
|
||||
try {
|
||||
return supplier.get();
|
||||
} finally {
|
||||
close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 加锁 应该以: lock(); try { doSomething(); } finally { close(); } 的方式调用<br>
|
||||
* 外部不建议直接使用该方法,建议使用{@link #tryAcquireRun(long, long, Supplier)}明确指定锁的等待和过期时间
|
||||
*
|
||||
* @param timeoutMillis 超时时间(毫秒)
|
||||
* @return 成功或失败标志
|
||||
*/
|
||||
private boolean lock(long timeoutMillis) {
|
||||
return lock(timeoutMillis, EXPIRE_IN_MILLIS);
|
||||
}
|
||||
|
||||
/**
|
||||
* 加锁 应该以: lock(); try { doSomething(); } finally { close(); } 的方式调用<br>
|
||||
* 外部不建议直接使用该方法,建议使用{@link #tryAcquireRun(long, long, Supplier)}明确指定锁的等待和过期时间
|
||||
*
|
||||
* @param timeoutMillis 超时时间(毫秒
|
||||
* @param expireMillis 锁的超时时间(毫秒),过期删除
|
||||
* @return 成功或失败标志
|
||||
*/
|
||||
private boolean lock(final long timeoutMillis, final long expireMillis) {
|
||||
long nano = System.nanoTime();
|
||||
long timeoutNano = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
|
||||
try {
|
||||
do {
|
||||
boolean ok = redis.lock(key, "true", expireMillis);
|
||||
if (ok) {
|
||||
this.locked = true;
|
||||
return this.locked;
|
||||
}
|
||||
// 短暂休眠,避免出现活锁
|
||||
Thread.sleep(3, RANDOM.nextInt(500));
|
||||
} while ((System.nanoTime() - nano) < timeoutNano);
|
||||
} catch (Exception e) {
|
||||
throw lockFailedException;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 加锁 应该以: lock(); try { doSomething(); } finally { close(); } 的方式调用<br>
|
||||
* 外部不建议直接使用该方法,建议使用{@link #tryAcquireRun(long, long, Supplier)}明确指定锁的等待和过期时间
|
||||
*
|
||||
* @return 成功或失败标志
|
||||
*/
|
||||
private boolean lock() {
|
||||
return lock(DEFAULT_TIME_OUT_MILLIS);
|
||||
}
|
||||
|
||||
/** 当获取锁失败的时候抛出该异常,方便调用方捕获处理 */
|
||||
public static class AcquireLockFailException extends RuntimeException {
|
||||
public AcquireLockFailException() {
|
||||
super();
|
||||
}
|
||||
|
||||
public AcquireLockFailException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
|
||||
public AcquireLockFailException(Throwable throwable) {
|
||||
super(throwable);
|
||||
}
|
||||
|
||||
public AcquireLockFailException(String msg, Throwable throwable) {
|
||||
super(msg, throwable);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,189 @@
|
||||
package cn.axzo.foundation.redis.support.impl;
|
||||
|
||||
import cn.axzo.foundation.redis.support.EventBroadcast;
|
||||
import cn.axzo.foundation.web.support.AppRuntime;
|
||||
import cn.axzo.foundation.web.support.utils.KeyBuilder;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.google.common.base.Charsets;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.*;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.data.redis.core.RedisCallback;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
@Slf4j
|
||||
public class RedisEventBroadcastImpl implements EventBroadcast, InitializingBean, DisposableBean {
|
||||
|
||||
private RedisTemplate redisTemplate;
|
||||
|
||||
private AppRuntime appRuntime;
|
||||
|
||||
private String channel;
|
||||
|
||||
/**
|
||||
* 广播监听. 监听channel上的广播. 并根据event dispatch到不同的consumer
|
||||
*/
|
||||
@Setter(AccessLevel.PROTECTED)
|
||||
private BroadcastListener broadcastListener;
|
||||
|
||||
@Builder
|
||||
public RedisEventBroadcastImpl(RedisTemplate redisTemplate, AppRuntime appRuntime) {
|
||||
Objects.requireNonNull(redisTemplate);
|
||||
Objects.requireNonNull(appRuntime);
|
||||
this.redisTemplate = redisTemplate;
|
||||
this.appRuntime = appRuntime;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BroadcastQueue build(String queueName, BiConsumer<String, BroadcastEvent> consumer) {
|
||||
Objects.requireNonNull(queueName);
|
||||
Objects.requireNonNull(consumer);
|
||||
Preconditions.checkArgument(queueName.length() <= 32);
|
||||
|
||||
RedisBroadcastQueue queue = RedisBroadcastQueue.builder()
|
||||
.name(queueName)
|
||||
.channel(channel)
|
||||
.redisTemplate(redisTemplate)
|
||||
.appRuntime(appRuntime)
|
||||
.build();
|
||||
broadcastListener.register(queueName, consumer);
|
||||
|
||||
return queue;
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建RedisBroadcastListener, 并启动任务
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
this.channel = KeyBuilder.build(appRuntime, "broadcast", "channel");
|
||||
|
||||
broadcastListener = RedisBroadcastListener.builder()
|
||||
.redisTemplate(redisTemplate)
|
||||
.channel(channel)
|
||||
.build();
|
||||
broadcastListener.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() throws Exception {
|
||||
broadcastListener.stop();
|
||||
}
|
||||
|
||||
@Data
|
||||
@lombok.Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
protected static final class RedisBroadcastQueue implements BroadcastQueue {
|
||||
String name;
|
||||
|
||||
@Getter(AccessLevel.NONE)
|
||||
RedisTemplate redisTemplate;
|
||||
|
||||
@Getter(AccessLevel.NONE)
|
||||
String channel;
|
||||
|
||||
@Getter(AccessLevel.NONE)
|
||||
AppRuntime appRuntime;
|
||||
|
||||
@Override
|
||||
public boolean broadcast(JSONObject data) {
|
||||
return (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> {
|
||||
connection.publish(channel.getBytes(),
|
||||
BroadcastEvent.builder()
|
||||
.name(name)
|
||||
.data(data)
|
||||
.senderRuntime(appRuntime.toJson())
|
||||
.build()
|
||||
.toJSONString().getBytes(Charsets.UTF_8));
|
||||
return true;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
protected static final class RedisBroadcastListener implements BroadcastListener {
|
||||
private RedisTemplate redisTemplate;
|
||||
private String channel;
|
||||
|
||||
private final Map<String, BiConsumer<String, BroadcastEvent>> broadcastConsumer = Maps.newConcurrentMap();
|
||||
|
||||
@lombok.Builder
|
||||
public RedisBroadcastListener(RedisTemplate redisTemplate, String channel) {
|
||||
this.redisTemplate = redisTemplate;
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean start() {
|
||||
try {
|
||||
redisTemplate.getConnectionFactory().getConnection()
|
||||
.subscribe((message, pattern) -> {
|
||||
BroadcastEvent broadcastEvent = JSONObject.parseObject(new String(message.getBody(), Charsets.UTF_8))
|
||||
.toJavaObject(BroadcastEvent.class);
|
||||
onEvent(broadcastEvent);
|
||||
}, channel.getBytes());
|
||||
} catch (Exception e) {
|
||||
log.error("====== start broadcast listener error =====", e);
|
||||
return false;
|
||||
}
|
||||
log.info("====== start broadcast listener =====");
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean register(String queueName, BiConsumer<String, BroadcastEvent> consumer) {
|
||||
Preconditions.checkArgument(!broadcastConsumer.containsKey(queueName), "duplicate broadcast queue");
|
||||
broadcastConsumer.put(queueName, consumer);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean stop() {
|
||||
//doNothing, 如果定义了线程池可以在这里销毁
|
||||
log.info("====== stop broadcast listener =====");
|
||||
return true;
|
||||
}
|
||||
|
||||
private void onEvent(BroadcastEvent event) {
|
||||
BiConsumer<String, BroadcastEvent> consumer = broadcastConsumer.get(event.getName());
|
||||
if (consumer == null) {
|
||||
log.error("event is ready, but no consumer found, event = {}", event.toJSONString());
|
||||
return;
|
||||
}
|
||||
try {
|
||||
consumer.accept(event.getName(), event);
|
||||
} catch (Exception ex) {
|
||||
log.error("consume broadcast error, event = {}", event.toJSONString());
|
||||
//ignore 忽略业务异常
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 广播监听. 提供开始监听, 结束监听, 注册queue以及对应的BiConsumer
|
||||
*/
|
||||
interface BroadcastListener {
|
||||
boolean start();
|
||||
|
||||
/**
|
||||
* 注册queue对应的consumer
|
||||
*
|
||||
* @param queueName
|
||||
* @param consumer
|
||||
* @return
|
||||
*/
|
||||
boolean register(String queueName, BiConsumer<String, BroadcastEvent> consumer);
|
||||
|
||||
boolean stop();
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,178 @@
|
||||
package cn.axzo.foundation.redis.support.impl;
|
||||
|
||||
import cn.axzo.foundation.enums.AppEnvEnum;
|
||||
import cn.axzo.foundation.redis.support.EventBroadcast;
|
||||
import cn.axzo.foundation.redis.support.LocalCacheCoordinate;
|
||||
import cn.axzo.foundation.web.support.AppRuntime;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.google.common.collect.ArrayListMultimap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Multimap;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.Data;
|
||||
import lombok.Setter;
|
||||
import lombok.Singular;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
|
||||
import org.springframework.transaction.support.TransactionSynchronizationManager;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
@Slf4j
|
||||
public class RedisLocalCacheCoordinate implements LocalCacheCoordinate, InitializingBean {
|
||||
@Setter(AccessLevel.PROTECTED)
|
||||
private AfterCommitExecutor executor;
|
||||
|
||||
@Setter
|
||||
private EventBroadcast EventBroadcast;
|
||||
private EventBroadcast.BroadcastQueue broadcastQueue;
|
||||
|
||||
@Singular
|
||||
private Multimap<String, Consumer<CacheDirtiedEvent>> cacheDirtyHandlers = ArrayListMultimap.create();
|
||||
private List<CacheDirtiedEventInterceptor> interceptors = Lists.newArrayList();
|
||||
|
||||
@Override
|
||||
public void notifyCacheDirty(CacheDirtiedEvent event) {
|
||||
executor.execute(() -> {
|
||||
broadcastQueue.broadcast(JSON.parseObject(JSONObject.toJSONString(event)));
|
||||
log.info("notify cache dirty, event={}", event);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerCacheDirtyListener(String key, Consumer<CacheDirtiedEvent> consumer) {
|
||||
cacheDirtyHandlers.put(key, consumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerCacheDirtyInterceptor(CacheDirtiedEventInterceptor interceptor) {
|
||||
interceptors.add(interceptor);
|
||||
}
|
||||
|
||||
void onCacheDirtied(String queueName, EventBroadcast.BroadcastEvent e) {
|
||||
CacheDirtiedEvent cacheDirtiedEvent = CacheDirtiedEvent.builder().key(e.getData().getString("key"))
|
||||
.data(e.getData().getJSONObject("data"))
|
||||
.build();
|
||||
interceptors.forEach(h -> {
|
||||
try {
|
||||
if (h.getFilter().test(cacheDirtiedEvent)) {
|
||||
h.getConsumer().accept(cacheDirtiedEvent);
|
||||
log.info("interceptor handled cache dirty event={}, interceptor={}", cacheDirtiedEvent, h);
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
log.error("========interceptor handle cacheDirtiedEvent {}", cacheDirtiedEvent, ex);
|
||||
}
|
||||
});
|
||||
|
||||
cacheDirtyHandlers.get(cacheDirtiedEvent.getKey()).forEach(h -> {
|
||||
try {
|
||||
h.accept(cacheDirtiedEvent);
|
||||
log.info("handler handled cache dirty event={}, handler={}", cacheDirtiedEvent, h);
|
||||
} catch (Exception ex) {
|
||||
log.error("========handler handle cacheDirtiedEvent {}", cacheDirtiedEvent, ex);
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
broadcastQueue = EventBroadcast.build("Local_cache-coordinate", this::onCacheDirtied);
|
||||
}
|
||||
|
||||
public static RedisLocalCacheCoordinate.Builder builder() {
|
||||
return new RedisLocalCacheCoordinate.Builder();
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class Builder {
|
||||
EventBroadcast EventBroadcast;
|
||||
AppRuntime appRuntime;
|
||||
|
||||
public RedisLocalCacheCoordinate.Builder EventBroadcast(EventBroadcast EventBroadcast) {
|
||||
this.EventBroadcast = EventBroadcast;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RedisLocalCacheCoordinate.Builder appRuntime(AppRuntime appRuntime) {
|
||||
this.appRuntime = appRuntime;
|
||||
return this;
|
||||
}
|
||||
|
||||
public LocalCacheCoordinate build() {
|
||||
if (appRuntime.getEnv() == AppEnvEnum.unittest) {
|
||||
return new LocalCacheCoordinate() {
|
||||
private Multimap<String, Consumer<CacheDirtiedEvent>> cacheDirtyHandlers = ArrayListMultimap.create();
|
||||
private List<CacheDirtiedEventInterceptor> interceptors = Lists.newArrayList();
|
||||
|
||||
@Override
|
||||
public void notifyCacheDirty(CacheDirtiedEvent event) {
|
||||
cacheDirtyHandlers.get(event.getKey())
|
||||
.forEach(h -> h.accept(event));
|
||||
|
||||
interceptors.stream()
|
||||
.filter(i -> i.getFilter().test(event))
|
||||
.forEach(i -> i.getConsumer().accept(event));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerCacheDirtyListener(String key, Consumer<CacheDirtiedEvent> consumer) {
|
||||
cacheDirtyHandlers.put(key, consumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerCacheDirtyInterceptor(CacheDirtiedEventInterceptor interceptor) {
|
||||
interceptors.add(interceptor);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Objects.requireNonNull(EventBroadcast);
|
||||
RedisLocalCacheCoordinate client = new RedisLocalCacheCoordinate();
|
||||
client.setEventBroadcast(EventBroadcast);
|
||||
client.setExecutor(new AfterCommitExecutor());
|
||||
|
||||
return client;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* stolen from http://azagorneanu.blogspot.jp/2013/06/transaction-synchronization-callbacks.html
|
||||
* 保证在交易结束后被调用.
|
||||
*/
|
||||
protected static class AfterCommitExecutor extends TransactionSynchronizationAdapter {
|
||||
//大部分情况只有会清除一个缓存. 因此数组初始化为1
|
||||
private final ThreadLocal<List<Runnable>> contexts = ThreadLocal
|
||||
.withInitial((Supplier<List<Runnable>>) () -> Lists.newArrayListWithCapacity(1));
|
||||
|
||||
public void execute(Runnable runnable) {
|
||||
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
|
||||
runnable.run();
|
||||
return;
|
||||
}
|
||||
contexts.get().add(runnable);
|
||||
TransactionSynchronizationManager.registerSynchronization(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterCommit() {
|
||||
contexts.get().forEach(e -> {
|
||||
try {
|
||||
e.run();
|
||||
} catch (Exception ex) {
|
||||
log.error("Failed to execute runnable = {} ", e, ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterCompletion(int status) {
|
||||
contexts.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user