From feaaba7bc8f5262e93e6b3423c9da3aba27e6676 Mon Sep 17 00:00:00 2001 From: zhaoyong_sh Date: Mon, 12 Apr 2021 21:57:03 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=BC=82=E6=AD=A5=E5=B7=A5?= =?UTF-8?q?=E5=85=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- asyncTool/pom.xml | 57 ++ .../async/callback/DefaultCallback.java | 20 + .../async/callback/DefaultGroupCallback.java | 21 + .../jd/platform/async/callback/ICallback.java | 25 + .../async/callback/IGroupCallback.java | 20 + .../async/callback/ITimeoutWorker.java | 20 + .../jd/platform/async/callback/IWorker.java | 30 + .../async/exception/SkippedException.java | 17 + .../com/jd/platform/async/executor/Async.java | 126 +++ .../async/executor/timer/SystemClock.java | 51 + .../platform/async/worker/DependWrapper.java | 61 ++ .../jd/platform/async/worker/ResultState.java | 14 + .../jd/platform/async/worker/WorkResult.java | 65 ++ .../platform/async/wrapper/WorkerWrapper.java | 615 +++++++++++++ asyncTool/src/test/java/depend/DeWorker.java | 42 + asyncTool/src/test/java/depend/DeWorker1.java | 43 + asyncTool/src/test/java/depend/DeWorker2.java | 43 + .../src/test/java/depend/LambdaTest.java | 74 ++ asyncTool/src/test/java/depend/Test.java | 55 ++ asyncTool/src/test/java/depend/User.java | 29 + .../src/test/java/dependnew/DeWorker.java | 42 + .../src/test/java/dependnew/DeWorker1.java | 45 + .../src/test/java/dependnew/DeWorker2.java | 45 + asyncTool/src/test/java/dependnew/Test.java | 49 + asyncTool/src/test/java/dependnew/User.java | 29 + .../test/java/parallel/ParTimeoutWorker.java | 49 + .../src/test/java/parallel/ParWorker.java | 49 + .../src/test/java/parallel/ParWorker1.java | 53 ++ .../src/test/java/parallel/ParWorker2.java | 54 ++ .../src/test/java/parallel/ParWorker3.java | 53 ++ .../src/test/java/parallel/ParWorker4.java | 49 + asyncTool/src/test/java/parallel/TestPar.java | 868 ++++++++++++++++++ .../src/test/java/seq/SeqTimeoutWorker.java | 48 + asyncTool/src/test/java/seq/SeqWorker.java | 49 + asyncTool/src/test/java/seq/SeqWorker1.java | 48 + asyncTool/src/test/java/seq/SeqWorker2.java | 48 + .../src/test/java/seq/TestSequential.java | 71 ++ .../test/java/seq/TestSequentialTimeout.java | 67 ++ common-common/pom.xml | 17 + pom.xml | 1 + smart-datasource/pom.xml | 17 + 41 files changed, 3179 insertions(+) create mode 100644 asyncTool/pom.xml create mode 100755 asyncTool/src/main/java/com/jd/platform/async/callback/DefaultCallback.java create mode 100644 asyncTool/src/main/java/com/jd/platform/async/callback/DefaultGroupCallback.java create mode 100755 asyncTool/src/main/java/com/jd/platform/async/callback/ICallback.java create mode 100755 asyncTool/src/main/java/com/jd/platform/async/callback/IGroupCallback.java create mode 100644 asyncTool/src/main/java/com/jd/platform/async/callback/ITimeoutWorker.java create mode 100755 asyncTool/src/main/java/com/jd/platform/async/callback/IWorker.java create mode 100644 asyncTool/src/main/java/com/jd/platform/async/exception/SkippedException.java create mode 100644 asyncTool/src/main/java/com/jd/platform/async/executor/Async.java create mode 100644 asyncTool/src/main/java/com/jd/platform/async/executor/timer/SystemClock.java create mode 100644 asyncTool/src/main/java/com/jd/platform/async/worker/DependWrapper.java create mode 100755 asyncTool/src/main/java/com/jd/platform/async/worker/ResultState.java create mode 100755 asyncTool/src/main/java/com/jd/platform/async/worker/WorkResult.java create mode 100755 asyncTool/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java create mode 100755 asyncTool/src/test/java/depend/DeWorker.java create mode 100755 asyncTool/src/test/java/depend/DeWorker1.java create mode 100755 asyncTool/src/test/java/depend/DeWorker2.java create mode 100644 asyncTool/src/test/java/depend/LambdaTest.java create mode 100644 asyncTool/src/test/java/depend/Test.java create mode 100644 asyncTool/src/test/java/depend/User.java create mode 100755 asyncTool/src/test/java/dependnew/DeWorker.java create mode 100755 asyncTool/src/test/java/dependnew/DeWorker1.java create mode 100755 asyncTool/src/test/java/dependnew/DeWorker2.java create mode 100644 asyncTool/src/test/java/dependnew/Test.java create mode 100644 asyncTool/src/test/java/dependnew/User.java create mode 100755 asyncTool/src/test/java/parallel/ParTimeoutWorker.java create mode 100755 asyncTool/src/test/java/parallel/ParWorker.java create mode 100755 asyncTool/src/test/java/parallel/ParWorker1.java create mode 100755 asyncTool/src/test/java/parallel/ParWorker2.java create mode 100755 asyncTool/src/test/java/parallel/ParWorker3.java create mode 100755 asyncTool/src/test/java/parallel/ParWorker4.java create mode 100755 asyncTool/src/test/java/parallel/TestPar.java create mode 100755 asyncTool/src/test/java/seq/SeqTimeoutWorker.java create mode 100755 asyncTool/src/test/java/seq/SeqWorker.java create mode 100755 asyncTool/src/test/java/seq/SeqWorker1.java create mode 100755 asyncTool/src/test/java/seq/SeqWorker2.java create mode 100755 asyncTool/src/test/java/seq/TestSequential.java create mode 100755 asyncTool/src/test/java/seq/TestSequentialTimeout.java diff --git a/asyncTool/pom.xml b/asyncTool/pom.xml new file mode 100644 index 0000000..bb186c1 --- /dev/null +++ b/asyncTool/pom.xml @@ -0,0 +1,57 @@ + + + + 4.0.0 + + com.jd.platform + asyncTool + 1.3.1 + + + UTF-8 + 1.8 + 1.8 + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.3 + + 1.8 + 1.8 + + + + org.apache.maven.plugins + maven-source-plugin + + + attach-sources + + jar + + + + + + + + + + + rdc-releases + Nexus Release Repository + https://packages.aliyun.com/maven/repository/2005773-release-XI7cl5/ + + + rdc-snapshots + Nexus Snapshot Repository + https://packages.aliyun.com/maven/repository/2005773-snapshot-V5Gjdf/ + + + + diff --git a/asyncTool/src/main/java/com/jd/platform/async/callback/DefaultCallback.java b/asyncTool/src/main/java/com/jd/platform/async/callback/DefaultCallback.java new file mode 100755 index 0000000..aaa5c70 --- /dev/null +++ b/asyncTool/src/main/java/com/jd/platform/async/callback/DefaultCallback.java @@ -0,0 +1,20 @@ +package com.jd.platform.async.callback; + +import com.jd.platform.async.worker.WorkResult; + +/** + * 默认回调类,如果不设置的话,会默认给这个回调 + * @author wuweifeng wrote on 2019-11-19. + */ +public class DefaultCallback implements ICallback { + @Override + public void begin() { + + } + + @Override + public void result(boolean success, T param, WorkResult workResult) { + + } + +} diff --git a/asyncTool/src/main/java/com/jd/platform/async/callback/DefaultGroupCallback.java b/asyncTool/src/main/java/com/jd/platform/async/callback/DefaultGroupCallback.java new file mode 100644 index 0000000..b9485d3 --- /dev/null +++ b/asyncTool/src/main/java/com/jd/platform/async/callback/DefaultGroupCallback.java @@ -0,0 +1,21 @@ +package com.jd.platform.async.callback; + +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.List; + +/** + * @author wuweifeng wrote on 2019-12-27 + * @version 1.0 + */ +public class DefaultGroupCallback implements IGroupCallback { + @Override + public void success(List workerWrappers) { + + } + + @Override + public void failure(List workerWrappers, Exception e) { + + } +} diff --git a/asyncTool/src/main/java/com/jd/platform/async/callback/ICallback.java b/asyncTool/src/main/java/com/jd/platform/async/callback/ICallback.java new file mode 100755 index 0000000..ac37456 --- /dev/null +++ b/asyncTool/src/main/java/com/jd/platform/async/callback/ICallback.java @@ -0,0 +1,25 @@ +package com.jd.platform.async.callback; + +import com.jd.platform.async.worker.WorkResult; + +/** + * 每个执行单元执行完毕后,会回调该接口

