diff --git a/common-lib/src/main/java/cn/axzo/foundation/result/ResultCode.java b/common-lib/src/main/java/cn/axzo/foundation/result/ResultCode.java index 5f41de3..0267fbc 100644 --- a/common-lib/src/main/java/cn/axzo/foundation/result/ResultCode.java +++ b/common-lib/src/main/java/cn/axzo/foundation/result/ResultCode.java @@ -11,7 +11,9 @@ public enum ResultCode implements IResultCode { INVALID_PARAMS("002", "请求参数格式错误", 400), NETWORK_FAILURE("003", "内部网络错误", 500), PROCESS_TIMEOUT("004", "内部处理超时", 500), - APP_CONFIG_ERROR("005", "服务配置错误", 500); + APP_CONFIG_ERROR("005", "服务配置错误", 500), + OPERATE_TOO_FREQUENTLY("006", "操作过于频繁", 500), + ; final private String code; diff --git a/pom.xml b/pom.xml index afc7af4..9df7ec1 100644 --- a/pom.xml +++ b/pom.xml @@ -32,6 +32,7 @@ web-support-lib gateway-support-lib event-support-lib + redis-support-lib diff --git a/redis-support-lib/pom.xml b/redis-support-lib/pom.xml new file mode 100644 index 0000000..0a63193 --- /dev/null +++ b/redis-support-lib/pom.xml @@ -0,0 +1,26 @@ + + + 4.0.0 + + cn.axzo.foundation + axzo-lib-box + 2.0.0-SNAPSHOT + + + redis-support-lib + + + cn.axzo.foundation + web-support-lib + 2.0.0-SNAPSHOT + + + + org.springframework.boot + spring-boot-starter-data-redis + + + + \ No newline at end of file diff --git a/redis-support-lib/src/main/java/cn/axzo/foundation/redis/support/EventBroadcast.java b/redis-support-lib/src/main/java/cn/axzo/foundation/redis/support/EventBroadcast.java new file mode 100644 index 0000000..b706636 --- /dev/null +++ b/redis-support-lib/src/main/java/cn/axzo/foundation/redis/support/EventBroadcast.java @@ -0,0 +1,75 @@ +package cn.axzo.foundation.redis.support; + +import com.alibaba.fastjson.JSONObject; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.function.BiConsumer; + +/** + * 提供简单的广播Client. 通过build获得的BroadcastQueue发送广播. 所有节点都会收到广播并回调consumer + * + */ +public interface EventBroadcast { + + + /** + * 通过queueName, 回调consumer来构建广播队列 + * + * @param queueName + * @param consumer + * @return + */ + BroadcastQueue build(String queueName, BiConsumer consumer); + + /** + * 广播队列. 向队列中广播消息. 所有的节点(包含自身)都会收到广播消息 + */ + interface BroadcastQueue { + /** + * 获得队列名称, 与注册时的queueName相同 + * + * @return + */ + String getName(); + + /** + * 广播, data为需要广播的内容 + * + * @param data + * @return + */ + boolean broadcast(JSONObject data); + } + + /** + * 广播事件 + */ + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + class BroadcastEvent { + String name; + JSONObject data; + /** + * 发起广播的节点信息 + */ + JSONObject senderRuntime; + + public String toJSONString() { + return new JSONObject() + .fluentPut("name", name) + .fluentPut("data", data) + .fluentPut("senderRuntime", senderRuntime) + .toJSONString(); + } + } +} diff --git a/redis-support-lib/src/main/java/cn/axzo/foundation/redis/support/LocalCacheCoordinate.java b/redis-support-lib/src/main/java/cn/axzo/foundation/redis/support/LocalCacheCoordinate.java new file mode 100644 index 0000000..a9659d9 --- /dev/null +++ b/redis-support-lib/src/main/java/cn/axzo/foundation/redis/support/LocalCacheCoordinate.java @@ -0,0 +1,49 @@ +package cn.axzo.foundation.redis.support; + +import com.alibaba.fastjson.JSONObject; +import lombok.*; + +import java.util.function.Consumer; +import java.util.function.Predicate; + +/** + * 简单的本地缓存协调客户端. 主要的目的是通过事件在不同节点间处理缓存dirty的场景. + * 提供了2中机制来处理缓存dirty的场景 + * registerCacheDirtyListener 精确处理key匹配 + * registerCacheDirtyInterceptor 统一的事件匹配. 可以通过检查 CacheDirtiedEvent.data中的数据是否包含特定的json-path来处理缓存. + */ +public interface LocalCacheCoordinate { + + + void notifyCacheDirty(CacheDirtiedEvent event); + + /** + * @param key {@link CacheDirtiedEvent#key} 中定义数据 + * @param consumer + */ + void registerCacheDirtyListener(String key, Consumer consumer); + + void registerCacheDirtyInterceptor(CacheDirtiedEventInterceptor interceptor); + + @NoArgsConstructor + @AllArgsConstructor + @Data + @Builder + final class CacheDirtiedEvent { + @NonNull + String key; + @NonNull + JSONObject data; + } + + @NoArgsConstructor + @AllArgsConstructor + @Data + @Builder + final class CacheDirtiedEventInterceptor { + @NonNull + Predicate filter; + @NonNull + Consumer consumer; + } +} diff --git a/redis-support-lib/src/main/java/cn/axzo/foundation/redis/support/RedisLock.java b/redis-support-lib/src/main/java/cn/axzo/foundation/redis/support/RedisLock.java new file mode 100644 index 0000000..38dd164 --- /dev/null +++ b/redis-support-lib/src/main/java/cn/axzo/foundation/redis/support/RedisLock.java @@ -0,0 +1,270 @@ +package cn.axzo.foundation.redis.support; + +import cn.axzo.foundation.result.ResultCode; +import lombok.extern.slf4j.Slf4j; + +import java.util.Optional; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +/** + * 使用redis实现的分布式锁. 为了不依赖redis, 这里需要调用者实现{@link RedisWrapper}. 如下: + *
{@code
+ *     @Bean
+ *     RedisLock.RedisWrapper redisWrapper(RedisTemplate redisTemplate) {
+ *         return new RedisLock.RedisWrapper() {
+ *             @Override
+ *             public void delete(String key) {
+ *                 redisTemplate.delete(key);
+ *             }
+ *
+ *             @Override
+ *             public boolean lock(String key, String value, long expireMills) {
+ *                 return redisTemplate.opsForValue().setIfAbsent(key, value, expireMills, TimeUnit.MILLISECONDS);
+ *             }
+ *         };
+ *     }
+ * }
+ *

+ * 然后使用{@link #tryAcquireRun(long, long, Supplier)} 来获取lock执行逻辑,明确指定 "等待锁时间" 和 "锁的超时时间". + */ +@Slf4j +public class RedisLock { + + public static final String DEFAULT_LOCK_SUFFIX = ":lock"; + private RedisWrapper redis; + + /** + * 默认超时时间(毫秒) + */ + private static final long DEFAULT_TIME_OUT_MILLIS = 5 * 1000; + private static final Random RANDOM = new Random(); + /** + * 锁的超时时间(豪秒),过期删除 + */ + public static final int EXPIRE_IN_MILLIS = 1 * 60 * 1000; + + private String key; + // 锁状态标志 + private boolean locked = false; + + private RuntimeException lockFailedException; + + public interface RedisWrapper { + /** + * 删除key + * + * @param key + */ + void delete(String key); + + /** + * 锁定key. 通常使用SetIfAbsent(); + * + * @param key + * @param value + * @param expireMills + * @return + */ + boolean lock(String key, String value, long expireMills); + } + + /** + * 关闭锁,该方法不建议外部直接使用,
+ * 对于加锁执行的操作,建议直接使用 {@link RedisLock#tryAcquireRun(long, long, Supplier)},会自动执行close操作。 + */ + private void close() { + if (this.locked) { + this.redis.delete(this.key); + } + } + + /** + * This creates a RedisLock + * + * @param key key + * @param redis 数据源 + */ + public RedisLock(String key, RedisWrapper redis) { + this(key, redis, DEFAULT_LOCK_SUFFIX, null); + } + + /** + * This creates a RedisLock + * + * @param key key + * @param redis 数据源 + */ + public RedisLock(String key, RedisWrapper redis, RuntimeException lockFailedException) { + this(key, redis, DEFAULT_LOCK_SUFFIX, lockFailedException); + } + + /** + * This creates a RedisLock + * + * @param key key + * @param redis 数据源 + */ + public RedisLock(String key, RedisWrapper redis, String suffix, RuntimeException lockFailedException) { + this.key = key + Optional.ofNullable(suffix).orElse(DEFAULT_LOCK_SUFFIX); + this.redis = redis; + this.lockFailedException = Optional.ofNullable(lockFailedException) + .orElseGet(() -> new AcquireLockFailException("获取锁失败 " + key)); + } + + /** + * 尝试在timeoutMillis毫秒内获取锁并设置锁的过期时间为expireMillis毫秒,若获取锁成功,则执行supplier的逻辑,并返回supplier执行结果。然后关闭锁
+ *

+     * 锁的释放,由2方面保证:
+     * 1、supplier方法执行完成后,会主动释放锁。
+     * 2、设置锁的过期时间
+     * 
+ * 如果只是单纯的尝试获取锁并执行,无需等待锁,可以将timeoutMillis参数设置为0。 + * + * @param timeoutMillis 等待获取锁的时间 单位毫秒(会在等待时间内不停自旋尝试获取锁。)如果超过该时间还没成功获取到锁,则抛出获取锁失败的BizException + * timeoutMillis=0,则表示只进行一次获取锁的尝试。获取失败,直接抛获取锁失败的异常 + * @param expireMillis 锁的过期时间,保证锁最长的持有时间。(如果主动释放锁失败,会有该参数保证锁成功释放) + * @param supplier 需要执行的方法 + * @param 返回参数类型 + * @return + */ + public T tryAcquireRun(final long timeoutMillis, final long expireMillis, Supplier supplier) { + if (!lock(timeoutMillis, expireMillis)) { + throw lockFailedException; + } + try { + return supplier.get(); + } finally { + close(); + } + } + + /** + * 尝试获取锁,并执行supplier.get()方法,返回结果。
+ * 该方法使用了默认的锁等待时间和过期时间:
+ * 等待锁时间={@link #DEFAULT_TIME_OUT_MILLIS 5秒}
+ * 锁过期时间={@link #EXPIRE_IN_MILLIS 1分钟}
+ * 调用该方法,效果等同于 {@link #tryAcquireRun(long, long, Supplier)} + * -> tryAcquireRun(DEFAULT_TIME_OUT_MILLIS, EXPIRE_IN_MILLIS, supplier); + * + * @param supplier + * @param + * @return + */ + public T tryAcquireRun(Supplier supplier) { + if (!lock()) { + throw lockFailedException; + } + try { + return supplier.get(); + } finally { + close(); + } + } + + /** + * 尝试获取锁,并执行supplier.get()方法,返回结果。
+ * 该方法使用了默认的锁过期时间:
+ * 锁过期时间={@link #EXPIRE_IN_MILLIS 1分钟}
+ * 调用该方法,效果等同于 {@link #tryAcquireRun(long, long, Supplier)} + * -> tryAcquireRun(timeoutMillis, EXPIRE_IN_MILLIS, supplier); + * + * @param supplier + * @param + * @return + */ + public T tryAcquireRun(long timeoutMillis, Supplier supplier) { + if (!lock(timeoutMillis)) { + throw lockFailedException; + } + try { + return supplier.get(); + } finally { + close(); + } + } + + /** + * 尝试立即获取锁,并执行supplier.get()方法,返回结果。
+ * timeoutMills = 0, expireMillis = 5分钟 + */ + public T acquireImmediatelyRun(Supplier supplier) { + if (!lock(0, TimeUnit.MINUTES.toMillis(5))) { + throw ResultCode.OPERATE_TOO_FREQUENTLY.toException(); + } + try { + return supplier.get(); + } finally { + close(); + } + } + + /** + * 加锁 应该以: lock(); try { doSomething(); } finally { close(); } 的方式调用
+ * 外部不建议直接使用该方法,建议使用{@link #tryAcquireRun(long, long, Supplier)}明确指定锁的等待和过期时间 + * + * @param timeoutMillis 超时时间(毫秒) + * @return 成功或失败标志 + */ + private boolean lock(long timeoutMillis) { + return lock(timeoutMillis, EXPIRE_IN_MILLIS); + } + + /** + * 加锁 应该以: lock(); try { doSomething(); } finally { close(); } 的方式调用
+ * 外部不建议直接使用该方法,建议使用{@link #tryAcquireRun(long, long, Supplier)}明确指定锁的等待和过期时间 + * + * @param timeoutMillis 超时时间(毫秒 + * @param expireMillis 锁的超时时间(毫秒),过期删除 + * @return 成功或失败标志 + */ + private boolean lock(final long timeoutMillis, final long expireMillis) { + long nano = System.nanoTime(); + long timeoutNano = TimeUnit.MILLISECONDS.toNanos(timeoutMillis); + try { + do { + boolean ok = redis.lock(key, "true", expireMillis); + if (ok) { + this.locked = true; + return this.locked; + } + // 短暂休眠,避免出现活锁 + Thread.sleep(3, RANDOM.nextInt(500)); + } while ((System.nanoTime() - nano) < timeoutNano); + } catch (Exception e) { + throw lockFailedException; + } + return false; + } + + /** + * 加锁 应该以: lock(); try { doSomething(); } finally { close(); } 的方式调用
+ * 外部不建议直接使用该方法,建议使用{@link #tryAcquireRun(long, long, Supplier)}明确指定锁的等待和过期时间 + * + * @return 成功或失败标志 + */ + private boolean lock() { + return lock(DEFAULT_TIME_OUT_MILLIS); + } + + /** 当获取锁失败的时候抛出该异常,方便调用方捕获处理 */ + public static class AcquireLockFailException extends RuntimeException { + public AcquireLockFailException() { + super(); + } + + public AcquireLockFailException(String msg) { + super(msg); + } + + public AcquireLockFailException(Throwable throwable) { + super(throwable); + } + + public AcquireLockFailException(String msg, Throwable throwable) { + super(msg, throwable); + } + } + +} diff --git a/redis-support-lib/src/main/java/cn/axzo/foundation/redis/support/impl/RedisEventBroadcastImpl.java b/redis-support-lib/src/main/java/cn/axzo/foundation/redis/support/impl/RedisEventBroadcastImpl.java new file mode 100644 index 0000000..6c9470b --- /dev/null +++ b/redis-support-lib/src/main/java/cn/axzo/foundation/redis/support/impl/RedisEventBroadcastImpl.java @@ -0,0 +1,189 @@ +package cn.axzo.foundation.redis.support.impl; + +import cn.axzo.foundation.redis.support.EventBroadcast; +import cn.axzo.foundation.web.support.AppRuntime; +import cn.axzo.foundation.web.support.utils.KeyBuilder; +import com.alibaba.fastjson.JSONObject; +import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import lombok.*; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.data.redis.core.RedisCallback; +import org.springframework.data.redis.core.RedisTemplate; + +import java.util.Map; +import java.util.Objects; +import java.util.function.BiConsumer; + +@Slf4j +public class RedisEventBroadcastImpl implements EventBroadcast, InitializingBean, DisposableBean { + + private RedisTemplate redisTemplate; + + private AppRuntime appRuntime; + + private String channel; + + /** + * 广播监听. 监听channel上的广播. 并根据event dispatch到不同的consumer + */ + @Setter(AccessLevel.PROTECTED) + private BroadcastListener broadcastListener; + + @Builder + public RedisEventBroadcastImpl(RedisTemplate redisTemplate, AppRuntime appRuntime) { + Objects.requireNonNull(redisTemplate); + Objects.requireNonNull(appRuntime); + this.redisTemplate = redisTemplate; + this.appRuntime = appRuntime; + } + + @Override + public BroadcastQueue build(String queueName, BiConsumer consumer) { + Objects.requireNonNull(queueName); + Objects.requireNonNull(consumer); + Preconditions.checkArgument(queueName.length() <= 32); + + RedisBroadcastQueue queue = RedisBroadcastQueue.builder() + .name(queueName) + .channel(channel) + .redisTemplate(redisTemplate) + .appRuntime(appRuntime) + .build(); + broadcastListener.register(queueName, consumer); + + return queue; + } + + /** + * 构建RedisBroadcastListener, 并启动任务 + * + * @throws Exception + */ + @Override + public void afterPropertiesSet() throws Exception { + this.channel = KeyBuilder.build(appRuntime, "broadcast", "channel"); + + broadcastListener = RedisBroadcastListener.builder() + .redisTemplate(redisTemplate) + .channel(channel) + .build(); + broadcastListener.start(); + } + + @Override + public void destroy() throws Exception { + broadcastListener.stop(); + } + + @Data + @lombok.Builder + @NoArgsConstructor + @AllArgsConstructor + protected static final class RedisBroadcastQueue implements BroadcastQueue { + String name; + + @Getter(AccessLevel.NONE) + RedisTemplate redisTemplate; + + @Getter(AccessLevel.NONE) + String channel; + + @Getter(AccessLevel.NONE) + AppRuntime appRuntime; + + @Override + public boolean broadcast(JSONObject data) { + return (Boolean) redisTemplate.execute((RedisCallback) connection -> { + connection.publish(channel.getBytes(), + BroadcastEvent.builder() + .name(name) + .data(data) + .senderRuntime(appRuntime.toJson()) + .build() + .toJSONString().getBytes(Charsets.UTF_8)); + return true; + }); + } + } + + protected static final class RedisBroadcastListener implements BroadcastListener { + private RedisTemplate redisTemplate; + private String channel; + + private final Map> broadcastConsumer = Maps.newConcurrentMap(); + + @lombok.Builder + public RedisBroadcastListener(RedisTemplate redisTemplate, String channel) { + this.redisTemplate = redisTemplate; + this.channel = channel; + } + + @Override + public boolean start() { + try { + redisTemplate.getConnectionFactory().getConnection() + .subscribe((message, pattern) -> { + BroadcastEvent broadcastEvent = JSONObject.parseObject(new String(message.getBody(), Charsets.UTF_8)) + .toJavaObject(BroadcastEvent.class); + onEvent(broadcastEvent); + }, channel.getBytes()); + } catch (Exception e) { + log.error("====== start broadcast listener error =====", e); + return false; + } + log.info("====== start broadcast listener ====="); + return true; + } + + @Override + public boolean register(String queueName, BiConsumer consumer) { + Preconditions.checkArgument(!broadcastConsumer.containsKey(queueName), "duplicate broadcast queue"); + broadcastConsumer.put(queueName, consumer); + return true; + } + + @Override + public boolean stop() { + //doNothing, 如果定义了线程池可以在这里销毁 + log.info("====== stop broadcast listener ====="); + return true; + } + + private void onEvent(BroadcastEvent event) { + BiConsumer consumer = broadcastConsumer.get(event.getName()); + if (consumer == null) { + log.error("event is ready, but no consumer found, event = {}", event.toJSONString()); + return; + } + try { + consumer.accept(event.getName(), event); + } catch (Exception ex) { + log.error("consume broadcast error, event = {}", event.toJSONString()); + //ignore 忽略业务异常 + } + } + } + + /** + * 广播监听. 提供开始监听, 结束监听, 注册queue以及对应的BiConsumer + */ + interface BroadcastListener { + boolean start(); + + /** + * 注册queue对应的consumer + * + * @param queueName + * @param consumer + * @return + */ + boolean register(String queueName, BiConsumer consumer); + + boolean stop(); + } +} + diff --git a/redis-support-lib/src/main/java/cn/axzo/foundation/redis/support/impl/RedisLocalCacheCoordinate.java b/redis-support-lib/src/main/java/cn/axzo/foundation/redis/support/impl/RedisLocalCacheCoordinate.java new file mode 100644 index 0000000..4e3a41b --- /dev/null +++ b/redis-support-lib/src/main/java/cn/axzo/foundation/redis/support/impl/RedisLocalCacheCoordinate.java @@ -0,0 +1,178 @@ +package cn.axzo.foundation.redis.support.impl; + +import cn.axzo.foundation.enums.AppEnvEnum; +import cn.axzo.foundation.redis.support.EventBroadcast; +import cn.axzo.foundation.redis.support.LocalCacheCoordinate; +import cn.axzo.foundation.web.support.AppRuntime; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import lombok.AccessLevel; +import lombok.Data; +import lombok.Setter; +import lombok.Singular; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.transaction.support.TransactionSynchronizationAdapter; +import org.springframework.transaction.support.TransactionSynchronizationManager; + +import java.util.List; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Supplier; + +@Slf4j +public class RedisLocalCacheCoordinate implements LocalCacheCoordinate, InitializingBean { + @Setter(AccessLevel.PROTECTED) + private AfterCommitExecutor executor; + + @Setter + private EventBroadcast EventBroadcast; + private EventBroadcast.BroadcastQueue broadcastQueue; + + @Singular + private Multimap> cacheDirtyHandlers = ArrayListMultimap.create(); + private List interceptors = Lists.newArrayList(); + + @Override + public void notifyCacheDirty(CacheDirtiedEvent event) { + executor.execute(() -> { + broadcastQueue.broadcast(JSON.parseObject(JSONObject.toJSONString(event))); + log.info("notify cache dirty, event={}", event); + }); + } + + @Override + public void registerCacheDirtyListener(String key, Consumer consumer) { + cacheDirtyHandlers.put(key, consumer); + } + + @Override + public void registerCacheDirtyInterceptor(CacheDirtiedEventInterceptor interceptor) { + interceptors.add(interceptor); + } + + void onCacheDirtied(String queueName, EventBroadcast.BroadcastEvent e) { + CacheDirtiedEvent cacheDirtiedEvent = CacheDirtiedEvent.builder().key(e.getData().getString("key")) + .data(e.getData().getJSONObject("data")) + .build(); + interceptors.forEach(h -> { + try { + if (h.getFilter().test(cacheDirtiedEvent)) { + h.getConsumer().accept(cacheDirtiedEvent); + log.info("interceptor handled cache dirty event={}, interceptor={}", cacheDirtiedEvent, h); + } + } catch (Exception ex) { + log.error("========interceptor handle cacheDirtiedEvent {}", cacheDirtiedEvent, ex); + } + }); + + cacheDirtyHandlers.get(cacheDirtiedEvent.getKey()).forEach(h -> { + try { + h.accept(cacheDirtiedEvent); + log.info("handler handled cache dirty event={}, handler={}", cacheDirtiedEvent, h); + } catch (Exception ex) { + log.error("========handler handle cacheDirtiedEvent {}", cacheDirtiedEvent, ex); + } + }); + + } + + @Override + public void afterPropertiesSet() throws Exception { + broadcastQueue = EventBroadcast.build("Local_cache-coordinate", this::onCacheDirtied); + } + + public static RedisLocalCacheCoordinate.Builder builder() { + return new RedisLocalCacheCoordinate.Builder(); + } + + @Data + public static class Builder { + EventBroadcast EventBroadcast; + AppRuntime appRuntime; + + public RedisLocalCacheCoordinate.Builder EventBroadcast(EventBroadcast EventBroadcast) { + this.EventBroadcast = EventBroadcast; + return this; + } + + public RedisLocalCacheCoordinate.Builder appRuntime(AppRuntime appRuntime) { + this.appRuntime = appRuntime; + return this; + } + + public LocalCacheCoordinate build() { + if (appRuntime.getEnv() == AppEnvEnum.unittest) { + return new LocalCacheCoordinate() { + private Multimap> cacheDirtyHandlers = ArrayListMultimap.create(); + private List interceptors = Lists.newArrayList(); + + @Override + public void notifyCacheDirty(CacheDirtiedEvent event) { + cacheDirtyHandlers.get(event.getKey()) + .forEach(h -> h.accept(event)); + + interceptors.stream() + .filter(i -> i.getFilter().test(event)) + .forEach(i -> i.getConsumer().accept(event)); + } + + @Override + public void registerCacheDirtyListener(String key, Consumer consumer) { + cacheDirtyHandlers.put(key, consumer); + } + + @Override + public void registerCacheDirtyInterceptor(CacheDirtiedEventInterceptor interceptor) { + interceptors.add(interceptor); + } + }; + } + + Objects.requireNonNull(EventBroadcast); + RedisLocalCacheCoordinate client = new RedisLocalCacheCoordinate(); + client.setEventBroadcast(EventBroadcast); + client.setExecutor(new AfterCommitExecutor()); + + return client; + } + } + + /** + * stolen from http://azagorneanu.blogspot.jp/2013/06/transaction-synchronization-callbacks.html + * 保证在交易结束后被调用. + */ + protected static class AfterCommitExecutor extends TransactionSynchronizationAdapter { + //大部分情况只有会清除一个缓存. 因此数组初始化为1 + private final ThreadLocal> contexts = ThreadLocal + .withInitial((Supplier>) () -> Lists.newArrayListWithCapacity(1)); + + public void execute(Runnable runnable) { + if (!TransactionSynchronizationManager.isSynchronizationActive()) { + runnable.run(); + return; + } + contexts.get().add(runnable); + TransactionSynchronizationManager.registerSynchronization(this); + } + + @Override + public void afterCommit() { + contexts.get().forEach(e -> { + try { + e.run(); + } catch (Exception ex) { + log.error("Failed to execute runnable = {} ", e, ex); + } + }); + } + + @Override + public void afterCompletion(int status) { + contexts.remove(); + } + } +}