diff --git a/airpower-core/src/main/java/cn/hamm/airpower/model/TaskFlow.java b/airpower-core/src/main/java/cn/hamm/airpower/model/TaskFlow.java new file mode 100644 index 0000000000000000000000000000000000000000..7f1cb00633f2c58b0de0ec86ee0bc30db3377ae6 --- /dev/null +++ b/airpower-core/src/main/java/cn/hamm/airpower/model/TaskFlow.java @@ -0,0 +1,179 @@ +package cn.hamm.airpower.model; + + +import org.jetbrains.annotations.Contract; +import org.jetbrains.annotations.NotNull; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.*; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + *

任务流程管理器

+ * + * @author Hamm.cn + */ +public class TaskFlow { + /** + *

使用的线程池服务

+ */ + private static final ExecutorService EXECUTOR = new ThreadPoolExecutor( + 1, + 5, + 60, + TimeUnit.SECONDS, + new ArrayBlockingQueue<>(10), + Executors.defaultThreadFactory(), + new ThreadPoolExecutor.AbortPolicy() + ); + + /** + *

创建一个任务

+ * + * @param data 任务传递的数据 + * @param 任务数据类型 + * @return 任务对象 + */ + public static @NotNull TaskFlow init(D data) { + return next(data); + } + + /** + *

任务的下一步

+ * + * @param data 任务数据 + * @param 任务数据类型 + * @return 任务对象 + */ + public static @NotNull TaskFlow next(D data) { + TaskFlow taskFlow = new TaskFlow<>(); + taskFlow.data = data; + return taskFlow; + } + + /** + *

任务数据

+ */ + private D data = null; + + /** + *

任务步骤

+ */ + private final List>> steps = new ArrayList<>(); + + /** + *

任务步骤处理前置方法

+ */ + private Consumer beforeStep = null; + + /** + *

任务步骤处理后置方法

+ */ + private Consumer afterStep = null; + + /** + *

任务执行成功方法

+ */ + private Consumer onSuccess = null; + + /** + *

任务执行失败方法

+ */ + private BiConsumer> onError = null; + + /** + *

添加下一步

+ * + * @param next 下一步 + * @return 任务对象 + */ + public TaskFlow next(Function> next) { + steps.add(next); + return this; + } + + /** + *

开始任务

+ */ + public final void start() { + if (steps.isEmpty()) { + if (Objects.nonNull(onSuccess)) { + onSuccess.accept(data); + } + return; + } + Function> function = steps.get(0); + steps.remove(0); + TaskFlow.EXECUTOR.execute(() -> { + try { + if (Objects.nonNull(beforeStep)) { + beforeStep.accept(data); + } + TaskFlow taskFlow = function.apply(data); + if (Objects.nonNull(afterStep)) { + afterStep.accept(data); + } + this.data = taskFlow.data; + this.start(); + } catch (Exception exception) { + if (Objects.nonNull(onError)) { + onError.accept(exception, this); + return; + } + throw exception; + } + }); + } + + /** + *

任务成功后执行

+ * + * @param onSuccess 回调方法 + * @return 任务对象 + */ + @Contract(value = "_ -> this", mutates = "this") + public final TaskFlow onSuccess(Consumer onSuccess) { + this.onSuccess = onSuccess; + return this; + } + + /** + *

任务失败后执行

+ * + * @param onError 回调方法 + * @return 任务对象 + */ + @Contract(value = "_ -> this", mutates = "this") + public final TaskFlow onError(BiConsumer> onError) { + this.onError = onError; + return this; + } + + /** + *

任务步骤执行前执行

+ * + * @param beforeStep 回调方法 + * @return 任务对象 + */ + @Contract(value = "_ -> this", mutates = "this") + public final TaskFlow beforeStep(Consumer beforeStep) { + this.beforeStep = beforeStep; + return this; + } + + /** + *

任务步骤执行后执行

+ * + * @param afterStep 回调方法 + * @return 任务对象 + */ + @Contract(value = "_ -> this", mutates = "this") + public final TaskFlow afterStep(Consumer afterStep) { + this.afterStep = afterStep; + return this; + } +} \ No newline at end of file