+ * 需要监听执行结果的,实现该接口即可 + * + * @author wuweifeng wrote on 2019-11-19. + */ +@FunctionalInterface +public interface ICallback { + + /** + * 任务开始的监听 + */ + default void begin() { + + } + + /** + * 耗时操作执行完毕后,就给value注入值 + */ + void result(boolean success, T param, WorkResult workResult); +} diff --git a/asyncTool/src/main/java/com/jd/platform/async/callback/IGroupCallback.java b/asyncTool/src/main/java/com/jd/platform/async/callback/IGroupCallback.java new file mode 100755 index 0000000..02880b4 --- /dev/null +++ b/asyncTool/src/main/java/com/jd/platform/async/callback/IGroupCallback.java @@ -0,0 +1,20 @@ +package com.jd.platform.async.callback; + +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.List; + +/** + * 如果是异步执行整组的话,可以用这个组回调。不推荐使用 + * @author wuweifeng wrote on 2019-11-19. + */ +public interface IGroupCallback { + /** + * 成功后,可以从wrapper里去getWorkResult + */ + void success(List workerWrappers); + /** + * 失败了,也可以从wrapper里去getWorkResult + */ + void failure(List workerWrappers, Exception e); +} diff --git a/asyncTool/src/main/java/com/jd/platform/async/callback/ITimeoutWorker.java b/asyncTool/src/main/java/com/jd/platform/async/callback/ITimeoutWorker.java new file mode 100644 index 0000000..be5e7ec --- /dev/null +++ b/asyncTool/src/main/java/com/jd/platform/async/callback/ITimeoutWorker.java @@ -0,0 +1,20 @@ +package com.jd.platform.async.callback; + +/** + * @author wuweifeng wrote on 2019-12-20 + * @version 1.0 + */ +public interface ITimeoutWorker extends IWorker { + /** + * 每个worker都可以设置超时时间 + * @return 毫秒超时时间 + */ + long timeOut(); + + /** + * 是否开启单个执行单元的超时功能(有时是一个group设置个超时,而不具备关心单个worker的超时) + *

注意,如果开启了单个执行单元的超时检测,将使线程池数量多出一倍

+ * @return 是否开启 + */ + boolean enableTimeOut(); +} diff --git a/asyncTool/src/main/java/com/jd/platform/async/callback/IWorker.java b/asyncTool/src/main/java/com/jd/platform/async/callback/IWorker.java new file mode 100755 index 0000000..0f3f6e6 --- /dev/null +++ b/asyncTool/src/main/java/com/jd/platform/async/callback/IWorker.java @@ -0,0 +1,30 @@ +package com.jd.platform.async.callback; + +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * 每个最小执行单元需要实现该接口 + * + * @author wuweifeng wrote on 2019-11-19. + */ +@FunctionalInterface +public interface IWorker { + /** + * 在这里做耗时操作,如rpc请求、IO等 + * + * @param object object + * @param allWrappers 任务包装 + */ + V action(T object, Map allWrappers); + + /** + * 超时、异常时,返回的默认值 + * + * @return 默认值 + */ + default V defaultValue() { + return null; + } +} diff --git a/asyncTool/src/main/java/com/jd/platform/async/exception/SkippedException.java b/asyncTool/src/main/java/com/jd/platform/async/exception/SkippedException.java new file mode 100644 index 0000000..e5f5283 --- /dev/null +++ b/asyncTool/src/main/java/com/jd/platform/async/exception/SkippedException.java @@ -0,0 +1,17 @@ +package com.jd.platform.async.exception; + +/** + * 如果任务在执行之前,自己后面的任务已经执行完或正在被执行,则抛该exception + * @author wuweifeng wrote on 2020-02-18 + * @version 1.0 + */ +public class SkippedException extends RuntimeException { + + public SkippedException() { + super(); + } + + public SkippedException(String message) { + super(message); + } +} diff --git a/asyncTool/src/main/java/com/jd/platform/async/executor/Async.java b/asyncTool/src/main/java/com/jd/platform/async/executor/Async.java new file mode 100644 index 0000000..68acaf8 --- /dev/null +++ b/asyncTool/src/main/java/com/jd/platform/async/executor/Async.java @@ -0,0 +1,126 @@ +package com.jd.platform.async.executor; + +import com.jd.platform.async.callback.DefaultGroupCallback; +import com.jd.platform.async.callback.IGroupCallback; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +/** + * 类入口,可以根据自己情况调整core线程的数量 + * @author wuweifeng wrote on 2019-12-18 + * @version 1.0 + */ +public class Async { + + public static final ThreadPoolExecutor COMMON_POOL = + new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() * 2, 1024, + 15L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + (ThreadFactory) Thread::new); + + public static boolean beginWork(long timeout, ThreadPoolExecutor pool, List workerWrappers) throws ExecutionException, InterruptedException { + if(workerWrappers == null || workerWrappers.size() == 0) { + return false; + } + //定义一个map,存放所有的wrapper,key为wrapper的唯一id,value是该wrapper,可以从value中获取wrapper的result + Map forParamUseWrappers = new ConcurrentHashMap<>(); + CompletableFuture[] futures = new CompletableFuture[workerWrappers.size()]; + for (int i = 0; i < workerWrappers.size(); i++) { + WorkerWrapper wrapper = workerWrappers.get(i); + futures[i] = CompletableFuture.runAsync(() -> wrapper.work(pool, timeout, forParamUseWrappers), pool); + } + try { + CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS); + return true; + } catch (TimeoutException e) { + Set set = new HashSet<>(); + totalWorkers(workerWrappers, set); + for (WorkerWrapper wrapper : set) { + wrapper.stopNow(); + } + return false; + } + } + + /** + * 如果想自定义线程池,请传pool。不自定义的话,就走默认的COMMON_POOL + */ + public static boolean beginWork(long timeout, ThreadPoolExecutor pool, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException { + if(workerWrapper == null || workerWrapper.length == 0) { + return false; + } + List workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toList()); + return beginWork(timeout, pool, workerWrappers); + } + + /** + * 同步阻塞,直到所有都完成,或失败 + */ + public static boolean beginWork(long timeout, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException { + return beginWork(timeout, COMMON_POOL, workerWrapper); + } + + /** + * 异步执行,直到所有都完成,或失败后,发起回调 + */ + public static void beginWorkAsync(long timeout, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) { + if (groupCallback == null) { + groupCallback = new DefaultGroupCallback(); + } + IGroupCallback finalGroupCallback = groupCallback; + CompletableFuture.runAsync(() -> { + try { + boolean success = beginWork(timeout, COMMON_POOL, workerWrapper); + if (success) { + finalGroupCallback.success(Arrays.asList(workerWrapper)); + } else { + finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException()); + } + } catch (ExecutionException | InterruptedException e) { + e.printStackTrace(); + finalGroupCallback.failure(Arrays.asList(workerWrapper), e); + } + }); + } + + /** + * 总共多少个执行单元 + */ + @SuppressWarnings("unchecked") + private static void totalWorkers(List workerWrappers, Set set) { + set.addAll(workerWrappers); + for (WorkerWrapper wrapper : workerWrappers) { + if (wrapper.getNextWrappers() == null) { + continue; + } + List wrappers = wrapper.getNextWrappers(); + totalWorkers(wrappers, set); + } + + } + + + public static void shutDown() { + COMMON_POOL.shutdown(); + } + + public static String getThreadCount() { + return "activeCount=" + COMMON_POOL.getActiveCount() + + " completedCount " + COMMON_POOL.getCompletedTaskCount() + + " largestCount " + COMMON_POOL.getLargestPoolSize(); + } +} diff --git a/asyncTool/src/main/java/com/jd/platform/async/executor/timer/SystemClock.java b/asyncTool/src/main/java/com/jd/platform/async/executor/timer/SystemClock.java new file mode 100644 index 0000000..6cba50a --- /dev/null +++ b/asyncTool/src/main/java/com/jd/platform/async/executor/timer/SystemClock.java @@ -0,0 +1,51 @@ +package com.jd.platform.async.executor.timer; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * 用于解决高并发下System.currentTimeMillis卡顿 + * @author lry + */ +public class SystemClock { + + private final int period; + + private final AtomicLong now; + + private static class InstanceHolder { + private static final SystemClock INSTANCE = new SystemClock(1); + } + + private SystemClock(int period) { + this.period = period; + this.now = new AtomicLong(System.currentTimeMillis()); + scheduleClockUpdating(); + } + + private static SystemClock instance() { + return InstanceHolder.INSTANCE; + } + + private void scheduleClockUpdating() { + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> { + Thread thread = new Thread(runnable, "System Clock"); + thread.setDaemon(true); + return thread; + }); + scheduler.scheduleAtFixedRate(() -> now.set(System.currentTimeMillis()), period, period, TimeUnit.MILLISECONDS); + } + + private long currentTimeMillis() { + return now.get(); + } + + /** + * 用来替换原来的System.currentTimeMillis() + */ + public static long now() { + return instance().currentTimeMillis(); + } +} \ No newline at end of file diff --git a/asyncTool/src/main/java/com/jd/platform/async/worker/DependWrapper.java b/asyncTool/src/main/java/com/jd/platform/async/worker/DependWrapper.java new file mode 100644 index 0000000..6e56859 --- /dev/null +++ b/asyncTool/src/main/java/com/jd/platform/async/worker/DependWrapper.java @@ -0,0 +1,61 @@ +package com.jd.platform.async.worker; + + +import com.jd.platform.async.wrapper.WorkerWrapper; + +/** + * 对依赖的wrapper的封装 + * @author wuweifeng wrote on 2019-12-20 + * @version 1.0 + */ +public class DependWrapper { + + private WorkerWrapper dependWrapper; + /** + * 是否该依赖必须完成后才能执行自己.

+ * 因为存在一个任务,依赖于多个任务,是让这多个任务全部完成后才执行自己,还是某几个执行完毕就可以执行自己 + * 如 + * 1 + * ---3 + * 2 + * 或 + * 1---3 + * 2---3 + * 这两种就不一样,上面的就是必须12都完毕,才能3 + * 下面的就是1完毕就可以3 + */ + private boolean must = true; + + public DependWrapper(WorkerWrapper dependWrapper, boolean must) { + this.dependWrapper = dependWrapper; + this.must = must; + } + + public DependWrapper() { + } + + public WorkerWrapper getDependWrapper() { + return dependWrapper; + } + + public void setDependWrapper(WorkerWrapper dependWrapper) { + this.dependWrapper = dependWrapper; + } + + public boolean isMust() { + return must; + } + + public void setMust(boolean must) { + this.must = must; + } + + @Override + public String toString() { + return "DependWrapper{" + + "dependWrapper=" + dependWrapper + + ", must=" + must + + '}'; + } + +} diff --git a/asyncTool/src/main/java/com/jd/platform/async/worker/ResultState.java b/asyncTool/src/main/java/com/jd/platform/async/worker/ResultState.java new file mode 100755 index 0000000..40a9f1e --- /dev/null +++ b/asyncTool/src/main/java/com/jd/platform/async/worker/ResultState.java @@ -0,0 +1,14 @@ +package com.jd.platform.async.worker; + +/** + * 结果状态 + * @author wuweifeng wrote on 2019-11-19. + */ +public enum ResultState { + + SUCCESS, + TIMEOUT, + EXCEPTION, + DEFAULT //默认状态 + +} diff --git a/asyncTool/src/main/java/com/jd/platform/async/worker/WorkResult.java b/asyncTool/src/main/java/com/jd/platform/async/worker/WorkResult.java new file mode 100755 index 0000000..d6bbf30 --- /dev/null +++ b/asyncTool/src/main/java/com/jd/platform/async/worker/WorkResult.java @@ -0,0 +1,65 @@ +package com.jd.platform.async.worker; + +/** + * 执行结果 + */ +public class WorkResult { + + /** + * 执行的结果 + */ + private V result; + /** + * 结果状态 + */ + private ResultState resultState; + private Exception ex; + + public WorkResult(V result, ResultState resultState) { + this(result, resultState, null); + } + + public WorkResult(V result, ResultState resultState, Exception ex) { + this.result = result; + this.resultState = resultState; + this.ex = ex; + } + + public static WorkResult defaultResult() { + return new WorkResult<>(null, ResultState.DEFAULT); + } + + @Override + public String toString() { + return "WorkResult{" + + "result=" + result + + ", resultState=" + resultState + + ", ex=" + ex + + '}'; + } + + public Exception getEx() { + return ex; + } + + public void setEx(Exception ex) { + this.ex = ex; + } + + public V getResult() { + return result; + } + + public void setResult(V result) { + this.result = result; + } + + public ResultState getResultState() { + return resultState; + } + + public void setResultState(ResultState resultState) { + this.resultState = resultState; + } + +} diff --git a/asyncTool/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java b/asyncTool/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java new file mode 100755 index 0000000..30e1011 --- /dev/null +++ b/asyncTool/src/main/java/com/jd/platform/async/wrapper/WorkerWrapper.java @@ -0,0 +1,615 @@ +package com.jd.platform.async.wrapper; + +import com.jd.platform.async.callback.DefaultCallback; +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.exception.SkippedException; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.DependWrapper; +import com.jd.platform.async.worker.ResultState; +import com.jd.platform.async.worker.WorkResult; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * 对每个worker及callback进行包装,一对一 + * + * @author wuweifeng wrote on 2019-11-19. + */ +public class WorkerWrapper { + + /** + * 该wrapper的唯一标识 + */ + private String id; + /** + * worker将来要处理的param + */ + private T param; + private IWorker worker; + private ICallback callback; + /** + * 在自己后面的wrapper,如果没有,自己就是末尾;如果有一个,就是串行;如果有多个,有几个就需要开几个线程

+ * -------2 + * 1 + * -------3 + * 如1后面有2、3 + */ + private List> nextWrappers; + /** + * 依赖的wrappers,有2种情况,1:必须依赖的全部完成后,才能执行自己 2:依赖的任何一个、多个完成了,就可以执行自己 + * 通过must字段来控制是否依赖项必须完成 + * 1 + * -------3 + * 2 + * 1、2执行完毕后才能执行3 + */ + private List dependWrappers; + /** + * 标记该事件是否已经被处理过了,譬如已经超时返回false了,后续rpc又收到返回值了,则不再二次回调 + * 经试验,volatile并不能保证"同一毫秒"内,多线程对该值的修改和拉取 + *

+ * 1-finish, 2-error, 3-working + */ + private AtomicInteger state = new AtomicInteger(0); + /** + * 该map存放所有wrapper的id和wrapper映射 + */ + private Map forParamUseWrappers; + /** + * 也是个钩子变量,用来存临时的结果 + */ + private volatile WorkResult workResult = WorkResult.defaultResult(); + /** + * 是否在执行自己前,去校验nextWrapper的执行结果

+ * 1 4 + * -------3 + * 2 + * 如这种在4执行前,可能3已经执行完毕了(被2执行完后触发的),那么4就没必要执行了。 + * 注意,该属性仅在nextWrapper数量<=1时有效,>1时的情况是不存在的 + */ + private volatile boolean needCheckNextWrapperResult = true; + + private static final int FINISH = 1; + private static final int ERROR = 2; + private static final int WORKING = 3; + private static final int INIT = 0; + + private WorkerWrapper(String id, IWorker worker, T param, ICallback callback) { + if (worker == null) { + throw new NullPointerException("async.worker is null"); + } + this.worker = worker; + this.param = param; + this.id = id; + //允许不设置回调 + if (callback == null) { + callback = new DefaultCallback<>(); + } + this.callback = callback; + } + + /** + * 开始工作 + * fromWrapper代表这次work是由哪个上游wrapper发起的 + */ + private void work(ThreadPoolExecutor poolExecutor, WorkerWrapper fromWrapper, long remainTime, Map forParamUseWrappers) { + this.forParamUseWrappers = forParamUseWrappers; + //将自己放到所有wrapper的集合里去 + forParamUseWrappers.put(id, this); + long now = SystemClock.now(); + //总的已经超时了,就快速失败,进行下一个 + if (remainTime <= 0) { + fastFail(INIT, null); + beginNext(poolExecutor, now, remainTime); + return; + } + //如果自己已经执行过了。 + //可能有多个依赖,其中的一个依赖已经执行完了,并且自己也已开始执行或执行完毕。当另一个依赖执行完毕,又进来该方法时,就不重复处理了 + if (getState() == FINISH || getState() == ERROR) { + beginNext(poolExecutor, now, remainTime); + return; + } + + //如果在执行前需要校验nextWrapper的状态 + if (needCheckNextWrapperResult) { + //如果自己的next链上有已经出结果或已经开始执行的任务了,自己就不用继续了 + if (!checkNextWrapperResult()) { + fastFail(INIT, new SkippedException()); + beginNext(poolExecutor, now, remainTime); + return; + } + } + + //如果没有任何依赖,说明自己就是第一批要执行的 + if (dependWrappers == null || dependWrappers.size() == 0) { + fire(); + beginNext(poolExecutor, now, remainTime); + return; + } + + /*如果有前方依赖,存在两种情况 + 一种是前面只有一个wrapper。即 A -> B + 一种是前面有多个wrapper。A C D -> B。需要A、C、D都完成了才能轮到B。但是无论是A执行完,还是C执行完,都会去唤醒B。 + 所以需要B来做判断,必须A、C、D都完成,自己才能执行 */ + + //只有一个依赖 + if (dependWrappers.size() == 1) { + doDependsOneJob(fromWrapper); + beginNext(poolExecutor, now, remainTime); + } else { + //有多个依赖时 + doDependsJobs(poolExecutor, dependWrappers, fromWrapper, now, remainTime); + } + + } + + + public void work(ThreadPoolExecutor poolExecutor, long remainTime, Map forParamUseWrappers) { + work(poolExecutor, null, remainTime, forParamUseWrappers); + } + + /** + * 总控制台超时,停止所有任务 + */ + public void stopNow() { + if (getState() == INIT || getState() == WORKING) { + fastFail(getState(), null); + } + } + + /** + * 判断自己下游链路上,是否存在已经出结果的或已经开始执行的 + * 如果没有返回true,如果有返回false + */ + private boolean checkNextWrapperResult() { + //如果自己就是最后一个,或者后面有并行的多个,就返回true + if (nextWrappers == null || nextWrappers.size() != 1) { + return getState() == INIT; + } + WorkerWrapper nextWrapper = nextWrappers.get(0); + boolean state = nextWrapper.getState() == INIT; + //继续校验自己的next的状态 + return state && nextWrapper.checkNextWrapperResult(); + } + + /** + * 进行下一个任务 + */ + private void beginNext(ThreadPoolExecutor poolExecutor, long now, long remainTime) { + //花费的时间 + long costTime = SystemClock.now() - now; + if (nextWrappers == null) { + return; + } + if (nextWrappers.size() == 1) { + nextWrappers.get(0).work(poolExecutor, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers); + return; + } + CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()]; + for (int i = 0; i < nextWrappers.size(); i++) { + int finalI = i; + futures[i] = CompletableFuture.runAsync(() -> nextWrappers.get(finalI) + .work(poolExecutor, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers), poolExecutor); + } + try { + CompletableFuture.allOf(futures).get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + } + + private void doDependsOneJob(WorkerWrapper dependWrapper) { + if (ResultState.TIMEOUT == dependWrapper.getWorkResult().getResultState()) { + workResult = defaultResult(); + fastFail(INIT, null); + } else if (ResultState.EXCEPTION == dependWrapper.getWorkResult().getResultState()) { + workResult = defaultExResult(dependWrapper.getWorkResult().getEx()); + fastFail(INIT, null); + } else { + //前面任务正常完毕了,该自己了 + fire(); + } + } + + private synchronized void doDependsJobs(ThreadPoolExecutor poolExecutor, List dependWrappers, WorkerWrapper fromWrapper, long now, long remainTime) { + boolean nowDependIsMust = false; + //创建必须完成的上游wrapper集合 + Set mustWrapper = new HashSet<>(); + for (DependWrapper dependWrapper : dependWrappers) { + if (dependWrapper.isMust()) { + mustWrapper.add(dependWrapper); + } + if (dependWrapper.getDependWrapper().equals(fromWrapper)) { + nowDependIsMust = dependWrapper.isMust(); + } + } + + //如果全部是不必须的条件,那么只要到了这里,就执行自己。 + if (mustWrapper.size() == 0) { + if (ResultState.TIMEOUT == fromWrapper.getWorkResult().getResultState()) { + fastFail(INIT, null); + } else { + fire(); + } + beginNext(poolExecutor, now, remainTime); + return; + } + + //如果存在需要必须完成的,且fromWrapper不是必须的,就什么也不干 + if (!nowDependIsMust) { + return; + } + + //如果fromWrapper是必须的 + boolean existNoFinish = false; + boolean hasError = false; + //先判断前面必须要执行的依赖任务的执行结果,如果有任何一个失败,那就不用走action了,直接给自己设置为失败,进行下一步就是了 + for (DependWrapper dependWrapper : mustWrapper) { + WorkerWrapper workerWrapper = dependWrapper.getDependWrapper(); + WorkResult tempWorkResult = workerWrapper.getWorkResult(); + //为null或者isWorking,说明它依赖的某个任务还没执行到或没执行完 + if (workerWrapper.getState() == INIT || workerWrapper.getState() == WORKING) { + existNoFinish = true; + break; + } + if (ResultState.TIMEOUT == tempWorkResult.getResultState()) { + workResult = defaultResult(); + hasError = true; + break; + } + if (ResultState.EXCEPTION == tempWorkResult.getResultState()) { + workResult = defaultExResult(workerWrapper.getWorkResult().getEx()); + hasError = true; + break; + } + + } + //只要有失败的 + if (hasError) { + fastFail(INIT, null); + beginNext(poolExecutor, now, remainTime); + return; + } + + //如果上游都没有失败,分为两种情况,一种是都finish了,一种是有的在working + //都finish的话 + if (!existNoFinish) { + //上游都finish了,进行自己 + fire(); + beginNext(poolExecutor, now, remainTime); + } + } + + /** + * 执行自己的job.具体的执行是在另一个线程里,但判断阻塞超时是在work线程 + */ + private void fire() { + //阻塞取结果 + workResult = workerDoJob(); + } + + /** + * 快速失败 + */ + private boolean fastFail(int expect, Exception e) { + //试图将它从expect状态,改成Error + if (!compareAndSetState(expect, ERROR)) { + return false; + } + + //尚未处理过结果 + if (checkIsNullResult()) { + if (e == null) { + workResult = defaultResult(); + } else { + workResult = defaultExResult(e); + } + } + + callback.result(false, param, workResult); + return true; + } + + /** + * 具体的单个worker执行任务 + */ + private WorkResult workerDoJob() { + //避免重复执行 + if (!checkIsNullResult()) { + return workResult; + } + try { + //如果已经不是init状态了,说明正在被执行或已执行完毕。这一步很重要,可以保证任务不被重复执行 + if (!compareAndSetState(INIT, WORKING)) { + return workResult; + } + + callback.begin(); + + //执行耗时操作 + V resultValue = worker.action(param, forParamUseWrappers); + + //如果状态不是在working,说明别的地方已经修改了 + if (!compareAndSetState(WORKING, FINISH)) { + return workResult; + } + + workResult.setResultState(ResultState.SUCCESS); + workResult.setResult(resultValue); + //回调成功 + callback.result(true, param, workResult); + + return workResult; + } catch (Exception e) { + //避免重复回调 + if (!checkIsNullResult()) { + return workResult; + } + fastFail(WORKING, e); + return workResult; + } + } + + public WorkResult getWorkResult() { + return workResult; + } + + public List> getNextWrappers() { + return nextWrappers; + } + + public void setParam(T param) { + this.param = param; + } + + private boolean checkIsNullResult() { + return ResultState.DEFAULT == workResult.getResultState(); + } + + private void addDepend(WorkerWrapper workerWrapper, boolean must) { + addDepend(new DependWrapper(workerWrapper, must)); + } + + private void addDepend(DependWrapper dependWrapper) { + if (dependWrappers == null) { + dependWrappers = new ArrayList<>(); + } + //如果依赖的是重复的同一个,就不重复添加了 + for (DependWrapper wrapper : dependWrappers) { + if (wrapper.equals(dependWrapper)) { + return; + } + } + dependWrappers.add(dependWrapper); + } + + private void addNext(WorkerWrapper workerWrapper) { + if (nextWrappers == null) { + nextWrappers = new ArrayList<>(); + } + //避免添加重复 + for (WorkerWrapper wrapper : nextWrappers) { + if (workerWrapper.equals(wrapper)) { + return; + } + } + nextWrappers.add(workerWrapper); + } + + private void addNextWrappers(List> wrappers) { + if (wrappers == null) { + return; + } + for (WorkerWrapper wrapper : wrappers) { + addNext(wrapper); + } + } + + private void addDependWrappers(List dependWrappers) { + if (dependWrappers == null) { + return; + } + for (DependWrapper wrapper : dependWrappers) { + addDepend(wrapper); + } + } + + private WorkResult defaultResult() { + workResult.setResultState(ResultState.TIMEOUT); + workResult.setResult(worker.defaultValue()); + return workResult; + } + + private WorkResult defaultExResult(Exception ex) { + workResult.setResultState(ResultState.EXCEPTION); + workResult.setResult(worker.defaultValue()); + workResult.setEx(ex); + return workResult; + } + + + private int getState() { + return state.get(); + } + + public String getId() { + return id; + } + + private boolean compareAndSetState(int expect, int update) { + return this.state.compareAndSet(expect, update); + } + + private void setNeedCheckNextWrapperResult(boolean needCheckNextWrapperResult) { + this.needCheckNextWrapperResult = needCheckNextWrapperResult; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WorkerWrapper that = (WorkerWrapper) o; + return needCheckNextWrapperResult == that.needCheckNextWrapperResult && + Objects.equals(param, that.param) && + Objects.equals(worker, that.worker) && + Objects.equals(callback, that.callback) && + Objects.equals(nextWrappers, that.nextWrappers) && + Objects.equals(dependWrappers, that.dependWrappers) && + Objects.equals(state, that.state) && + Objects.equals(workResult, that.workResult); + } + + @Override + public int hashCode() { + return Objects.hash(param, worker, callback, nextWrappers, dependWrappers, state, workResult, needCheckNextWrapperResult); + } + + public static class Builder { + /** + * 该wrapper的唯一标识 + */ + private String id = UUID.randomUUID().toString(); + /** + * worker将来要处理的param + */ + private W param; + private IWorker worker; + private ICallback callback; + /** + * 自己后面的所有 + */ + private List> nextWrappers; + /** + * 自己依赖的所有 + */ + private List dependWrappers; + /** + * 存储强依赖于自己的wrapper集合 + */ + private Set> selfIsMustSet; + + private boolean needCheckNextWrapperResult = true; + + public Builder worker(IWorker worker) { + this.worker = worker; + return this; + } + + public Builder param(W w) { + this.param = w; + return this; + } + + public Builder id(String id) { + if (id != null) { + this.id = id; + } + return this; + } + + public Builder needCheckNextWrapperResult(boolean needCheckNextWrapperResult) { + this.needCheckNextWrapperResult = needCheckNextWrapperResult; + return this; + } + + public Builder callback(ICallback callback) { + this.callback = callback; + return this; + } + + public Builder depend(WorkerWrapper... wrappers) { + if (wrappers == null) { + return this; + } + for (WorkerWrapper wrapper : wrappers) { + depend(wrapper); + } + return this; + } + + public Builder depend(WorkerWrapper wrapper) { + return depend(wrapper, true); + } + + public Builder depend(WorkerWrapper wrapper, boolean isMust) { + if (wrapper == null) { + return this; + } + DependWrapper dependWrapper = new DependWrapper(wrapper, isMust); + if (dependWrappers == null) { + dependWrappers = new ArrayList<>(); + } + dependWrappers.add(dependWrapper); + return this; + } + + public Builder next(WorkerWrapper wrapper) { + return next(wrapper, true); + } + + public Builder next(WorkerWrapper wrapper, boolean selfIsMust) { + if (nextWrappers == null) { + nextWrappers = new ArrayList<>(); + } + nextWrappers.add(wrapper); + + //强依赖自己 + if (selfIsMust) { + if (selfIsMustSet == null) { + selfIsMustSet = new HashSet<>(); + } + selfIsMustSet.add(wrapper); + } + return this; + } + + public Builder next(WorkerWrapper... wrappers) { + if (wrappers == null) { + return this; + } + for (WorkerWrapper wrapper : wrappers) { + next(wrapper); + } + return this; + } + + public WorkerWrapper build() { + WorkerWrapper wrapper = new WorkerWrapper<>(id, worker, param, callback); + wrapper.setNeedCheckNextWrapperResult(needCheckNextWrapperResult); + if (dependWrappers != null) { + for (DependWrapper workerWrapper : dependWrappers) { + workerWrapper.getDependWrapper().addNext(wrapper); + wrapper.addDepend(workerWrapper); + } + } + if (nextWrappers != null) { + for (WorkerWrapper workerWrapper : nextWrappers) { + boolean must = false; + if (selfIsMustSet != null && selfIsMustSet.contains(workerWrapper)) { + must = true; + } + workerWrapper.addDepend(wrapper, must); + wrapper.addNext(workerWrapper); + } + } + + return wrapper; + } + + } +} diff --git a/asyncTool/src/test/java/depend/DeWorker.java b/asyncTool/src/test/java/depend/DeWorker.java new file mode 100755 index 0000000..e963816 --- /dev/null +++ b/asyncTool/src/test/java/depend/DeWorker.java @@ -0,0 +1,42 @@ +package depend; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class DeWorker implements IWorker, ICallback { + + @Override + public User action(String object, Map allWrappers) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return new User("user0"); + } + + + @Override + public User defaultValue() { + return new User("default User"); + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + System.out.println("worker0 的结果是:" + workResult.getResult()); + } + +} diff --git a/asyncTool/src/test/java/depend/DeWorker1.java b/asyncTool/src/test/java/depend/DeWorker1.java new file mode 100755 index 0000000..6cafc30 --- /dev/null +++ b/asyncTool/src/test/java/depend/DeWorker1.java @@ -0,0 +1,43 @@ +package depend; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class DeWorker1 implements IWorker, User>, ICallback, User> { + + @Override + public User action(WorkResult result, Map allWrappers) { + System.out.println("par1的入参来自于par0: " + result.getResult()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return new User("user1"); + } + + + @Override + public User defaultValue() { + return new User("default User"); + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, WorkResult param, WorkResult workResult) { + System.out.println("worker1 的结果是:" + workResult.getResult()); + } + +} diff --git a/asyncTool/src/test/java/depend/DeWorker2.java b/asyncTool/src/test/java/depend/DeWorker2.java new file mode 100755 index 0000000..3dd73e7 --- /dev/null +++ b/asyncTool/src/test/java/depend/DeWorker2.java @@ -0,0 +1,43 @@ +package depend; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class DeWorker2 implements IWorker, String>, ICallback, String> { + + @Override + public String action(WorkResult result, Map allWrappers) { + System.out.println("par2的入参来自于par1: " + result.getResult()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return result.getResult().getName(); + } + + + @Override + public String defaultValue() { + return "default"; + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, WorkResult param, WorkResult workResult) { + System.out.println("worker2 的结果是:" + workResult.getResult()); + } + +} diff --git a/asyncTool/src/test/java/depend/LambdaTest.java b/asyncTool/src/test/java/depend/LambdaTest.java new file mode 100644 index 0000000..e4df734 --- /dev/null +++ b/asyncTool/src/test/java/depend/LambdaTest.java @@ -0,0 +1,74 @@ +package depend; + +import com.jd.platform.async.executor.Async; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author sjsdfg + * @since 2020/6/14 + */ +public class LambdaTest { + public static void main(String[] args) throws Exception { + WorkerWrapper, String> workerWrapper2 = new WorkerWrapper.Builder, String>() + .worker((WorkResult result, Map allWrappers) -> { + System.out.println("par2的入参来自于par1: " + result.getResult()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return result.getResult().getName(); + }) + .callback((boolean success, WorkResult param, WorkResult workResult) -> + System.out.println(String.format("thread is %s, param is %s, result is %s", Thread.currentThread().getName(), param, workResult))) + .id("third") + .build(); + + WorkerWrapper, User> workerWrapper1 = new WorkerWrapper.Builder, User>() + .worker((WorkResult result, Map allWrappers) -> { + System.out.println("par1的入参来自于par0: " + result.getResult()); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return new User("user1"); + }) + .callback((boolean success, WorkResult param, WorkResult workResult) -> + System.out.println(String.format("thread is %s, param is %s, result is %s", Thread.currentThread().getName(), param, workResult))) + .id("second") + .next(workerWrapper2) + .build(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker((String object, Map allWrappers) -> { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return new User("user0"); + }) + .param("0") + .id("first") + .next(workerWrapper1, true) + .callback((boolean success, String param, WorkResult workResult) -> + System.out.println(String.format("thread is %s, param is %s, result is %s", Thread.currentThread().getName(), param, workResult))) + .build(); + + //虽然尚未执行,但是也可以先取得结果的引用,作为下一个任务的入参。V1.2前写法,需要手工给 + //V1.3后,不用给wrapper setParam了,直接在worker的action里自行根据id获取即可.参考dependnew包下代码 + WorkResult result = workerWrapper.getWorkResult(); + WorkResult result1 = workerWrapper1.getWorkResult(); + workerWrapper1.setParam(result); + workerWrapper2.setParam(result1); + + Async.beginWork(3500, workerWrapper); + + System.out.println(workerWrapper2.getWorkResult()); + Async.shutDown(); + } +} diff --git a/asyncTool/src/test/java/depend/Test.java b/asyncTool/src/test/java/depend/Test.java new file mode 100644 index 0000000..971fdcf --- /dev/null +++ b/asyncTool/src/test/java/depend/Test.java @@ -0,0 +1,55 @@ +package depend; + +import com.jd.platform.async.executor.Async; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.concurrent.ExecutionException; + + +/** + * 后面请求依赖于前面请求的执行结果 + * @author wuweifeng wrote on 2019-12-26 + * @version 1.0 + */ +public class Test { + + public static void main(String[] args) throws ExecutionException, InterruptedException { + DeWorker w = new DeWorker(); + DeWorker1 w1 = new DeWorker1(); + DeWorker2 w2 = new DeWorker2(); + + WorkerWrapper, String> workerWrapper2 = new WorkerWrapper.Builder, String>() + .worker(w2) + .callback(w2) + .id("third") + .build(); + + WorkerWrapper, User> workerWrapper1 = new WorkerWrapper.Builder, User>() + .worker(w1) + .callback(w1) + .id("second") + .next(workerWrapper2) + .build(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .param("0") + .id("first") + .next(workerWrapper1, true) + .callback(w) + .build(); + + //虽然尚未执行,但是也可以先取得结果的引用,作为下一个任务的入参。V1.2前写法,需要手工给 + //V1.3后,不用给wrapper setParam了,直接在worker的action里自行根据id获取即可.参考dependnew包下代码 + WorkResult result = workerWrapper.getWorkResult(); + WorkResult result1 = workerWrapper1.getWorkResult(); + workerWrapper1.setParam(result); + workerWrapper2.setParam(result1); + + Async.beginWork(3500, workerWrapper); + + System.out.println(workerWrapper2.getWorkResult()); + Async.shutDown(); + } +} diff --git a/asyncTool/src/test/java/depend/User.java b/asyncTool/src/test/java/depend/User.java new file mode 100644 index 0000000..dfd6277 --- /dev/null +++ b/asyncTool/src/test/java/depend/User.java @@ -0,0 +1,29 @@ +package depend; + +/** + * 一个包装类 + * @author wuweifeng wrote on 2019-12-26 + * @version 1.0 + */ +public class User { + private String name; + + public User(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public String toString() { + return "User{" + + "name='" + name + '\'' + + '}'; + } +} diff --git a/asyncTool/src/test/java/dependnew/DeWorker.java b/asyncTool/src/test/java/dependnew/DeWorker.java new file mode 100755 index 0000000..6ae011f --- /dev/null +++ b/asyncTool/src/test/java/dependnew/DeWorker.java @@ -0,0 +1,42 @@ +package dependnew; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class DeWorker implements IWorker, ICallback { + + @Override + public User action(String object, Map allWrappers) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return new User("user0"); + } + + + @Override + public User defaultValue() { + return new User("default User"); + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + System.out.println("worker0 的结果是:" + workResult.getResult()); + } + +} diff --git a/asyncTool/src/test/java/dependnew/DeWorker1.java b/asyncTool/src/test/java/dependnew/DeWorker1.java new file mode 100755 index 0000000..0a56fdf --- /dev/null +++ b/asyncTool/src/test/java/dependnew/DeWorker1.java @@ -0,0 +1,45 @@ +package dependnew; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class DeWorker1 implements IWorker, ICallback { + + @Override + public User action(String object, Map allWrappers) { + System.out.println("-----------------"); + System.out.println("获取par0的执行结果: " + allWrappers.get("first").getWorkResult()); + System.out.println("取par0的结果作为自己的入参,并将par0的结果加上一些东西"); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + User user0 = (User) allWrappers.get("first").getWorkResult().getResult(); + return new User(user0.getName() + " worker1 add"); + } + + @Override + public User defaultValue() { + return new User("default User"); + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + System.out.println("worker1 的结果是:" + workResult.getResult()); + } + +} diff --git a/asyncTool/src/test/java/dependnew/DeWorker2.java b/asyncTool/src/test/java/dependnew/DeWorker2.java new file mode 100755 index 0000000..c4f61bc --- /dev/null +++ b/asyncTool/src/test/java/dependnew/DeWorker2.java @@ -0,0 +1,45 @@ +package dependnew; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class DeWorker2 implements IWorker, ICallback { + + @Override + public String action(User object, Map allWrappers) { + System.out.println("-----------------"); + System.out.println("par1的执行结果是: " + allWrappers.get("second").getWorkResult()); + System.out.println("取par1的结果作为自己的入参,并将par1的结果加上一些东西"); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + User user1 = (User) allWrappers.get("second").getWorkResult().getResult(); + return user1.getName() + " worker2 add"; + } + + @Override + public String defaultValue() { + return "default"; + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, User param, WorkResult workResult) { + System.out.println("worker2 的结果是:" + workResult.getResult()); + } + +} diff --git a/asyncTool/src/test/java/dependnew/Test.java b/asyncTool/src/test/java/dependnew/Test.java new file mode 100644 index 0000000..731e42b --- /dev/null +++ b/asyncTool/src/test/java/dependnew/Test.java @@ -0,0 +1,49 @@ +package dependnew; + +import com.jd.platform.async.executor.Async; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.concurrent.ExecutionException; + + +/** + * 后面请求依赖于前面请求的执行结果 + * @author wuweifeng wrote on 2019-12-26 + * @version 1.0 + */ +public class Test { + + public static void main(String[] args) throws ExecutionException, InterruptedException { + DeWorker w = new DeWorker(); + DeWorker1 w1 = new DeWorker1(); + DeWorker2 w2 = new DeWorker2(); + + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .id("third") + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .id("second") + .next(workerWrapper2) + .build(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .param("0") + .id("first") + .next(workerWrapper1) + .callback(w) + .build(); + + //V1.3后,不用给wrapper setParam了,直接在worker的action里自行根据id获取即可 + + Async.beginWork(3500, workerWrapper); + + System.out.println(workerWrapper2.getWorkResult()); + Async.shutDown(); + } +} diff --git a/asyncTool/src/test/java/dependnew/User.java b/asyncTool/src/test/java/dependnew/User.java new file mode 100644 index 0000000..bbef801 --- /dev/null +++ b/asyncTool/src/test/java/dependnew/User.java @@ -0,0 +1,29 @@ +package dependnew; + +/** + * 一个包装类 + * @author wuweifeng wrote on 2019-12-26 + * @version 1.0 + */ +public class User { + private String name; + + public User(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + @Override + public String toString() { + return "User{" + + "name='" + name + '\'' + + '}'; + } +} diff --git a/asyncTool/src/test/java/parallel/ParTimeoutWorker.java b/asyncTool/src/test/java/parallel/ParTimeoutWorker.java new file mode 100755 index 0000000..7f7b9aa --- /dev/null +++ b/asyncTool/src/test/java/parallel/ParTimeoutWorker.java @@ -0,0 +1,49 @@ +package parallel; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class ParTimeoutWorker implements IWorker, ICallback { + + @Override + public String action(String object, Map allWrappers) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "result = " + SystemClock.now() + "---param = " + object + " from 0"; + } + + + @Override + public String defaultValue() { + return "worker0--default"; + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + if (success) { + System.out.println("callback worker0 success--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } else { + System.err.println("callback worker0 failure--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } + } + +} diff --git a/asyncTool/src/test/java/parallel/ParWorker.java b/asyncTool/src/test/java/parallel/ParWorker.java new file mode 100755 index 0000000..b174c51 --- /dev/null +++ b/asyncTool/src/test/java/parallel/ParWorker.java @@ -0,0 +1,49 @@ +package parallel; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class ParWorker implements IWorker, ICallback { + + @Override + public String action(String object, Map allWrappers) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "result = " + SystemClock.now() + "---param = " + object + " from 0"; + } + + + @Override + public String defaultValue() { + return "worker0--default"; + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + if (success) { + System.out.println("callback worker0 success--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } else { + System.err.println("callback worker0 failure--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } + } + +} diff --git a/asyncTool/src/test/java/parallel/ParWorker1.java b/asyncTool/src/test/java/parallel/ParWorker1.java new file mode 100755 index 0000000..7f13081 --- /dev/null +++ b/asyncTool/src/test/java/parallel/ParWorker1.java @@ -0,0 +1,53 @@ +package parallel; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class ParWorker1 implements IWorker, ICallback { + private long sleepTime = 1000; + + public void setSleepTime(long sleepTime) { + this.sleepTime = sleepTime; + } + + @Override + public String action(String object, Map allWrappers) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "result = " + SystemClock.now() + "---param = " + object + " from 1"; + } + + @Override + public String defaultValue() { + return "worker1--default"; + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + if (success) { + System.out.println("callback worker1 success--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } else { + System.err.println("callback worker1 failure--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } + } + +} diff --git a/asyncTool/src/test/java/parallel/ParWorker2.java b/asyncTool/src/test/java/parallel/ParWorker2.java new file mode 100755 index 0000000..0e89e45 --- /dev/null +++ b/asyncTool/src/test/java/parallel/ParWorker2.java @@ -0,0 +1,54 @@ +package parallel; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class ParWorker2 implements IWorker, ICallback { + private long sleepTime = 1000; + + public void setSleepTime(long sleepTime) { + this.sleepTime = sleepTime; + } + + @Override + public String action(String object, Map allWrappers) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "result = " + SystemClock.now() + "---param = " + object + " from 2"; + } + + + @Override + public String defaultValue() { + return "worker2--default"; + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + if (success) { + System.out.println("callback worker2 success--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } else { + System.err.println("callback worker2 failure--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } + } + +} diff --git a/asyncTool/src/test/java/parallel/ParWorker3.java b/asyncTool/src/test/java/parallel/ParWorker3.java new file mode 100755 index 0000000..4284b0f --- /dev/null +++ b/asyncTool/src/test/java/parallel/ParWorker3.java @@ -0,0 +1,53 @@ +package parallel; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class ParWorker3 implements IWorker, ICallback { + private long sleepTime = 1000; + + public void setSleepTime(long sleepTime) { + this.sleepTime = sleepTime; + } + @Override + public String action(String object, Map allWrappers) { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "result = " + SystemClock.now() + "---param = " + object + " from 3"; + } + + + @Override + public String defaultValue() { + return "worker3--default"; + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + if (success) { + System.out.println("callback worker3 success--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } else { + System.err.println("callback worker3 failure--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } + } + +} diff --git a/asyncTool/src/test/java/parallel/ParWorker4.java b/asyncTool/src/test/java/parallel/ParWorker4.java new file mode 100755 index 0000000..723c5f2 --- /dev/null +++ b/asyncTool/src/test/java/parallel/ParWorker4.java @@ -0,0 +1,49 @@ +package parallel; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class ParWorker4 implements IWorker, ICallback { + + @Override + public String action(String object, Map allWrappers) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "result = " + SystemClock.now() + "---param = " + object + " from 4"; + } + + + @Override + public String defaultValue() { + return "worker4--default"; + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + if (success) { + System.out.println("callback worker4 success--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } else { + System.err.println("callback worker4 failure--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } + } + +} diff --git a/asyncTool/src/test/java/parallel/TestPar.java b/asyncTool/src/test/java/parallel/TestPar.java new file mode 100755 index 0000000..1c2accf --- /dev/null +++ b/asyncTool/src/test/java/parallel/TestPar.java @@ -0,0 +1,868 @@ +package parallel; + + +import com.jd.platform.async.executor.Async; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.concurrent.ExecutionException; + +/** + * 并行测试 + * + * @author wuweifeng wrote on 2019-11-20. + */ +public class TestPar { + public static void main(String[] args) throws Exception { + + testNormal(); +// testMulti(); +// testMultiReverse(); +// testMultiError2(); +// testMulti3(); +// testMulti3Reverse(); +// testMulti4(); +// testMulti4Reverse(); +// testMulti5(); +// testMulti5Reverse(); +// testMulti6(); +// testMulti7(); +// testMulti8(); +// testMulti9(); +// testMulti9Reverse(); + } + + /** + * 3个并行,测试不同时间的超时 + */ + private static void testNormal() throws InterruptedException, ExecutionException { + ParWorker w = new ParWorker(); + ParWorker1 w1 = new ParWorker1(); + ParWorker2 w2 = new ParWorker2(); + + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .build(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .build(); + + long now = SystemClock.now(); + System.out.println("begin-" + now); + + Async.beginWork(1500, workerWrapper, workerWrapper1, workerWrapper2); +// Async.beginWork(800, workerWrapper, workerWrapper1, workerWrapper2); +// Async.beginWork(1000, workerWrapper, workerWrapper1, workerWrapper2); + + System.out.println("end-" + SystemClock.now()); + System.err.println("cost-" + (SystemClock.now() - now)); + System.out.println(Async.getThreadCount()); + + System.out.println(workerWrapper.getWorkResult()); + Async.shutDown(); + } + + /** + * 0,2同时开启,1在0后面 + * 0---1 + * 2 + */ + private static void testMulti() throws ExecutionException, InterruptedException { + ParWorker w = new ParWorker(); + ParWorker1 w1 = new ParWorker1(); + ParWorker2 w2 = new ParWorker2(); + + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .build(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .next(workerWrapper1) + .build(); + + long now = SystemClock.now(); + System.out.println("begin-" + now); + + Async.beginWork(2500, workerWrapper, workerWrapper2); + + System.out.println("end-" + SystemClock.now()); + System.err.println("cost-" + (SystemClock.now() - now)); + + Async.shutDown(); + } + + /** + * 0,2同时开启,1在0后面 + * 0---1 + * 2 + */ + private static void testMultiReverse() throws ExecutionException, InterruptedException { + ParWorker w = new ParWorker(); + ParWorker1 w1 = new ParWorker1(); + ParWorker2 w2 = new ParWorker2(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .depend(workerWrapper) + .build(); + + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .build(); + + + long now = SystemClock.now(); + System.out.println("begin-" + now); + + Async.beginWork(2500, workerWrapper, workerWrapper2); + + System.out.println("end-" + SystemClock.now()); + System.err.println("cost-" + (SystemClock.now() - now)); + + Async.shutDown(); + } + + + /** + * 0,2同时开启,1在0后面. 组超时,则0和2成功,1失败 + * 0---1 + * 2 + */ + private static void testMultiError() throws ExecutionException, InterruptedException { + ParWorker w = new ParWorker(); + ParWorker1 w1 = new ParWorker1(); + ParWorker2 w2 = new ParWorker2(); + + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .build(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .next(workerWrapper1) + .build(); + + long now = SystemClock.now(); + System.out.println("begin-" + now); + + Async.beginWork(1500, workerWrapper, workerWrapper2); + + System.out.println("end-" + SystemClock.now()); + System.err.println("cost-" + (SystemClock.now() - now)); + + Async.shutDown(); + } + + /** + * 0执行完,同时1和2, 1\2都完成后3 + * 1 + * 0 3 + * 2 + */ + private static void testMulti3() throws ExecutionException, InterruptedException { + ParWorker w = new ParWorker(); + ParWorker1 w1 = new ParWorker1(); + ParWorker2 w2 = new ParWorker2(); + ParWorker3 w3 = new ParWorker3(); + + WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder() + .worker(w3) + .callback(w3) + .param("3") + .build(); + + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .next(workerWrapper3) + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .next(workerWrapper3) + .build(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .next(workerWrapper1, workerWrapper2) + .build(); + + + long now = SystemClock.now(); + System.out.println("begin-" + now); + + Async.beginWork(3100, workerWrapper); +// Async.beginWork(2100, workerWrapper); + + System.out.println("end-" + SystemClock.now()); + System.err.println("cost-" + (SystemClock.now() - now)); + + System.out.println(Async.getThreadCount()); + Async.shutDown(); + } + + /** + * 0执行完,同时1和2, 1\2都完成后3 + * 1 + * 0 3 + * 2 + */ + private static void testMulti3Reverse() throws ExecutionException, InterruptedException { + ParWorker w = new ParWorker(); + ParWorker1 w1 = new ParWorker1(); + ParWorker2 w2 = new ParWorker2(); + ParWorker3 w3 = new ParWorker3(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .build(); + + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .depend(workerWrapper) + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .depend(workerWrapper) + .build(); + + WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder() + .worker(w3) + .callback(w3) + .param("3") + .depend(workerWrapper1, workerWrapper2) + .build(); + + + long now = SystemClock.now(); + System.out.println("begin-" + now); + + Async.beginWork(3100, workerWrapper); +// Async.beginWork(2100, workerWrapper); + + System.out.println("end-" + SystemClock.now()); + System.err.println("cost-" + (SystemClock.now() - now)); + + System.out.println(Async.getThreadCount()); + Async.shutDown(); + } + + + /** + * 0执行完,同时1和2, 1\2都完成后3,2耗时2秒,1耗时1秒。3会等待2完成 + * 1 + * 0 3 + * 2 + * + * 执行结果0,1,2,3 + */ + private static void testMulti4() throws ExecutionException, InterruptedException { + ParWorker w = new ParWorker(); + ParWorker1 w1 = new ParWorker1(); + + ParWorker2 w2 = new ParWorker2(); + w2.setSleepTime(2000); + + ParWorker3 w3 = new ParWorker3(); + + WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder() + .worker(w3) + .callback(w3) + .param("3") + .build(); + + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .next(workerWrapper3) + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .next(workerWrapper3) + .build(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .next(workerWrapper1, workerWrapper2) + .build(); + + long now = SystemClock.now(); + System.out.println("begin-" + now); + + //正常完毕 + Async.beginWork(4100, workerWrapper); + //3会超时 +// Async.beginWork(3100, workerWrapper); + //2,3会超时 +// Async.beginWork(2900, workerWrapper); + + System.out.println("end-" + SystemClock.now()); + System.err.println("cost-" + (SystemClock.now() - now)); + + System.out.println(Async.getThreadCount()); + Async.shutDown(); + } + + /** + * 0执行完,同时1和2, 1\2都完成后3,2耗时2秒,1耗时1秒。3会等待2完成 + * 1 + * 0 3 + * 2 + * + * 执行结果0,1,2,3 + */ + private static void testMulti4Reverse() throws ExecutionException, InterruptedException { + ParWorker w = new ParWorker(); + ParWorker1 w1 = new ParWorker1(); + + ParWorker2 w2 = new ParWorker2(); + w2.setSleepTime(2000); + + ParWorker3 w3 = new ParWorker3(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .build(); + + WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder() + .worker(w3) + .callback(w3) + .param("3") + .build(); + + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .depend(workerWrapper) + .next(workerWrapper3) + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .depend(workerWrapper) + .next(workerWrapper3) + .build(); + + long now = SystemClock.now(); + System.out.println("begin-" + now); + + //正常完毕 + Async.beginWork(4100, workerWrapper); + //3会超时 +// Async.beginWork(3100, workerWrapper); + //2,3会超时 +// Async.beginWork(2900, workerWrapper); + + System.out.println("end-" + SystemClock.now()); + System.err.println("cost-" + (SystemClock.now() - now)); + + System.out.println(Async.getThreadCount()); + Async.shutDown(); + } + + /** + * 0执行完,同时1和2, 1\2 任何一个执行完后,都执行3 + * 1 + * 0 3 + * 2 + * + * 则结果是: + * 0,2,3,1 + * 2,3分别是500、400.3执行完毕后,1才执行完 + */ + private static void testMulti5() throws ExecutionException, InterruptedException { + ParWorker w = new ParWorker(); + ParWorker1 w1 = new ParWorker1(); + + ParWorker2 w2 = new ParWorker2(); + w2.setSleepTime(500); + + ParWorker3 w3 = new ParWorker3(); + w3.setSleepTime(400); + + WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder() + .worker(w3) + .callback(w3) + .param("3") + .build(); + + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .next(workerWrapper3, false) + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .next(workerWrapper3, false) + .build(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .next(workerWrapper1, workerWrapper2) + .build(); + + long now = SystemClock.now(); + System.out.println("begin-" + now); + + //正常完毕 + Async.beginWork(4100, workerWrapper); + + System.out.println("end-" + SystemClock.now()); + System.err.println("cost-" + (SystemClock.now() - now)); + + System.out.println(Async.getThreadCount()); + Async.shutDown(); + } + + + /** + * 0执行完,同时1和2, 1\2 任何一个执行完后,都执行3 + * 1 + * 0 3 + * 2 + * + * 则结果是: + * 0,2,3,1 + * 2,3分别是500、400.3执行完毕后,1才执行完 + */ + private static void testMulti5Reverse() throws ExecutionException, InterruptedException { + ParWorker w = new ParWorker(); + ParWorker1 w1 = new ParWorker1(); + + ParWorker2 w2 = new ParWorker2(); + w2.setSleepTime(500); + + ParWorker3 w3 = new ParWorker3(); + w3.setSleepTime(400); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .build(); + + WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder() + .worker(w3) + .callback(w3) + .param("3") + .build(); + + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .depend(workerWrapper, true) + .next(workerWrapper3, false) + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .depend(workerWrapper, true) + .next(workerWrapper3, false) + .build(); + + + + long now = SystemClock.now(); + System.out.println("begin-" + now); + + //正常完毕 + Async.beginWork(4100, workerWrapper); + + System.out.println("end-" + SystemClock.now()); + System.err.println("cost-" + (SystemClock.now() - now)); + + System.out.println(Async.getThreadCount()); + Async.shutDown(); + } + + /** + * 0执行完,同时1和2, 必须1执行完毕后,才能执行3. 无论2是否领先1完毕,都要等1 + * 1 + * 0 3 + * 2 + * + * 则结果是: + * 0,2,1,3 + * + * 2,3分别是500、400.2执行完了,1没完,那就等着1完毕,才能3 + */ + private static void testMulti6() throws ExecutionException, InterruptedException { + ParWorker w = new ParWorker(); + ParWorker1 w1 = new ParWorker1(); + + ParWorker2 w2 = new ParWorker2(); + w2.setSleepTime(500); + + ParWorker3 w3 = new ParWorker3(); + w3.setSleepTime(400); + + WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder() + .worker(w3) + .callback(w3) + .param("3") + .build(); + + //设置2不是必须,1是必须的 + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .next(workerWrapper3) + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .next(workerWrapper3) + .build(); + + WorkerWrapper workerWrapper0 = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .next(workerWrapper2, workerWrapper1) + .build(); + + + long now = SystemClock.now(); + System.out.println("begin-" + now); + + //正常完毕 + Async.beginWork(4100, workerWrapper0); + + System.out.println("end-" + SystemClock.now()); + System.err.println("cost-" + (SystemClock.now() - now)); + + System.out.println(Async.getThreadCount()); + Async.shutDown(); + } + + /** + * 两个0并行,上面0执行完,同时1和2, 下面0执行完开始1,上面的 必须1、2执行完毕后,才能执行3. 最后必须2、3都完成,才能4 + * 1 + * 0 3 + * 2 4 + * --------- + * 0 1 2 + * + * 则结果是: + * callback worker0 success--1577242870969----result = 1577242870968---param = 00 from 0-threadName:Thread-1 + * callback worker0 success--1577242870969----result = 1577242870968---param = 0 from 0-threadName:Thread-0 + * callback worker1 success--1577242871972----result = 1577242871972---param = 11 from 1-threadName:Thread-1 + * callback worker1 success--1577242871972----result = 1577242871972---param = 1 from 1-threadName:Thread-2 + * callback worker2 success--1577242871973----result = 1577242871973---param = 2 from 2-threadName:Thread-3 + * callback worker2 success--1577242872975----result = 1577242872975---param = 22 from 2-threadName:Thread-1 + * callback worker3 success--1577242872977----result = 1577242872977---param = 3 from 3-threadName:Thread-2 + * callback worker4 success--1577242873980----result = 1577242873980---param = 4 from 3-threadName:Thread-2 + */ + private static void testMulti7() throws ExecutionException, InterruptedException { + ParWorker w = new ParWorker(); + ParWorker1 w1 = new ParWorker1(); + ParWorker2 w2 = new ParWorker2(); + ParWorker3 w3 = new ParWorker3(); + ParWorker4 w4 = new ParWorker4(); + + WorkerWrapper workerWrapper4 = new WorkerWrapper.Builder() + .worker(w4) + .callback(w4) + .param("4") + .build(); + + WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder() + .worker(w3) + .callback(w3) + .param("3") + .next(workerWrapper4) + .build(); + + //下面的2 + WorkerWrapper workerWrapper22 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("22") + .next(workerWrapper4) + .build(); + + //下面的1 + WorkerWrapper workerWrapper11 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("11") + .next(workerWrapper22) + .build(); + + //下面的0 + WorkerWrapper workerWrapper00 = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("00") + .next(workerWrapper11) + .build(); + + //上面的1 + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .next(workerWrapper3) + .build(); + + //上面的2 + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .next(workerWrapper3) + .build(); + + //上面的0 + WorkerWrapper workerWrapper0 = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .next(workerWrapper1, workerWrapper2) + .build(); + + long now = SystemClock.now(); + System.out.println("begin-" + now); + + //正常完毕 + Async.beginWork(4100, workerWrapper00, workerWrapper0); + + System.out.println("end-" + SystemClock.now()); + System.err.println("cost-" + (SystemClock.now() - now)); + + System.out.println(Async.getThreadCount()); + Async.shutDown(); + } + + /** + * a1 -> b -> c + * a2 -> b -> c + * + * b、c + */ + private static void testMulti8() throws ExecutionException, InterruptedException { + ParWorker w = new ParWorker(); + ParWorker1 w1 = new ParWorker1(); + w1.setSleepTime(1005); + + ParWorker2 w2 = new ParWorker2(); + w2.setSleepTime(3000); + ParWorker3 w3 = new ParWorker3(); + w3.setSleepTime(1000); + + WorkerWrapper workerWrapper3 = new WorkerWrapper.Builder() + .worker(w3) + .callback(w3) + .param("c") + .build(); + + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("b") + .next(workerWrapper3) + .build(); + + WorkerWrapper workerWrappera1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("a1") + .next(workerWrapper2) + .build(); + WorkerWrapper workerWrappera2 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("a2") + .next(workerWrapper2) + .build(); + + + Async.beginWork(6000, workerWrappera1, workerWrappera2); + Async.shutDown(); + } + + /** + * w1 -> w2 -> w3 + * --- last + * w + * w1和w并行,w执行完后就执行last,此时b、c还没开始,b、c就不需要执行了 + */ + private static void testMulti9() throws ExecutionException, InterruptedException { + ParWorker1 w1 = new ParWorker1(); + //注意这里,如果w1的执行时间比w长,那么w2和w3肯定不走。 如果w1和w执行时间一样长,多运行几次,会发现w2有时走有时不走 +// w1.setSleepTime(1100); + + ParWorker w = new ParWorker(); + ParWorker2 w2 = new ParWorker2(); + ParWorker3 w3 = new ParWorker3(); + ParWorker4 w4 = new ParWorker4(); + + WorkerWrapper last = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("last") + .build(); + + WorkerWrapper wrapperW = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("w") + .next(last, false) + .build(); + + WorkerWrapper wrapperW3 = new WorkerWrapper.Builder() + .worker(w3) + .callback(w3) + .param("w3") + .next(last, false) + .build(); + + WorkerWrapper wrapperW2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("w2") + .next(wrapperW3) + .build(); + + WorkerWrapper wrapperW1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("w1") + .next(wrapperW2) + .build(); + + Async.beginWork(6000, wrapperW, wrapperW1); + Async.shutDown(); + } + + /** + * w1 -> w2 -> w3 + * --- last + * w + * w1和w并行,w执行完后就执行last,此时b、c还没开始,b、c就不需要执行了 + */ + private static void testMulti9Reverse() throws ExecutionException, InterruptedException { + ParWorker1 w1 = new ParWorker1(); + //注意这里,如果w1的执行时间比w长,那么w2和w3肯定不走。 如果w1和w执行时间一样长,多运行几次,会发现w2有时走有时不走 +// w1.setSleepTime(1100); + + ParWorker w = new ParWorker(); + ParWorker2 w2 = new ParWorker2(); + ParWorker3 w3 = new ParWorker3(); + ParWorker4 w4 = new ParWorker4(); + + WorkerWrapper wrapperW1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("w1") + .build(); + + WorkerWrapper wrapperW = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("w") + .build(); + + WorkerWrapper last = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("last") + .depend(wrapperW) + .build(); + + WorkerWrapper wrapperW2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("w2") + .depend(wrapperW1) + .build(); + + WorkerWrapper wrapperW3 = new WorkerWrapper.Builder() + .worker(w3) + .callback(w3) + .param("w3") + .depend(wrapperW2) + .next(last, false) + .build(); + + Async.beginWork(6000, wrapperW, wrapperW1); + Async.shutDown(); + } +} diff --git a/asyncTool/src/test/java/seq/SeqTimeoutWorker.java b/asyncTool/src/test/java/seq/SeqTimeoutWorker.java new file mode 100755 index 0000000..0de5e0a --- /dev/null +++ b/asyncTool/src/test/java/seq/SeqTimeoutWorker.java @@ -0,0 +1,48 @@ +package seq; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class SeqTimeoutWorker implements IWorker, ICallback { + + @Override + public String action(String object, Map allWrappers) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "result = " + SystemClock.now() + "---param = " + object + " from 0"; + } + + @Override + public String defaultValue() { + return "worker0--default"; + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + if (success) { + System.out.println("callback worker0 success--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } else { + System.err.println("callback worker0 failure--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } + } + +} diff --git a/asyncTool/src/test/java/seq/SeqWorker.java b/asyncTool/src/test/java/seq/SeqWorker.java new file mode 100755 index 0000000..18c3457 --- /dev/null +++ b/asyncTool/src/test/java/seq/SeqWorker.java @@ -0,0 +1,49 @@ +package seq; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class SeqWorker implements IWorker, ICallback { + + @Override + public String action(String object, Map allWrappers) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "result = " + SystemClock.now() + "---param = " + object + " from 0"; + } + + + @Override + public String defaultValue() { + return "worker0--default"; + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + if (success) { + System.out.println("callback worker0 success--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } else { + System.err.println("callback worker0 failure--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } + } + +} diff --git a/asyncTool/src/test/java/seq/SeqWorker1.java b/asyncTool/src/test/java/seq/SeqWorker1.java new file mode 100755 index 0000000..ae445c6 --- /dev/null +++ b/asyncTool/src/test/java/seq/SeqWorker1.java @@ -0,0 +1,48 @@ +package seq; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class SeqWorker1 implements IWorker, ICallback { + + @Override + public String action(String object, Map allWrappers) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "result = " + SystemClock.now() + "---param = " + object + " from 0"; + } + + @Override + public String defaultValue() { + return "worker1--default"; + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + if (success) { + System.out.println("callback worker1 success--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } else { + System.err.println("callback worker1 failure--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } + } + +} diff --git a/asyncTool/src/test/java/seq/SeqWorker2.java b/asyncTool/src/test/java/seq/SeqWorker2.java new file mode 100755 index 0000000..34853ee --- /dev/null +++ b/asyncTool/src/test/java/seq/SeqWorker2.java @@ -0,0 +1,48 @@ +package seq; + + +import com.jd.platform.async.callback.ICallback; +import com.jd.platform.async.callback.IWorker; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.worker.WorkResult; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.Map; + +/** + * @author wuweifeng wrote on 2019-11-20. + */ +public class SeqWorker2 implements IWorker, ICallback { + + @Override + public String action(String object, Map allWrappers) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "result = " + SystemClock.now() + "---param = " + object + " from 0"; + } + + @Override + public String defaultValue() { + return "worker2--default"; + } + + @Override + public void begin() { + //System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis()); + } + + @Override + public void result(boolean success, String param, WorkResult workResult) { + if (success) { + System.out.println("callback worker2 success--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } else { + System.err.println("callback worker2 failure--" + SystemClock.now() + "----" + workResult.getResult() + + "-threadName:" +Thread.currentThread().getName()); + } + } + +} diff --git a/asyncTool/src/test/java/seq/TestSequential.java b/asyncTool/src/test/java/seq/TestSequential.java new file mode 100755 index 0000000..d4e1c67 --- /dev/null +++ b/asyncTool/src/test/java/seq/TestSequential.java @@ -0,0 +1,71 @@ +package seq; + + +import com.jd.platform.async.executor.Async; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.concurrent.ExecutionException; + +/** + * 串行测试 + * @author wuweifeng wrote on 2019-11-20. + */ +public class TestSequential { + public static void main(String[] args) throws InterruptedException, ExecutionException { + + + SeqWorker w = new SeqWorker(); + SeqWorker1 w1 = new SeqWorker1(); + SeqWorker2 w2 = new SeqWorker2(); + + //顺序0-1-2 + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .next(workerWrapper2) + .build(); + + WorkerWrapper workerWrapper = new WorkerWrapper.Builder() + .worker(w) + .callback(w) + .param("0") + .next(workerWrapper1) + .build(); + +// testNormal(workerWrapper); + + testGroupTimeout(workerWrapper); + } + + private static void testNormal(WorkerWrapper workerWrapper) throws ExecutionException, InterruptedException { + long now = SystemClock.now(); + System.out.println("begin-" + now); + + Async.beginWork(3500, workerWrapper); + + System.out.println("end-" + SystemClock.now()); + System.err.println("cost-" + (SystemClock.now() - now)); + + Async.shutDown(); + } + + private static void testGroupTimeout(WorkerWrapper workerWrapper) throws ExecutionException, InterruptedException { + long now = SystemClock.now(); + System.out.println("begin-" + now); + + Async.beginWork(2500, workerWrapper); + + System.out.println("end-" + SystemClock.now()); + System.err.println("cost-" + (SystemClock.now() - now)); + + Async.shutDown(); + } +} diff --git a/asyncTool/src/test/java/seq/TestSequentialTimeout.java b/asyncTool/src/test/java/seq/TestSequentialTimeout.java new file mode 100755 index 0000000..f2b02de --- /dev/null +++ b/asyncTool/src/test/java/seq/TestSequentialTimeout.java @@ -0,0 +1,67 @@ +package seq; + + +import com.jd.platform.async.executor.Async; +import com.jd.platform.async.executor.timer.SystemClock; +import com.jd.platform.async.wrapper.WorkerWrapper; + +import java.util.concurrent.ExecutionException; + +/** + * 串行测试 + * @author wuweifeng wrote on 2019-11-20. + */ +@SuppressWarnings("Duplicates") +public class TestSequentialTimeout { + public static void main(String[] args) throws InterruptedException, ExecutionException { + testFirstTimeout(); + } + + /** + * begin-1576719450476 + * callback worker0 failure--1576719451338----worker0--default-threadName:main + * callback worker1 failure--1576719451338----worker1--default-threadName:main + * callback worker2 failure--1576719451338----worker2--default-threadName:main + * end-1576719451338 + * cost-862 + */ + private static void testFirstTimeout() throws ExecutionException, InterruptedException { + SeqWorker1 w1 = new SeqWorker1(); + SeqWorker2 w2 = new SeqWorker2(); + SeqTimeoutWorker t = new SeqTimeoutWorker(); + + WorkerWrapper workerWrapper2 = new WorkerWrapper.Builder() + .worker(w2) + .callback(w2) + .param("2") + .build(); + + WorkerWrapper workerWrapper1 = new WorkerWrapper.Builder() + .worker(w1) + .callback(w1) + .param("1") + .next(workerWrapper2) + .build(); + + //2在1后面串行 + //T会超时 + WorkerWrapper workerWrapperT = new WorkerWrapper.Builder() + .worker(t) + .callback(t) + .param("t") + .next(workerWrapper1) + .build(); + + + long now = SystemClock.now(); + System.out.println("begin-" + now); + + Async.beginWork(5000, workerWrapperT); + + System.out.println("end-" + SystemClock.now()); + System.err.println("cost-" + (SystemClock.now() - now)); + + Async.shutDown(); + } + +} diff --git a/common-common/pom.xml b/common-common/pom.xml index fc86ef4..f0e12d3 100644 --- a/common-common/pom.xml +++ b/common-common/pom.xml @@ -52,6 +52,23 @@ + + + + org.apache.maven.plugins + maven-source-plugin + + + attach-sources + + jar + + + + + + + diff --git a/pom.xml b/pom.xml index c6e4cba..bddb002 100644 --- a/pom.xml +++ b/pom.xml @@ -14,6 +14,7 @@ archetype-standard common-common smart-datasource + asyncTool diff --git a/smart-datasource/pom.xml b/smart-datasource/pom.xml index 62f66a1..b91c5d6 100644 --- a/smart-datasource/pom.xml +++ b/smart-datasource/pom.xml @@ -40,6 +40,23 @@ + + + + org.apache.maven.plugins + maven-source-plugin + + + attach-sources + + jar + + + + + + +