From 496aa0b87cad6956127a31813707e2c8aedac21e Mon Sep 17 00:00:00 2001
From: CodeCaster
Date: Wed, 11 Jun 2025 10:37:28 +0800
Subject: [PATCH 1/4] optimize pom
---
.../fit-dependency => dependency}/pom.xml | 89 +++++++
framework/fel/java/fel-flow/pom.xml | 6 +
.../fel/java/fel-jacoco-aggregator/pom.xml | 7 +-
.../pipeline/huggingface/PipelineTest.java | 2 +-
framework/fel/java/pom.xml | 4 +-
framework/fit/java/pom.xml | 1 -
framework/ohscript/pom.xml | 79 +++---
framework/pom.xml | 1 +
framework/waterflow/java/pom.xml | 39 ++-
.../exceptions/WaterflowException.java | 9 +
.../fit/waterflow/utils/Entities.java | 11 +-
.../waterflow/java/waterflow-core/pom.xml | 10 -
.../java/waterflow-dependency/pom.xml | 234 ------------------
.../waterflow/java/waterflow-eco/pom.xml | 1 +
.../waterflow-bridge-fit-reactor/pom.xml | 11 +-
15 files changed, 180 insertions(+), 324 deletions(-)
rename framework/{fit/java/fit-dependency => dependency}/pom.xml (83%)
delete mode 100644 framework/waterflow/java/waterflow-dependency/pom.xml
diff --git a/framework/fit/java/fit-dependency/pom.xml b/framework/dependency/pom.xml
similarity index 83%
rename from framework/fit/java/fit-dependency/pom.xml
rename to framework/dependency/pom.xml
index 429164a0e..bee069c2a 100644
--- a/framework/fit/java/fit-dependency/pom.xml
+++ b/framework/dependency/pom.xml
@@ -28,6 +28,13 @@
Maintainer
+
+ Song Yongtan
+ 271667068@qq.com
+
+ Committer
+
+
@@ -44,6 +51,16 @@
3.5.0-SNAPSHOT
+
+ 1.17.5
+ 1.2.20
+ 1.2.83
+ 32.0.1-jre
+ 2.3.232
+ 1.18.36
+ 2.18.2
+ 3.5.13
+
3.27.3
5.12.2
@@ -380,6 +397,78 @@
fit-mybatis-common
${fit.version}
+
+
+
+ org.fitframework.waterflow
+ waterflow-bridge-fit-reactor
+ ${fit.version}
+
+
+ org.fitframework.waterflow
+ waterflow-core
+ ${fit.version}
+
+
+ org.fitframework.waterflow
+ waterflow-common
+ ${fit.version}
+
+
+
+
+ org.fitframework.ohscript
+ ohscript
+ ${fit.version}
+
+
+
+
+ com.alibaba
+ druid
+ ${druid.version}
+
+
+ com.alibaba
+ fastjson
+ ${fastjson.version}
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ ${jackson.version}
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+ ${jackson.version}
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ ${jackson.version}
+
+
+ com.google.guava
+ guava
+ ${guava.version}
+
+
+ net.bytebuddy
+ byte-buddy
+ ${byte-buddy.version}
+
+
+ org.projectlombok
+ lombok
+ ${lombok.version}
+ provided
+
+
+ org.mybatis
+ mybatis
+ ${mybatis.version}
+
diff --git a/framework/fel/java/fel-flow/pom.xml b/framework/fel/java/fel-flow/pom.xml
index 58bc88bc2..f3af5a8a5 100644
--- a/framework/fel/java/fel-flow/pom.xml
+++ b/framework/fel/java/fel-flow/pom.xml
@@ -47,6 +47,12 @@
waterflow-bridge-fit-reactor
+
+
+ org.projectlombok
+ lombok
+
+
org.fitframework.plugin
diff --git a/framework/fel/java/fel-jacoco-aggregator/pom.xml b/framework/fel/java/fel-jacoco-aggregator/pom.xml
index bc127ae65..b2c9a2928 100644
--- a/framework/fel/java/fel-jacoco-aggregator/pom.xml
+++ b/framework/fel/java/fel-jacoco-aggregator/pom.xml
@@ -12,11 +12,6 @@
fel-jacoco-aggregator
pom
-
-
- 1.0.0-SNAPSHOT
-
-
@@ -60,7 +55,7 @@
org.jacoco
jacoco-maven-plugin
- ${jacoco.version}
+ ${jacoco.maven-plugin.version}
**/*.jar
diff --git a/framework/fel/java/fel-pipeline-core/src/test/java/modelengine/fel/pipeline/huggingface/PipelineTest.java b/framework/fel/java/fel-pipeline-core/src/test/java/modelengine/fel/pipeline/huggingface/PipelineTest.java
index bc087e931..97bfcd122 100644
--- a/framework/fel/java/fel-pipeline-core/src/test/java/modelengine/fel/pipeline/huggingface/PipelineTest.java
+++ b/framework/fel/java/fel-pipeline-core/src/test/java/modelengine/fel/pipeline/huggingface/PipelineTest.java
@@ -39,7 +39,7 @@ public class PipelineTest {
static class TestCaseProvider implements ArgumentsProvider {
@Override
public Stream extends Arguments> provideArguments(ExtensionContext extensionContext) throws IOException {
- ObjectSerializer serializer = new JacksonObjectSerializer(null, null, null);
+ ObjectSerializer serializer = new JacksonObjectSerializer(null, null, null, true);
String resourceName = "/test_case.json";
String jsonContent = content(TestCaseProvider.class, resourceName);
diff --git a/framework/fel/java/pom.xml b/framework/fel/java/pom.xml
index e6baf7a1c..146b6ee1d 100644
--- a/framework/fel/java/pom.xml
+++ b/framework/fel/java/pom.xml
@@ -22,7 +22,7 @@
Song Yongtan
- 271667068.qq.com
+ 271667068@qq.com
Committer
@@ -39,6 +39,8 @@
fel-community
fel-core
fel-flow
+ fel-jacoco-aggregator
+ fel-pipeline-core
maven-plugins
plugins
services
diff --git a/framework/fit/java/pom.xml b/framework/fit/java/pom.xml
index 41f0957ee..3643e9c15 100644
--- a/framework/fit/java/pom.xml
+++ b/framework/fit/java/pom.xml
@@ -45,7 +45,6 @@
fit-broker
fit-builtin
fit-conf
- fit-dependency
fit-extension
fit-reactor
fit-ioc
diff --git a/framework/ohscript/pom.xml b/framework/ohscript/pom.xml
index 2b8d6eb52..4e9784393 100644
--- a/framework/ohscript/pom.xml
+++ b/framework/ohscript/pom.xml
@@ -29,7 +29,7 @@
Song Yongtan
- 271667068.qq.com
+ 271667068@qq.com
Committer
@@ -50,16 +50,6 @@
3.5.0-SNAPSHOT
-
- 1.17.5
- 1.2.83
- 1.18.36
-
-
- 3.27.3
- 5.12.2
- 5.17.0
-
3.14.0
3.2.7
@@ -70,64 +60,61 @@
0.8.13
+
+
+
+ org.fitframework
+ fit-dependency
+ ${fit.version}
+ pom
+ import
+
+
+
+
-
+
- net.bytebuddy
- byte-buddy
- ${byte-buddy.version}
+ org.fitframework
+ fit-api
- com.alibaba
- fastjson
- ${fastjson.version}
+ org.fitframework.service
+ fit-http-classic
-
+
- org.projectlombok
- lombok
- ${lombok.version}
- provided
+ com.alibaba
+ fastjson
-
-
- org.fitframework
- fit-api
- ${fit.version}
+ net.bytebuddy
+ byte-buddy
- org.fitframework.service
- fit-http-classic
- ${fit.version}
+ org.projectlombok
+ lombok
+
+ org.assertj
+ assertj-core
+
org.junit.jupiter
junit-jupiter
- ${junit5.version}
- test
org.mockito
mockito-core
- ${mockito.version}
- test
-
-
- org.assertj
- assertj-core
- ${assertj.version}
- test
org.fitframework
fit-runtime
- ${fit.version}
test
@@ -135,49 +122,41 @@
org.fitframework.plugin
fit-value-fastjson
- ${fit.version}
test
org.fitframework.plugin
fit-message-serializer-cbor
- ${fit.version}
test
org.fitframework.plugin
fit-message-serializer-json-jackson
- ${fit.version}
test
org.fitframework.plugin
fit-http-server-netty
- ${fit.version}
test
org.fitframework.plugin
fit-server-http
- ${fit.version}
test
org.fitframework.plugin
fit-http-handler-registry
- ${fit.version}
test
org.fitframework.plugin
fit-client-http
- ${fit.version}
test
org.fitframework.plugin
fit-http-client-okhttp
- ${fit.version}
test
diff --git a/framework/pom.xml b/framework/pom.xml
index 7ca5826ef..2b8a71c7c 100644
--- a/framework/pom.xml
+++ b/framework/pom.xml
@@ -9,6 +9,7 @@
pom
+ dependency
fel/java
fit/java
ohscript
diff --git a/framework/waterflow/java/pom.xml b/framework/waterflow/java/pom.xml
index ee8d0adbf..3d69ca886 100644
--- a/framework/waterflow/java/pom.xml
+++ b/framework/waterflow/java/pom.xml
@@ -22,7 +22,7 @@
Song Yongtan
- 271667068.qq.com
+ 271667068@qq.com
Committer
@@ -38,7 +38,6 @@
waterflow-common
waterflow-core
- waterflow-dependency
waterflow-eco
@@ -49,13 +48,14 @@
3.5.0-SNAPSHOT
- 3.5.0-SNAPSHOT
3.1.0
3.14.0
3.8.1
+ 3.2.7
3.4.2
+ 3.11.2
3.5.3
3.3.1
0.7.0
@@ -65,9 +65,9 @@
- org.fitframework.waterflow
- waterflow-dependency
- ${waterflow.version}
+ org.fitframework
+ fit-dependency
+ ${fit.version}
pom
import
@@ -91,6 +91,33 @@
+
+ org.apache.maven.plugins
+ maven-gpg-plugin
+ ${maven.gpg.version}
+
+
+ sign-artifacts
+ deploy
+
+ sign
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-javadoc-plugin
+ ${maven.javadoc.version}
+
+
+ attach-javadocs
+
+ jar
+
+
+
+
org.apache.maven.plugins
maven-source-plugin
diff --git a/framework/waterflow/java/waterflow-common/src/main/java/modelengine/fit/waterflow/exceptions/WaterflowException.java b/framework/waterflow/java/waterflow-common/src/main/java/modelengine/fit/waterflow/exceptions/WaterflowException.java
index 1794dd7a1..10e6a748c 100644
--- a/framework/waterflow/java/waterflow-common/src/main/java/modelengine/fit/waterflow/exceptions/WaterflowException.java
+++ b/framework/waterflow/java/waterflow-common/src/main/java/modelengine/fit/waterflow/exceptions/WaterflowException.java
@@ -18,6 +18,10 @@
* @since 1.0
*/
public class WaterflowException extends FitException {
+ /**
+ * Represents additional arguments associated with this exception.
+ * These arguments can be used for logging, debugging, or custom error handling.
+ */
private Object[] args;
/**
@@ -52,6 +56,11 @@ public WaterflowException(Throwable cause, ErrorCodes error, Object... args) {
this.args = args;
}
+ /**
+ * Returns the additional arguments associated with this exception.
+ *
+ * @return An array of objects representing the arguments.
+ */
public Object[] getArgs() {
return args;
}
diff --git a/framework/waterflow/java/waterflow-common/src/main/java/modelengine/fit/waterflow/utils/Entities.java b/framework/waterflow/java/waterflow-common/src/main/java/modelengine/fit/waterflow/utils/Entities.java
index 661d644cb..0c4e422c6 100644
--- a/framework/waterflow/java/waterflow-common/src/main/java/modelengine/fit/waterflow/utils/Entities.java
+++ b/framework/waterflow/java/waterflow-common/src/main/java/modelengine/fit/waterflow/utils/Entities.java
@@ -193,7 +193,7 @@ public static boolean match(String expectedId, String actualId) {
* @param map1 The first map to compare (may be null).
* @param map2 The second map to compare (may be null).
* @return {@code true} If the maps are equal according to the specified criteria,
- * {@code false} otherwise.
+ * {@code false} otherwise.
*/
public static boolean equals(Map map1, Map map2) {
if (map1 == null) {
@@ -222,13 +222,10 @@ public static boolean equals(Map map1, Map map2) {
* Note: This implementation considers [1,2,2] and [1,1,2] as equal due to set conversion.
*
* @param The type of elements in the lists.
- * @param list1 The first list to compare (may be null).
- * @param list2 The second list to compare (may be null).
+ * @param list1 The first list to compare (maybe null).
+ * @param list2 The second list to compare (maybe null).
* @return {@code true} If the lists contain the same elements regardless of order,
- * {@code false} otherwise.
- * @apiNote This method performs a set-based comparison, which means it doesn't preserve
- * element ordering or duplicate counts. For strict list equality that considers
- * order and duplicates, use {@link List#equals}.
+ * {@code false} otherwise.
*/
public static boolean equals(List list1, List list2) {
if (list1 == null) {
diff --git a/framework/waterflow/java/waterflow-core/pom.xml b/framework/waterflow/java/waterflow-core/pom.xml
index da80900d8..0c98ce2f4 100644
--- a/framework/waterflow/java/waterflow-core/pom.xml
+++ b/framework/waterflow/java/waterflow-core/pom.xml
@@ -31,15 +31,5 @@
org.mockito
mockito-core
-
- org.mockito
- mockito-junit-jupiter
-
-
- junit-jupiter-api
- org.junit.jupiter
-
-
-
\ No newline at end of file
diff --git a/framework/waterflow/java/waterflow-dependency/pom.xml b/framework/waterflow/java/waterflow-dependency/pom.xml
deleted file mode 100644
index 23a240dfd..000000000
--- a/framework/waterflow/java/waterflow-dependency/pom.xml
+++ /dev/null
@@ -1,234 +0,0 @@
-
-
- 4.0.0
-
- org.fitframework.waterflow
- waterflow-dependency
- 3.5.0-SNAPSHOT
- pom
-
- Dependency Management of A Reactive Process Engine combining Traditional BPM Capabilities
- https://github.com/ModelEngine-Group/fit-framework
-
-
-
- MIT License
- https://opensource.org/licenses/MIT
- repo
-
-
-
-
-
- Song Yongtan
- 271667068.qq.com
-
- Committer
-
-
-
-
-
- scm:git:git://github.com/ModelEngine-Group/fit-framework.git
- scm:git:ssh://github.com/ModelEngine-Group/fit-framework.git
- https://github.com/ModelEngine-Group/fit-framework
-
-
-
- UTF-8
- UTF-8
- 17
-
-
- 3.5.0-SNAPSHOT
- 3.5.0-SNAPSHOT
- 3.5.0-SNAPSHOT
-
-
- 1.2.20
- 1.2.83
- 32.0.1-jre
- 2.3.232
- 1.18.36
- 2.18.2
- 3.5.13
-
-
- 3.27.3
- 5.12.2
- 5.17.0
- 5.9.0
-
-
-
-
-
-
- org.fitframework
- fit-api
- ${fit.version}
-
-
- org.fitframework.service
- fit-http-classic
- ${fit.version}
-
-
- org.fitframework.integration
- fit-mybatis
- ${fit.version}
-
-
- org.fitframework
- fit-reactor
- ${fit.version}
-
-
- org.fitframework.extension
- fit-schedule
- ${fit.version}
-
-
- org.fitframework.service
- fit-service-registry-and-discovery
- ${fit.version}
-
-
- org.fitframework.extension
- fit-transaction
- ${fit.version}
-
-
- org.fitframework
- fit-util
- ${fit.version}
-
-
- org.fitframework.service
- fit-security
- ${fit.version}
-
-
-
-
- org.fitframework.waterflow
- waterflow-bridge-fit-reactor
- ${waterflow.version}
-
-
- org.fitframework.waterflow
- waterflow-core
- ${waterflow.version}
-
-
- org.fitframework.waterflow
- waterflow-common
- ${waterflow.version}
-
-
-
-
- org.fitframework.ohscript
- ohscript
- ${ohscript.version}
-
-
-
-
- com.alibaba
- druid
- ${druid.version}
-
-
- com.alibaba
- fastjson
- ${fastjson.version}
-
-
- org.projectlombok
- lombok
- ${lombok.version}
-
-
-
- com.fasterxml.jackson.core
- jackson-annotations
- ${jackson.version}
-
-
- com.fasterxml.jackson.core
- jackson-core
- ${jackson.version}
-
-
- com.fasterxml.jackson.core
- jackson-databind
- ${jackson.version}
-
-
- org.projectlombok
- lombok
- ${lombok.version}
- provided
-
-
- org.mybatis
- mybatis
- ${mybatis.version}
-
-
-
-
- org.junit.jupiter
- junit-jupiter
- ${junit5.version}
- test
-
-
- org.mockito
- mockito-core
- ${mockito.version}
- test
-
-
- org.mockito
- mockito-junit-jupiter
- ${mockito-junit-jupiter.version}
- test
-
-
- org.assertj
- assertj-core
- ${assertj.version}
- test
-
-
- com.google.guava
- guava
- ${guava.version}
- test
-
-
- com.h2database
- h2
- ${h2.version}
- test
-
-
-
-
-
-
-
- org.sonatype.central
- central-publishing-maven-plugin
- 0.7.0
- true
-
- central
-
-
-
-
-
diff --git a/framework/waterflow/java/waterflow-eco/pom.xml b/framework/waterflow/java/waterflow-eco/pom.xml
index c9bc340f1..aa065a2d6 100644
--- a/framework/waterflow/java/waterflow-eco/pom.xml
+++ b/framework/waterflow/java/waterflow-eco/pom.xml
@@ -2,6 +2,7 @@
4.0.0
+
org.fitframework.waterflow
waterflow-parent
diff --git a/framework/waterflow/java/waterflow-eco/waterflow-bridge-fit-reactor/pom.xml b/framework/waterflow/java/waterflow-eco/waterflow-bridge-fit-reactor/pom.xml
index 5942fdc2b..a7d8cd5e9 100644
--- a/framework/waterflow/java/waterflow-eco/waterflow-bridge-fit-reactor/pom.xml
+++ b/framework/waterflow/java/waterflow-eco/waterflow-bridge-fit-reactor/pom.xml
@@ -29,14 +29,9 @@
mockito-core
- org.mockito
- mockito-junit-jupiter
-
-
- junit-jupiter-api
- org.junit.jupiter
-
-
+ org.projectlombok
+ lombok
+ test
From 920e7d61bfaf9b6063e600f1c616f2162e4b3b15 Mon Sep 17 00:00:00 2001
From: CodeCaster
Date: Wed, 11 Jun 2025 14:40:44 +0800
Subject: [PATCH 2/4] optimize javadoc
---
.../domain/context/CompleteContext.java | 9 ++
.../domain/context/FlatMapWindow.java | 10 ++
.../waterflow/domain/context/FlowContext.java | 34 ++++-
.../waterflow/domain/context/FlowSession.java | 10 +-
.../waterflow/domain/context/FlowTrace.java | 3 +
.../waterflow/domain/context/MatchWindow.java | 15 +++
.../domain/context/StateContext.java | 1 +
.../fit/waterflow/domain/context/Window.java | 24 +++-
.../waterflow/domain/context/WindowToken.java | 21 ++++
.../repo/flowcontext/FlowContextRepo.java | 99 ++++++++-------
.../context/repo/flowtrace/FlowTraceRepo.java | 2 +-
.../domain/enums/FlowDefinitionStatus.java | 7 ++
.../waterflow/domain/enums/FlowLogType.java | 11 ++
.../domain/enums/FlowNodeStatus.java | 29 ++++-
.../domain/enums/FlowNodeTriggerMode.java | 9 ++
.../waterflow/domain/enums/FlowNodeType.java | 33 ++++-
.../domain/enums/FlowTraceStatus.java | 24 +++-
.../waterflow/domain/enums/ParallelMode.java | 10 +-
.../waterflow/domain/enums/ProcessType.java | 7 ++
.../domain/events/FlowCallbackEvent.java | 6 +
.../domain/events/FlowTaskCreatedEvent.java | 23 ++++
.../fit/waterflow/domain/flow/Flow.java | 3 +
.../fit/waterflow/domain/states/Activity.java | 5 +
.../waterflow/domain/states/Conditions.java | 1 +
.../waterflow/domain/states/DataStart.java | 25 ++++
.../fit/waterflow/domain/states/Fork.java | 9 +-
.../domain/states/MatchToHappen.java | 2 +
.../fit/waterflow/domain/states/Parallel.java | 1 +
.../fit/waterflow/domain/states/Start.java | 7 ++
.../domain/stream/callbacks/ToCallback.java | 5 +
.../waterflow/domain/stream/nodes/Blocks.java | 11 +-
.../domain/stream/nodes/FlatMapNode.java | 9 ++
.../waterflow/domain/stream/nodes/From.java | 33 +++--
.../waterflow/domain/stream/nodes/Node.java | 6 +-
.../domain/stream/nodes/Retryable.java | 6 +
.../fit/waterflow/domain/stream/nodes/To.java | 116 +++++++++++-------
.../domain/stream/operators/Operators.java | 14 +--
.../operators/SessionWindowCondition.java | 2 -
.../domain/stream/operators/WindowArg.java | 12 +-
.../domain/stream/reactive/Callback.java | 10 +-
.../domain/stream/reactive/Processor.java | 5 +-
.../domain/stream/reactive/Publisher.java | 55 +++++----
.../domain/stream/reactive/Subscriber.java | 2 +-
.../domain/stream/reactive/Subscription.java | 5 +-
.../domain/stream/reactive/When.java | 2 +
.../waterflow/domain/utils/IdGenerator.java | 8 ++
46 files changed, 570 insertions(+), 171 deletions(-)
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/CompleteContext.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/CompleteContext.java
index a96a7790e..7d28698f7 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/CompleteContext.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/CompleteContext.java
@@ -16,6 +16,15 @@
* @since 1.0
*/
public class CompleteContext extends FlowContext {
+ /**
+ * 构造一个 {@link CompleteContext} 实例。
+ *
+ * 该构造函数用于在 session window complete 时创建一个结束上下文,通知 reduce 节点结束累积操作。
+ *
+ *
+ * @param context 表示当前上下文的 {@link FlowContext}。
+ * @param position 表示上下文当前所处位置的 {@link String}。
+ */
public CompleteContext(FlowContext context, String position) {
super(context.getStreamId(), context.getRootId(), null, context.getTraceId(), position,
context.getParallel(), context.getParallelMode(), context.getSession());
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlatMapWindow.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlatMapWindow.java
index cd3ea7298..09824ab93 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlatMapWindow.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlatMapWindow.java
@@ -31,6 +31,11 @@ public class FlatMapWindow extends Window {
@Getter
private Window source;
+ /**
+ * 创建一个flatmap window
+ *
+ * @param from flatmap source window
+ */
public FlatMapWindow(FlatMapSourceWindow from) {
super();
this.from = from;
@@ -112,6 +117,11 @@ public Object acc() {
return this.from.acc();
}
+ /**
+ * 设置acc
+ *
+ * @param acc acc
+ */
@Override
public void setAcc(Object acc) {
this.from.setAcc(acc);
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowContext.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowContext.java
index ca0cda984..8184bc5ba 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowContext.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowContext.java
@@ -150,9 +150,10 @@ public class FlowContext extends IdGenerator implements StateContext {
* @param data 表示上下文里所带数据的 {@link T}。
* @param traceId 表示路径唯一标识的 {@link Set}{@code <}的{@link String}{@code >}。
* @param position 表示上下文当前所处的位置的 {@link String}。
+ * @param session 表示上下文会话信息的 {@link FlowSession}。
*/
public FlowContext(String streamId, String rootId, T data, Set traceId, String position,
- FlowSession session) {
+ FlowSession session) {
this(streamId, rootId, data, traceId, position, "", "", session);
}
@@ -166,6 +167,7 @@ public FlowContext(String streamId, String rootId, T data, Set traceId,
* @param position 表示上下文当前所处的位置的 {@link String}。
* @param parallel 表示并行节点唯一标识的 {@link String}。
* @param parallelMode 表示并行模式的 {@link String}。
+ * @param session 表示上下文会话信息的 {@link FlowSession}。
*/
public FlowContext(String streamId, String rootId, T data, Set traceId, String position, String parallel,
String parallelMode, FlowSession session) {
@@ -182,7 +184,7 @@ public FlowContext(String streamId, String rootId, T data, Set traceId,
this.index = this.createIndex(); // 0起始,说明保序
}
- private Integer createIndex(){
+ private Integer createIndex() {
return session.preserved() ? session.getWindow().tokenCount() : -1;
}
@@ -261,13 +263,20 @@ public FlowContext toBatch(String toBatchId) {
/**
* 在 map,reduce,produce 的过程中把大多数上一个 context 的内容复制给下一个。
*
+ * @param 表示返回值类型的泛型参数。
* @param data 表示处理后数据的 {@link R}。
* @param position 表示处理后所处的节点的 {@link String}。
* @return 表示新的上下文的 {@link FlowContext}{@code <}{@link R}{@code >}。
*/
public FlowContext generate(R data, String position) {
- FlowContext context = new FlowContext<>(this.streamId, this.rootId, data, this.traceId, this.position,
- this.parallel, this.parallelMode, this.session);
+ FlowContext context = new FlowContext<>(this.streamId,
+ this.rootId,
+ data,
+ this.traceId,
+ this.position,
+ this.parallel,
+ this.parallelMode,
+ this.session);
context.position = position;
context.previous = this.id;
context.batchId = this.batchId;
@@ -278,6 +287,7 @@ public FlowContext generate(R data, String position) {
/**
* 在 map,reduce,produce 的过程中把大多数上一个 context 的内容复制给下一个。
*
+ * @param 表示返回值类型的泛型参数。
* @param dataList 表示处理后数据的 {@link List}{@code <}{@link R}{@code >}。
* @param position 表示处理后所处节点的 {@link String}。
* @return 表示新的上下文的 {@link List}{@code <}{@link FlowContext}{@code <}{@link R}{@code >}{@code >}。
@@ -289,13 +299,20 @@ public List> generate(List dataList, String position) {
/**
* 用于 when.convert 数据时候的转换 context,除了包裹的数据类型不一样,所有其他信息都一样。
*
+ * @param 表示返回值类型的泛型参数。
* @param data 表示转换后数据的 {@link R}。
* @param id 表示 contextId 的 {@link String}。
* @return 表示转换后的 context 的 {@link FlowContext}{@code <}{@link R}{@code >}。
*/
public FlowContext convertData(R data, String id) {
- FlowContext context = new FlowContext<>(this.streamId, this.rootId, data, this.traceId, this.position,
- this.parallel, this.parallelMode, this.session);
+ FlowContext context = new FlowContext<>(this.streamId,
+ this.rootId,
+ data,
+ this.traceId,
+ this.position,
+ this.parallel,
+ this.parallelMode,
+ this.session);
context.previous = this.previous;
context.status = this.status;
context.id = id;
@@ -336,6 +353,11 @@ public Object keyBy() {
return this.session.keyBy();
}
+ /**
+ * 获取window
+ *
+ * @return 窗口
+ */
public Window getWindow() {
return this.getSession().getWindow();
}
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowSession.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowSession.java
index 2e833c3cc..81539f4f6 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowSession.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowSession.java
@@ -12,7 +12,6 @@
import modelengine.fitframework.util.ObjectUtils;
import java.util.Map;
-import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
@@ -99,7 +98,7 @@ public FlowSession(String id) {
/**
* 构造方法,使用指定的 ID 和保序标识。
*
- * @param id session 的唯一标识
+ * @param id session 的唯一标识
* @param preserved 是否保序
*/
public FlowSession(String id, boolean preserved) {
@@ -131,7 +130,7 @@ public FlowSession(FlowSession session) {
* @param session the original {@link FlowSession} to copy properties from.
* @param window the {@link Window} configuration to apply to the new session.
* @return a new {@link FlowSession} instance with properties copied from the original session.
- * and the specified window configuration applied
+ * and the specified window configuration applied
*/
public static FlowSession from(FlowSession session, Window window) {
FlowSession newSession = new FlowSession(session.getId(), session.preserved);
@@ -149,7 +148,7 @@ public static FlowSession from(FlowSession session, Window window) {
* @param session the original {@link FlowSession} to copy state from.
* @param preserved {@code boolean} indicates whether the new session should be created as a preserved session.
* @return a new root-level {@link FlowSession} initialized with the specified preservation state.
- * and containing copied state from the original session
+ * and containing copied state from the original session
*/
public static FlowSession newRootSession(FlowSession session, boolean preserved) {
FlowSession newSession = new FlowSession(preserved);
@@ -187,6 +186,7 @@ public Window getWindow() {
/**
* 设置当前 session 的window对象,并确保window关联到当前 session。
*
+ * @param 泛型类型,表示上下文的数据类型
* @param window 要设置的 Window 实例
*/
public void setWindow(Window window) {
@@ -258,6 +258,7 @@ public void setState(String key, Object value) {
/**
* 获取指定键的内置上下文数据。
*
+ * @param 泛型类型,表示上下文的数据类型
* @param key 表示键的 {@link String}。
* @return 上下文数据 {@link R}。
*/
@@ -319,6 +320,7 @@ private void copyState(FlowSession session) {
/**
* 开始当前 session 的窗口,如果窗口尚未初始化,则创建一个新的 Window 实例并关联到当前 session。
*
+ * @param 泛型类型,表示上下文的数据类型
* @return 当前的 Window 实例
*/
public Window begin() {
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowTrace.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowTrace.java
index 20966a6fe..6dd0b811f 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowTrace.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowTrace.java
@@ -74,6 +74,9 @@ public class FlowTrace extends IdGenerator {
*/
private FlowTraceStatus status = FlowTraceStatus.READY;
+ /**
+ * 默认构造函数
+ */
public FlowTrace() {
contextPool = new HashSet<>();
}
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java
index 684daddb9..261ff5f50 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java
@@ -25,12 +25,27 @@ public class MatchWindow extends Window {
private final Set arms = new HashSet<>();
+ /**
+ * 创建一个MatchWindow
+ *
+ * @param source 源窗口
+ * @param id 窗口ID
+ * @param data 窗口数据
+ */
public MatchWindow(Window source, UUID id, Object data) {
super(inputs -> false, id);
this.from = source;
source.addTo(this);
}
+ /**
+ * 创建一个MatchWindow
+ *
+ * @param source 源窗口
+ * @param id 窗口ID
+ * @param data 窗口数据
+ * @return 返回创建的MatchWindow对象
+ */
public static synchronized MatchWindow from(Window source, UUID id, Object data) {
MatchWindow window = all.get(id.toString());
if (window == null) {
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/StateContext.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/StateContext.java
index 6ad0c9048..9cdf66b59 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/StateContext.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/StateContext.java
@@ -16,6 +16,7 @@ public interface StateContext {
* 获取指定key的上下文数据
*
* @param key 指定key
+ * @param 返回值的类型
* @return 上下文数据
*/
R getState(String key);
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/Window.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/Window.java
index a5aa805b8..e9fe3a10f 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/Window.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/Window.java
@@ -78,15 +78,29 @@ public class Window implements Completable {
private To node = null;
+ /**
+ * 创建窗口
+ *
+ * @param condition 窗口条件
+ * @param id 窗口ID
+ */
public Window(Operators.WindowCondition condition, UUID id) {
this.condition = condition;
this.id = id;
}
+ /**
+ * 创建窗口
+ *
+ * @param condition 窗口条件
+ */
public Window(Operators.WindowCondition condition) {
this(condition, UUID.randomUUID());
}
+ /**
+ * 创建窗口
+ */
public Window() {
this(arg -> false);
}
@@ -94,7 +108,7 @@ public Window() {
/**
* 待删除
*
- * @return
+ * @return 待删除的token数量
*/
public int getTosSize() {
return tos.size();
@@ -133,7 +147,8 @@ public String id() {
* @return 是否到达
*/
public boolean fulfilled() {
- WindowArg arg = new WindowArg(this.isComplete(), this.tokens.size(),
+ WindowArg arg = new WindowArg(this.isComplete(),
+ this.tokens.size(),
this.tokens.stream().filter(t -> !t.initialized() && !t.isReduced()).count(),
Duration.between(this.now.get(), LocalDateTime.now()));
// consuming and consumed are all counted
@@ -314,7 +329,8 @@ public void setCompleteHook(To to, FlowContext context) {
}
/**
- * if this session window is closed and all elements have been consumed, then notify listener stream that i'm totally consumed
+ * if this session window is closed and all elements have been consumed, then notify listener stream that i'm
+ * totally consumed
**/
public void tryFinish() {
synchronized (this) {
@@ -342,7 +358,7 @@ public Integer tokenCount() {
/**
* 待删除
*
- * @return
+ * @return token数量
*/
public synchronized String debugTokens() {
return this.tokens.hashCode() + "-" + this.tokens.stream()
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/WindowToken.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/WindowToken.java
index 3f6be10b6..b8206897a 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/WindowToken.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/WindowToken.java
@@ -19,13 +19,29 @@ public class WindowToken {
* 状态枚举
*/
enum Status {
+ /**
+ * 初始化
+ */
INITIALIZED,
+
+ /**
+ * 正在处理
+ */
CONSUMING,
+
+ /**
+ * 已处理完成
+ */
CONSUMED
}
private final Window window;
+ /**
+ * 状态
+ *
+ * @return 状态
+ */
public Status getStatus() {
return this.status;
}
@@ -34,6 +50,11 @@ public Status getStatus() {
private boolean reduced;
+ /**
+ * 构造函数
+ *
+ * @param window 窗口
+ */
public WindowToken(Window window) {
this.window = window;
}
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextRepo.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextRepo.java
index ac7bcef37..aab19d6c2 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextRepo.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextRepo.java
@@ -7,11 +7,10 @@
package modelengine.fit.waterflow.domain.context.repo.flowcontext;
import modelengine.fit.waterflow.ErrorCodes;
-import modelengine.fit.waterflow.exceptions.WaterflowException;
import modelengine.fit.waterflow.domain.context.FlowContext;
import modelengine.fit.waterflow.domain.context.FlowTrace;
-import modelengine.fit.waterflow.domain.enums.FlowNodeStatus;
import modelengine.fit.waterflow.domain.stream.operators.Operators;
+import modelengine.fit.waterflow.exceptions.WaterflowException;
import java.util.List;
import java.util.Map;
@@ -28,94 +27,105 @@ public interface FlowContextRepo {
/**
* 人工任务节点拉取边上的上下文,在节点的preprocess中处理
*
+ * @param 泛型类型,表示上下文的数据类型
* @param streamId 版本ID
- * @param posIds posId
- * @param status status
- * @return List>
+ * @param posIds posId列表
+ * @param status 状态
+ * @return 上下文列表
*/
List> getContextsByPosition(String streamId, List posIds, String status);
/**
* 获取节点处理完后产生的新的context,发送给下个节点处理,后续可以判断是否删除该方法
*
+ * @param 泛型类型,表示上下文的数据类型
* @param streamId 版本ID
* @param posId posId
* @param batchId 批次ID
- * @param status status
- * @return List>
+ * @param status 状态
+ * @return 上下文列表
*/
List> getContextsByPosition(String streamId, String posId, String batchId, String status);
/**
- * getContextsByTrace
+ * 根据traceId获取上下文
*
- * @param traceId transId
- * @return List>
+ * @param 泛型类型,表示上下文的数据类型
+ * @param traceId traceId
+ * @return 上下文列表
*/
List> getContextsByTrace(String traceId);
/**
* 批量保存context
*
- * @param contexts contexts
+ * @param 泛型类型,表示上下文的数据类型
+ * @param contexts 上下文列表
*/
void save(List> contexts);
/**
* 批量更新context的内容,不更新status和position
*
- * @param contexts contexts
+ * @param 泛型类型,表示上下文的数据类型
+ * @param contexts 上下文列表
*/
default void update(List> contexts) {
save(contexts);
}
/**
- * updateToSent
+ * 更新context的状态为已发送
*
- * @param contexts contexts
+ * @param 泛型类型,表示上下文的数据类型
+ * @param contexts 上下文列表
*/
void updateToSent(List> contexts);
/**
- * getContextsByParallel
+ * 根据parallelId获取上下文
*
- * @param parallelId parallelId
- * @return List>
+ * @param 泛型类型,表示上下文的数据类型
+ * @param parallelId 并行ID
+ * @return 上下文列表
*/
List> getContextsByParallel(String parallelId);
/**
- * getById
+ * 根据id获取上下文
*
- * @param id id
- * @return FlowContext
+ * @param 泛型类型,表示上下文的数据类型
+ * @param id 上下文ID
+ * @return 上下文对象
*/
FlowContext getById(String id);
/**
- * 根据ids查找FlowContext
+ * 根据ids查找FlowContext
*
- * @param ids ids
- * @return List>
+ * @param 泛型类型,表示上下文的数据类型
+ * @param ids 上下文ID列表
+ * @return 上下文列表
*/
List> getByIds(List ids);
/**
* 查找和指定一批ID对应的状态为PENDING且SENT了的流程上下文
*
- * @param ids ids
- * @return List>
+ * @param 泛型类型,表示上下文的数据类型
+ * @param ids 上下文ID列表
+ * @return 上下文列表
*/
List> getPendingAndSentByIds(List ids);
/**
* 查找map节点所有from事件上待处理的上下文
*
+ * @param 泛型类型,表示上下文的数据类型
* @param streamId 流程版本ID
- * @param subscriptions from事件的事件ID
+ * @param subscriptions from事件的事件ID列表
* @param sessions 涉及保序的sessions
- * @return 待处理的上下文
+ * @return 待处理的上下文列表
*/
List> requestMappingContext(String streamId, List subscriptions,
Map sessions);
@@ -123,10 +133,11 @@ List> requestMappingContext(String streamId, List sub
/**
* 查找produce节点所有from事件上待处理的上下文
*
+ * @param 泛型类型,表示上下文的数据类型
* @param streamId 流程版本ID
- * @param subscriptions from事件的事件ID
+ * @param subscriptions from事件的事件ID列表
* @param filter filter校验器
- * @return 待处理的上下文
+ * @return 待处理的上下文列表
*/
List> requestProducingContext(String streamId, List subscriptions,
Operators.Filter filter);
@@ -134,9 +145,10 @@ List> requestProducingContext(String streamId, List s
/**
* 查找流程对应版本所有上下文
*
+ * @param 泛型类型,表示上下文的数据类型
* @param metaId 流程metaId标识
* @param version 流程对应版本
- * @return 对应所有上下文
+ * @return 对应所有上下文列表
*/
default List> findByStreamId(String metaId, String version) {
throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "findByStreamId");
@@ -145,9 +157,10 @@ default List> findByStreamId(String metaId, String version) {
/**
* 查找流程对应版本正在运行的上下文
*
+ * @param 泛型类型,表示上下文的数据类型
* @param metaId metaId 流程metaId标识
* @param version 流程对应版本
- * @return 对应所有上下文
+ * @return 对应所有上下文列表
*/
default List> findRunningContextByMetaId(String metaId, String version) {
throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "findRunningContextByMetaId");
@@ -166,8 +179,9 @@ default void delete(String metaId, String version) {
/**
* 批量更新trace的contextPool
*
- * @param after context
- * @param traces 需要更新的tranceId列表
+ * @param 泛型类型,表示上下文的数据类型
+ * @param after 更新后的上下文列表
+ * @param traces 需要更新的traceId列表
*/
default void updateContextPool(List> after, Set traces) {
save(after);
@@ -176,7 +190,8 @@ default void updateContextPool(List> after, Set trace
/**
* 保存contexts
*
- * @param trace 對應的trace
+ * @param 泛型类型,表示上下文的数据类型
+ * @param trace 对应的trace
* @param flowContext 待保存的contexts
*/
void save(FlowTrace trace, FlowContext flowContext);
@@ -184,15 +199,17 @@ default void updateContextPool(List> after, Set trace
/**
* 批量更新context的上下文数据flowData字段
*
- * @param contexts contexts
+ * @param 泛型类型,表示上下文的数据类型
+ * @param contexts 上下文列表
*/
void updateFlowData(List> contexts);
/**
* 批量更新context的status和position
*
- * @param contexts contexts
- * @param status 状态 {@link FlowNodeStatus}
+ * @param 泛型类型,表示上下文的数据类型
+ * @param contexts 上下文列表
+ * @param status 状态
* @param position 位置
*/
default void updateStatus(List> contexts, String status, String position) {
@@ -202,7 +219,7 @@ default void updateStatus(List> contexts, String status, Stri
/**
* 更新context和trace的状态
*
- * @param traceIds traceIds
+ * @param traceIds traceIds列表
*/
default void updateToTerminated(List traceIds) {
}
@@ -210,8 +227,8 @@ default void updateToTerminated(List traceIds) {
/**
* 判断trace终止
*
- * @param traceIds traceIds
- * @return boolean
+ * @param traceIds traceIds列表
+ * @return 是否终止
*/
default boolean isTracesTerminate(List traceIds) {
return false;
@@ -220,8 +237,8 @@ default boolean isTracesTerminate(List traceIds) {
/**
* 更新序号
*
+ * @param 泛型类型,表示上下文的数据类型
* @param contexts 上下文信息
- * @param 数据类型
*/
void updateIndex(List> contexts);
}
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowtrace/FlowTraceRepo.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowtrace/FlowTraceRepo.java
index 93801ffee..e64569989 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowtrace/FlowTraceRepo.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowtrace/FlowTraceRepo.java
@@ -37,7 +37,7 @@ public interface FlowTraceRepo {
* 根据 ids 查找FlowTrace
*
* @param ids traceId列表
- * @return List
+ * @return List
*/
List getByIds(List ids);
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowDefinitionStatus.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowDefinitionStatus.java
index 98bc1eb68..6fd50b5e7 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowDefinitionStatus.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowDefinitionStatus.java
@@ -21,7 +21,14 @@
*/
@Getter
public enum FlowDefinitionStatus {
+ /**
+ * 流程定义处于激活状态
+ */
ACTIVE("active"),
+
+ /**
+ * 流程定义处于非激活状态
+ */
INACTIVE("inactive");
private final String code;
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowLogType.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowLogType.java
index 759ea7c95..1e3940e29 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowLogType.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowLogType.java
@@ -13,7 +13,18 @@
* @since 1.0
*/
public enum FlowLogType {
+ /**
+ * 信息
+ */
INFO,
+
+ /**
+ * 警告
+ */
WARNING,
+
+ /**
+ * 错误
+ */
ERROR
}
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStatus.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStatus.java
index 565d9ef98..5a637ceec 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStatus.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStatus.java
@@ -14,11 +14,38 @@
* @since 1.0
*/
public enum FlowNodeStatus {
+ /**
+ * 新创建的节点状态
+ */
NEW,
+
+ /**
+ * 节点处于等待状态,停留在事件边上
+ */
PENDING,
+
+ /**
+ * 节点已准备好,但尚未更新数据库
+ */
READY, // 未更新数据库
+
+ /**
+ * 节点正在处理中,但尚未更新数据库
+ */
PROCESSING, // 未更新数据库
+
+ /**
+ * 节点已完成处理
+ */
ARCHIVED,
+
+ /**
+ * 节点已终止
+ */
TERMINATE,
+
+ /**
+ * 节点处理过程中发生错误
+ */
ERROR
-}
+}
\ No newline at end of file
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeTriggerMode.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeTriggerMode.java
index 879da7e88..fd19a4b58 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeTriggerMode.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeTriggerMode.java
@@ -16,7 +16,16 @@
*/
@Getter
public enum FlowNodeTriggerMode {
+ /**
+ * 定义自动模式常量
+ * 该常量表示某种特性或模式是自动启用的
+ */
AUTO(true),
+
+ /**
+ * 默认手动模式
+ * 该常量表示某种特性或模式是手动启用的
+ */
MANUAL(false);
private final boolean auto;
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeType.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeType.java
index c883777af..d30422a95 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeType.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeType.java
@@ -22,13 +22,44 @@
*/
@Getter
public enum FlowNodeType {
+ /**
+ * 开始节点
+ */
START("START", false),
+
+ /**
+ * 状态节点
+ */
STATE("STATE", false),
+
+ /**
+ * 条件节点
+ */
CONDITION("CONDITION", false),
+
+ /**
+ * 并行节点
+ */
PARALLEL("PARALLEL", false),
+
+ /**
+ * 分支节点
+ */
FORK("FORK", true),
+
+ /**
+ * 合并节点
+ */
JOIN("JOIN", true),
+
+ /**
+ * 事件节点
+ */
EVENT("EVENT", true),
+
+ /**
+ * 结束节点
+ */
END("END", false);
private final String code;
@@ -52,4 +83,4 @@ public static FlowNodeType getNodeType(String code) {
.findFirst()
.orElseThrow(() -> new WaterflowParamException(ENUM_CONVERT_FAILED, "FlowNodeType", code));
}
-}
+}
\ No newline at end of file
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowTraceStatus.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowTraceStatus.java
index 22d9d9214..b6e23ee66 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowTraceStatus.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowTraceStatus.java
@@ -19,15 +19,29 @@
* @since 1.0
*/
public enum FlowTraceStatus {
- // 未执行
+ /**
+ * 未执行
+ */
READY,
- // 执行中
+
+ /**
+ * 执行中
+ */
RUNNING,
- // 执行完成
+
+ /**
+ * 执行完成
+ */
ARCHIVED,
- // 执行失败
+
+ /**
+ * 执行失败
+ */
ERROR,
- // 已终止
+
+ /**
+ * 已终止
+ */
TERMINATE;
/**
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/ParallelMode.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/ParallelMode.java
index bc1b6ce61..8af3cdec8 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/ParallelMode.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/ParallelMode.java
@@ -21,9 +21,15 @@
*/
@Getter
public enum ParallelMode {
+ /**
+ * 所有满足条件节点都执行
+ */
ALL("all"),
- EITHER("either"),
- ;
+
+ /**
+ * 满足条件节点执行
+ */
+ EITHER("either");
private final String code;
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/ProcessType.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/ProcessType.java
index b872a70f7..0f77169e5 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/ProcessType.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/ProcessType.java
@@ -14,6 +14,13 @@
* @since 1.0
*/
public enum ProcessType {
+ /**
+ * 前置处理
+ */
PRE_PROCESS,
+
+ /**
+ * 后置处理
+ */
PROCESS
}
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/events/FlowCallbackEvent.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/events/FlowCallbackEvent.java
index 2cd1b1057..ea583449d 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/events/FlowCallbackEvent.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/events/FlowCallbackEvent.java
@@ -24,6 +24,12 @@ public class FlowCallbackEvent implements Event {
private final Object publisher;
+ /**
+ * 构造函数
+ *
+ * @param flowContexts 回调函数参数
+ * @param publisher 发布者
+ */
public FlowCallbackEvent(List> flowContexts, Object publisher) {
this.flowContexts = flowContexts;
this.publisher = publisher;
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/events/FlowTaskCreatedEvent.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/events/FlowTaskCreatedEvent.java
index 814772684..11999dfd3 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/events/FlowTaskCreatedEvent.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/events/FlowTaskCreatedEvent.java
@@ -25,6 +25,14 @@ public class FlowTaskCreatedEvent implements Event {
private final Object publisher;
+ /**
+ * flow任务创建事件的构造方法
+ *
+ * @param flowContextId 流上下文ID列表
+ * @param streamId 流ID
+ * @param nodeId 节点ID
+ * @param publisher 发布者对象
+ */
public FlowTaskCreatedEvent(List flowContextId, String streamId, String nodeId, Object publisher) {
this.flowContextId = flowContextId;
this.streamId = streamId;
@@ -37,14 +45,29 @@ public Object publisher() {
return this.publisher;
}
+ /**
+ * 获取流上下文ID列表
+ *
+ * @return 流上下文ID列表
+ */
public List getFlowContextId() {
return flowContextId;
}
+ /**
+ * 获取流ID
+ *
+ * @return 流ID
+ */
public String getStreamId() {
return streamId;
}
+ /**
+ * 获取节点ID
+ *
+ * @return 节点ID
+ */
public String getNodeId() {
return nodeId;
}
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/flow/Flow.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/flow/Flow.java
index 2906a5f97..4e972a177 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/flow/Flow.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/flow/Flow.java
@@ -59,6 +59,9 @@ public abstract class Flow extends IdGenerator {
private Consumer completeListener;
+ /**
+ * 构造函数
+ */
protected Flow() {
// default session for unbound stream
this.defaultSession = new FlowSession();
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/Activity.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/Activity.java
index 62914ddb0..90adb854d 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/Activity.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/Activity.java
@@ -20,6 +20,11 @@ public abstract class Activity> {
@Getter
private final F flow;
+ /**
+ * 构造函数
+ *
+ * @param flow 流对象
+ */
protected Activity(F flow) {
this.flow = flow;
}
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/Conditions.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/Conditions.java
index 387170005..dcad8bce8 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/Conditions.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/Conditions.java
@@ -37,6 +37,7 @@ protected Conditions(State node) {
/**
* 创建一个分支,在满足该分支条件时执行逻辑。
*
+ * @param 表示返回值类型的泛型参数。
* @param whether 表示条件判定函数的 {@link Operators.Whether}{@code <} {@link I}{@code >}。
* @param processor 表示处理器的 {@link Operators.BranchProcessor}{@code <} {@link O}{@code ,} {@link D}{@code ,}
* {@link I}{@code ,} {@link F}{@code >}。
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/DataStart.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/DataStart.java
index e02b4ba63..b35bd7646 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/DataStart.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/DataStart.java
@@ -37,20 +37,43 @@ public class DataStart {
private final Emitter emitter;
+ /**
+ * 使用单个数据初始化 DataStart。
+ *
+ * @param state 开始节点
+ * @param data 单个数据
+ */
public DataStart(Start> state, D data) {
this(state, FlowEmitter.mono(data));
}
+ /**
+ * 使用数据数组初始化 DataStart。
+ *
+ * @param state 开始节点
+ * @param data 数据数组
+ */
public DataStart(Start> state, D[] data) {
this(state, FlowEmitter.flux(data));
}
+ /**
+ * 使用数据发射器初始化 DataStart。
+ *
+ * @param state 开始节点
+ * @param emitter 数据发射器
+ */
public DataStart(Start> state, Emitter emitter) {
this.state = state;
this.emitter = emitter;
this.start = this;
}
+ /**
+ * 使用开始节点初始化 DataStart,不指定发射器。
+ *
+ * @param state 开始节点
+ */
protected DataStart(Start> state) {
this(state, (Emitter) null);
}
@@ -202,6 +225,8 @@ public DataState process(Operators.Process processor) {
/**
* 触发数据的发射。
+ *
+ * @param session 流会话
*/
protected void offer(FlowSession session) {
if (this.emitter == null) {
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/Fork.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/Fork.java
index 1197260c6..e49eed575 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/Fork.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/Fork.java
@@ -64,6 +64,7 @@ public Fork fork(Operators.BranchProcessor processor) {
/**
* 生成join节点,到这里parallel结束,回到一般节点
*
+ * @param join节点的输出数据类型
* @param init 初始值
* @param processor join后的数据再处理一下
* @return 回到一般节点
@@ -79,11 +80,11 @@ public State join(Supplier init, Operators.Reduce proce
public synchronized R process(FlowContext input) {
input.getSession().setAsAccumulator();
Object key = input.getParallel();
- Map
*
+ * @param 表示输出数据类型。
* @param init 表示聚合操作初始值提供者的 {@link Supplier}{@code <}{@link R}{@code >}。
* @param processor 表示数据聚合器的 {@link Operators.ProcessReduce}{@code <}{@link O}{@code , }{@link R}{@code >}。
* @return 表示数据聚合节点的 {@link State}{@code <}{@link R}{@code , }{@link D}{@code , }
@@ -344,6 +349,7 @@ public synchronized void process(FlowContext input) {
/**
* 对数据流进行分组处理,根据指定的 keyGetter 获取的键进行分组。
*
+ * @param 表示输出数据类型。
* @param keyGetter 表示提供聚合键的 {@link Operators.Map}{@code <}{@link O}{@code , }{@link R}{@code >}。
* @return 表示聚合后的节点的 {@link Tuple}{@code <}{@link R}{@code , }{@link O}{@code >}。
*/
@@ -375,6 +381,7 @@ public State, D, O, F> keyBy(Operators.Map keyGetter) {
* 生成一个数据处理节点,将每个数据通过指定的方式进行处理后,形成一个新的数据,并继续发送。
*
* @param processor 表示处理器的 {@link Operators.Produce}{@code <}{@link O}{@code ,}{@link R}{@code >}。
+ * @param 表示处理后的数据类型。
* @return 表示新的处理节点的 {@link State}{@code <}{@link List}{@code <}{@link R}{@code >}
* {@code ,}{@link D}{@code ,?,}{@link F}{@code >}
*/
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/callbacks/ToCallback.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/callbacks/ToCallback.java
index 63f2061ad..3619c4896 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/callbacks/ToCallback.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/callbacks/ToCallback.java
@@ -19,6 +19,11 @@
public class ToCallback implements Callback {
private final List products;
+ /**
+ * 构造方法
+ *
+ * @param products 待回调发送的数据
+ */
public ToCallback(List products) {
this.products = products;
}
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Blocks.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Blocks.java
index a5f80c890..11fd262b3 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Blocks.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Blocks.java
@@ -53,7 +53,7 @@ public void process(List> contexts) {
* 有能力validate某个item是否满足条件
* 辉子 2019-11-21
*
- * @param
+ * @param 输入数据类型
*/
public static class ValidatorBlock extends Block {
/**
@@ -94,15 +94,24 @@ public void process(List> contexts) {
/**
* FilterBlock
*
+ * @param 输入数据类型
* @since 1.0
*/
public static class FilterBlock extends Block {
private final Operators.Filter filter;
+ /**
+ * 空filter构造器,其实就是默认全部通过验证
+ */
public FilterBlock() {
this(null);
}
+ /**
+ * 用户在设置时将filter传入block,但不会传入相应的节点,只会在调用process的时候传入节点
+ *
+ * @param filter 过滤器
+ */
public FilterBlock(Operators.Filter filter) {
this.filter = filter;
}
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/FlatMapNode.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/FlatMapNode.java
index fe679b3f6..0d0ab03d1 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/FlatMapNode.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/FlatMapNode.java
@@ -21,6 +21,15 @@
* @since 1.0
*/
public class FlatMapNode extends Node {
+ /**
+ * 构造函数
+ *
+ * @param streamId 流ID
+ * @param wrapper 包装器
+ * @param repo 上下文仓库
+ * @param messenger 上下文消息
+ * @param locks 上下文锁
+ */
public FlatMapNode(String streamId, Operators.Map, R> wrapper, FlowContextRepo repo,
FlowContextMessenger messenger, FlowLocks locks) {
super(streamId, wrapper, repo, messenger, locks);
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java
index 13416a390..79ec8c254 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java
@@ -8,8 +8,6 @@
import static modelengine.fit.waterflow.ErrorCodes.FLOW_ENGINE_INVALID_MANUAL_TASK;
-import modelengine.fit.waterflow.domain.context.repo.flowsession.FlowSessionRepo;
-import modelengine.fit.waterflow.exceptions.WaterflowException;
import modelengine.fit.waterflow.domain.context.FlatMapSourceWindow;
import modelengine.fit.waterflow.domain.context.FlatMapWindow;
import modelengine.fit.waterflow.domain.context.FlowContext;
@@ -19,6 +17,7 @@
import modelengine.fit.waterflow.domain.context.repo.flowcontext.FlowContextMessenger;
import modelengine.fit.waterflow.domain.context.repo.flowcontext.FlowContextRepo;
import modelengine.fit.waterflow.domain.context.repo.flowlock.FlowLocks;
+import modelengine.fit.waterflow.domain.context.repo.flowsession.FlowSessionRepo;
import modelengine.fit.waterflow.domain.enums.FlowNodeStatus;
import modelengine.fit.waterflow.domain.enums.FlowTraceStatus;
import modelengine.fit.waterflow.domain.enums.ParallelMode;
@@ -32,6 +31,7 @@
import modelengine.fit.waterflow.domain.stream.reactive.When;
import modelengine.fit.waterflow.domain.utils.IdGenerator;
import modelengine.fit.waterflow.domain.utils.UUIDUtil;
+import modelengine.fit.waterflow.exceptions.WaterflowException;
import modelengine.fitframework.inspection.Validation;
import modelengine.fitframework.util.CollectionUtils;
import modelengine.fitframework.util.ObjectUtils;
@@ -75,11 +75,26 @@ public class From extends IdGenerator implements Publisher {
private final String streamId;
+ /**
+ * 构造函数
+ *
+ * @param repo contextRepo
+ * @param messenger messenger
+ * @param locks 锁
+ */
public From(FlowContextRepo repo, FlowContextMessenger messenger, FlowLocks locks) {
this(null, repo, messenger, locks);
// 单纯的from的id就是stream id,因为单纯的from是数据的起点,其他都是subscriber
}
+ /**
+ * 构造函数
+ *
+ * @param streamId 流ID
+ * @param repo 上下文仓库
+ * @param messenger 消息传递者
+ * @param locks 流程锁
+ */
public From(String streamId, FlowContextRepo repo, FlowContextMessenger messenger, FlowLocks locks) {
this.streamId = streamId != null && !"".equals(streamId.trim()) ? streamId : this.id;
this.repo = repo;
@@ -245,14 +260,12 @@ public String offer(I[] data, FlowSession session) {
Set traceId = new HashSet<>();
traceId.add(trace.getId());
Window window = session.begin();
- List> contexts = Arrays.stream(data)
- .map(d -> {
- FlowContext context = new FlowContext<>(this.getStreamId(), this.getId(), d, traceId,
- this.getId(), session);
- window.createToken();
- return context;
- })
- .collect(Collectors.toList());
+ List> contexts = Arrays.stream(data).map(d -> {
+ FlowContext context =
+ new FlowContext<>(this.getStreamId(), this.getId(), d, traceId, this.getId(), session);
+ window.createToken();
+ return context;
+ }).collect(Collectors.toList());
List> after = this.startNodeMarkAsHandled(contexts, trace);
after.forEach(this::generateIndex);
this.offer(after);
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Node.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Node.java
index f57015fc8..d63ed2876 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Node.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Node.java
@@ -30,8 +30,8 @@
* 中间节点,既是数据发送者,也是数据接受者
* 由于没有多重继承,node直接继承自To,并生成一个From,引用所有From的实现
*
- * @param
- * @param
+ * @param 输入数据类型
+ * @param 输出数据类型
* @author 高诗意
* @since 1.0
*/
@@ -114,7 +114,7 @@ protected Node(String streamId, Operators.Map, R> processor, Flow
* @param repo contextRepo
* @param messenger messenger
* @param locks 流程锁
- * @return From
+ * @return {@link From}{@code <}{@link R}{@code >}
*/
protected From initFrom(FlowContextRepo repo, FlowContextMessenger messenger, FlowLocks locks) {
return new From<>(this.getStreamId(), repo, messenger, locks); // node里的from跟随subscriber的streamId
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Retryable.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Retryable.java
index fa3779cb9..d9096d367 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Retryable.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Retryable.java
@@ -24,6 +24,12 @@ public class Retryable {
private final Subscriber to;
+ /**
+ * 构造方法
+ *
+ * @param repo 仓库
+ * @param to 订阅者
+ */
public Retryable(FlowContextRepo repo, Subscriber to) {
this.repo = repo;
this.to = to;
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java
index e03d75a12..d5308526d 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java
@@ -10,7 +10,6 @@
import static modelengine.fit.waterflow.ErrorCodes.FLOW_NODE_MAX_TASK;
import lombok.Getter;
-import modelengine.fit.waterflow.exceptions.WaterflowException;
import modelengine.fit.waterflow.domain.common.Constants;
import modelengine.fit.waterflow.domain.context.FlowContext;
import modelengine.fit.waterflow.domain.context.FlowSession;
@@ -35,6 +34,7 @@
import modelengine.fit.waterflow.domain.utils.Identity;
import modelengine.fit.waterflow.domain.utils.SleepUtil;
import modelengine.fit.waterflow.domain.utils.UUIDUtil;
+import modelengine.fit.waterflow.exceptions.WaterflowException;
import modelengine.fitframework.inspection.Validation;
import modelengine.fitframework.log.Logger;
import modelengine.fitframework.schedule.Task;
@@ -59,8 +59,8 @@
* FitStream的数据处理节点,上一个节点是下一个节点的publisher
* 辉子 2019-10-31
*
- * @param 该节点处理函数入参类型
- * @param 该节点处理函数返回值类型
+ * @param 该节点处理函数入参类型
+ * @param 该节点处理函数返回值类型
* @author 高诗意
* @since 1.0
*/
@@ -141,7 +141,8 @@ public class To extends IdGenerator implements Subscriber {
private Map processingSessions = new ConcurrentHashMap<>();
private Operators.Validator validator = (repo, to) -> repo.requestMappingContext(to.streamId,
- to.froms.stream().map(Identity::getId).collect(Collectors.toList()), to.processingSessions);
+ to.froms.stream().map(Identity::getId).collect(Collectors.toList()),
+ to.processingSessions);
private Blocks.Block block = null;
@@ -293,20 +294,24 @@ private synchronized void triggerNodeProcessor(ProcessType type) {
preProcessRunning = true;
String threadName = getThreadName(PRE_PROCESS_T_NAME_PREFIX);
preProcessT = new Thread(() -> preProcess(type), threadName);
- preProcessT.setUncaughtExceptionHandler((thread, error) ->
- LOG.error("run preProcessT error, message:{}", error.getMessage()));
+ preProcessT.setUncaughtExceptionHandler((thread, error) -> LOG.error("run preProcessT error, message:{}",
+ error.getMessage()));
preProcessT.start();
- LOG.debug("[{}] preprocess main loop starts for stream-id: {}, node-id: {}", threadName, this.streamId,
+ LOG.debug("[{}] preprocess main loop starts for stream-id: {}, node-id: {}",
+ threadName,
+ this.streamId,
this.id);
}
if (type == ProcessType.PROCESS && (processT == null || !processRunning)) {
processRunning = true;
String threadName = getThreadName(PROCESS_T_NAME_PREFIX);
processT = new Thread(() -> process(type), threadName);
- processT.setUncaughtExceptionHandler((thread, error) ->
- LOG.error("run processT error, message:{}", error.getMessage()));
+ processT.setUncaughtExceptionHandler((thread, error) -> LOG.error("run processT error, message:{}",
+ error.getMessage()));
processT.start();
- LOG.debug("[{}] process main loop starts for stream-id: {}, node-id: {}", threadName, this.streamId,
+ LOG.debug("[{}] process main loop starts for stream-id: {}, node-id: {}",
+ threadName,
+ this.streamId,
this.id);
}
}
@@ -336,16 +341,19 @@ private void preProcess(ProcessType type) {
if (CollectionUtils.isEmpty(ready)) {
preProcessRunning = false;
LOG.debug("[{}] preprocess main loop exit for stream-id: {}, node-id: {}",
- this.getThreadName(PRE_PROCESS_T_NAME_PREFIX), this.streamId, this.id);
+ this.getThreadName(PRE_PROCESS_T_NAME_PREFIX),
+ this.streamId,
+ this.id);
this.handlePreProcessConcurrentConflict();
return;
}
messenger.send(this.getId(), ready);
} catch (Exception ex) {
ready.forEach( // 如果是数据库或者redis挂了,会死循环,线程不退出等待数据库或者redis恢复
- r -> LOG.error(
- "Preprocess main loop exception stream-id: {}, node-id: {}, context-id: {}.",
- this.streamId, this.id, r.getId()));
+ r -> LOG.error("Preprocess main loop exception stream-id: {}, node-id: {}, context-id: {}.",
+ this.streamId,
+ this.id,
+ r.getId()));
LOG.debug("Preprocess main loop exception details: ", ex);
} finally {
SleepUtil.sleep(SLEEP_MILLS);
@@ -366,7 +374,9 @@ private void handlePreProcessConcurrentConflict() {
return;
}
LOG.info("[{}] preprocess thread conflict happens for stream-id: {}, node-id: {}",
- this.getThreadName(PRE_PROCESS_T_NAME_PREFIX), this.streamId, this.id);
+ this.getThreadName(PRE_PROCESS_T_NAME_PREFIX),
+ this.streamId,
+ this.id);
this.accept(ProcessType.PRE_PROCESS, concurrentConflictContexts);
}
@@ -463,6 +473,11 @@ public Operators.Filter defaultFilter() {
}
}
+ /**
+ * 设置验证器,用于验证上下文数据。
+ *
+ * @param validator 验证器对象
+ */
public void setValidator(Operators.Validator validator) {
if (validator == null) {
this.validator = (i, all) -> new ArrayList<>();
@@ -508,7 +523,10 @@ public void onProcess(ProcessType type, List> preList, boolean is
});
} catch (Exception ex) {
LOG.error("Node process exception stream-id: {}, node-id: {}, position-id: {}, traceId: {}. caused by: {}",
- this.streamId, this.id, preList.get(0).getPosition(), preList.get(0).getTraceId(),
+ this.streamId,
+ this.id,
+ preList.get(0).getPosition(),
+ preList.get(0).getTraceId(),
ex.getClass().getName());
LOG.debug("Error, message: {}.", ex.getMessage());
LOG.debug("Node process exception details: ", ex);
@@ -656,7 +674,9 @@ public Boolean isAuto() {
@Override
public List> nextContexts(String batchId) {
- return ObjectUtils.cast(this.flowContextRepo.getContextsByPosition(this.streamId, this.getId(), batchId,
+ return ObjectUtils.cast(this.flowContextRepo.getContextsByPosition(this.streamId,
+ this.getId(),
+ batchId,
FlowNodeStatus.NEW.toString()));
}
@@ -689,8 +709,8 @@ private void introduceToProcess(List> contexts) {
}
private boolean isParallelJoined(FlowContext context) {
- List> contextsByParallel = this.getFlowContextRepo()
- .getContextsByParallel(context.getParallel());
+ List> contextsByParallel =
+ this.getFlowContextRepo().getContextsByParallel(context.getParallel());
return contextsByParallel.stream().anyMatch(FlowContext::isJoined);
}
@@ -725,11 +745,14 @@ private int getNextAccOrder(FlowSession session) {
}
/**
- * ProcessMode
+ * ProcessMode 枚举,定义了节点的处理模式。
*
* @since 1.0
*/
public enum ProcessMode {
+ /**
+ * Producing 模式
+ */
PRODUCING {
@Override
public List> process(To to, List> contexts) {
@@ -742,9 +765,14 @@ public List> process(To to, List List> requestAll(To to) {
return to.flowContextRepo.requestProducingContext(to.streamId,
- to.froms.stream().map(Identity::getId).collect(Collectors.toList()), to.postFilter());
+ to.froms.stream().map(Identity::getId).collect(Collectors.toList()),
+ to.postFilter());
}
},
+
+ /**
+ * Mapping 模式
+ */
MAPPING {
@Override
public List> process(To to, List> contexts) {
@@ -769,7 +797,8 @@ public List> process(To to, List List> requestAll(To to) {
/**
* 节点处理器
*
- * @param to to
- * @param contexts contexts
- * @return List>
+ * @param 流程实例执行时的入参数据类型
+ * @param 流程实例执行时的出参数据类型
+ * @param to 当前节点
+ * @param contexts 上下文列表
+ * @return 处理后的上下文列表
*/
public abstract List> process(To to, List> contexts);
/**
- * 节点request边上pending的数据
- * 首先通过分布式锁,保证每次只有一个节点线程可以请求到一批次contexts(以batchID为维度)
- * 其次过滤出ready的contexts,并且将其状态更新为ready,然后释放分布式锁
- * 最后将ready的contexts提交给节点线程池处理
- * 保证一批次contexts一次只有一个线程在处理
- * 非常重要!退出机制增加保护策略,避免A线程退出过程中,B线程放数据到边上数据得不到处理的场景:
- * 这时A线程未标记退出,B线程已经完成触发动作,B线程以为A线程还在处理,而A线程直接就会退出,因此由A线程判断是否再触发一次
+ * 节点 request 边上 pending 的数据
*
- * @param 流程实例执行时的入参数据类型,用于泛型推倒
- * @param 流程实例执行时的出参数据类型,用于泛型推倒
+ * @param 流程实例执行时的入参数据类型
+ * @param 流程实例执行时的出参数据类型
+ * @param type 处理类型
* @param to 当前节点
*/
public void request(ProcessType type, To to) {
@@ -832,7 +858,9 @@ public void request(ProcessType type, To to) {
if (CollectionUtils.isEmpty(ready)) {
to.processRunning = false;
LOG.debug("[{}] process main loop exit for stream-id: {}, node-id: {}",
- to.getThreadName(To.PROCESS_T_NAME_PREFIX), to.streamId, to.id);
+ to.getThreadName(To.PROCESS_T_NAME_PREFIX),
+ to.streamId,
+ to.id);
handleProcessConcurrentConflict(to);
return;
}
@@ -846,7 +874,9 @@ public void request(ProcessType type, To to) {
// 如果是数据库或者redis挂了,会死循环,线程不退出等待数据库或者redis恢复
ready.forEach(r -> LOG.error(
"Process main loop exception, " + "stream-id: {}, node-id: {}, context-id: {}.",
- to.streamId, to.id, r.getId()));
+ to.streamId,
+ to.id,
+ r.getId()));
LOG.debug("Process main loop exception details: ", ex);
} finally {
if (!isSubmitted) {
@@ -858,11 +888,11 @@ public void request(ProcessType type, To to) {
}
/**
- * 查找节点连接的边上所有的contexts,由子类负责实现
+ * 查找节点连接的边上所有的 contexts
*
+ * @param 流程实例执行时的入参数据类型
+ * @param 流程实例执行时的出参数据类型
* @param to 本节点节点类
- * @param 流程实例执行时的入参数据类型,用于泛型推倒
- * @param 流程实例执行时的出参数据类型,用于泛型推倒
* @return 获取所有该节点待处理的数据
*/
protected abstract List> requestAll(To to);
@@ -892,8 +922,8 @@ private List> requestReady(To to) {
*
* @param to 本节点节点类
* @param pre 本节点获取到边上所有的contexts集合
- * @param 流程实例执行时的入参数据类型,用于泛型推倒
- * @param 流程实例执行时的出参数据类型,用于泛型推倒
+ * @param 流程实例执行时的入参数据类型
+ * @param 流程实例执行时的出参数据类型
* @return ready的context列表
*/
private List> filterReady(To to, List> pre) {
@@ -920,7 +950,9 @@ private void handleProcessConcurrentConflict(To to) {
return;
}
LOG.info("[{}] process thread conflict happens for stream-id: {}, node-id: {}",
- to.getThreadName(To.PROCESS_T_NAME_PREFIX), to.streamId, to.id);
+ to.getThreadName(To.PROCESS_T_NAME_PREFIX),
+ to.streamId,
+ to.id);
to.accept(ProcessType.PROCESS, pending);
}
}
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/Operators.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/Operators.java
index c48dc0966..3d1afbe46 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/Operators.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/Operators.java
@@ -146,7 +146,7 @@ public interface Produce {
* process
*
* @param input input
- * @return List
+ * @return R
*/
List process(List input);
}
@@ -218,15 +218,15 @@ public interface ErrorHandler {
/**
* 用于when的条件判定
*
- * @param
+ * @param 输入类型
*/
@FunctionalInterface
public interface Whether {
/**
- * is
+ * 判断输入是否满足条件
*
- * @param input input
- * @return boolean
+ * @param input 输入值
+ * @return 如果满足条件返回true,否则返回false
*/
boolean is(T input);
}
@@ -271,7 +271,7 @@ public interface Filter {
* process
*
* @param input input
- * @return List>
+ * @return 筛选结果
*/
List> process(List> input);
}
@@ -279,7 +279,7 @@ public interface Filter {
/**
* 用于单条验证原材料是否符合生产标准
*
- * @param
+ * @param 原材料类型
*/
@FunctionalInterface
public interface Validator {
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/SessionWindowCondition.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/SessionWindowCondition.java
index a5c0df836..e93859790 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/SessionWindowCondition.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/SessionWindowCondition.java
@@ -26,7 +26,6 @@ private SessionWindowCondition(String key, Operators.WindowCondition windowCondi
*
* @param key 用于构建window的key
* @param windowCondition 给定的window条件
- * @param 数据类型
* @return 构造后的sessionWindow
*/
public static SessionWindowCondition from(String key, Operators.WindowCondition windowCondition) {
@@ -43,7 +42,6 @@ public static SessionWindowCondition from(Operators.WindowCondition windowCondit
return from(null, windowCondition);
}
-
@Override
public boolean fulfilled(WindowArg arg) {
return this.windowCondition.fulfilled(arg);
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/WindowArg.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/WindowArg.java
index 5d23400b1..7cf2cb82a 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/WindowArg.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/WindowArg.java
@@ -20,6 +20,14 @@ public class WindowArg {
private final Duration timeToNow;
+ /**
+ * 构造函数
+ *
+ * @param isSessionComplete 是否会话完成
+ * @param dataLength 数据长度
+ * @param countToNow 计数到当前
+ * @param timeToNow 时间到当前
+ */
public WindowArg(boolean isSessionComplete, long dataLength, long countToNow, Duration timeToNow) {
this.isSessionComplete = isSessionComplete;
this.dataLength = dataLength;
@@ -28,9 +36,9 @@ public WindowArg(boolean isSessionComplete, long dataLength, long countToNow, Du
}
/**
- * session是否完成
+ * 确认window是否完成
*
- * @return
+ * @return 完成状态
*/
public boolean isSessionComplete() {
return this.isSessionComplete;
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Callback.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Callback.java
index 4d58672fa..cf9f7eef1 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Callback.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Callback.java
@@ -16,16 +16,16 @@
*/
public interface Callback {
/**
- * getAll
+ * 获取所有处理的对象
*
- * @return List
+ * @return 包含所有处理对象的列表
*/
List getAll();
/**
- * get
+ * 获取单个处理的对象
*
- * @return O
+ * @return 单个处理对象
*/
O get();
-}
+}
\ No newline at end of file
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Processor.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Processor.java
index 37694fc22..614bfd08b 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Processor.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Processor.java
@@ -40,12 +40,11 @@ public interface Processor extends Publisher, Subscriber {
* @param nodeId 表示 {@code displayFlow} 流程节点名称的 {@link String}。
* @return 表示数据处理器自身的 {@link Processor}{@code <}{@link T}{@code , }{@link R}{@code >}。
* @throws IllegalArgumentException
- *
+ *
* - 当 {@code displayFlow} 为 {@code null} 时。
* - 当 {@code name} 为 {@code null} 、空字符串或只有空白字符的字符串时。
* - 当 {@code nodeId} 为 {@code null} 、空字符串或只有空白字符的字符串时。
- *
- *
+ *
*/
Processor displayAs(String name, Flow displayFlow, String nodeId);
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Publisher.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Publisher.java
index 3dfcf621b..a70ca76a7 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Publisher.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Publisher.java
@@ -47,43 +47,45 @@ default void handle(I data, FlowSession flowSession) {
* conditions
*
* @param whether 判定条件
- * @return Processor
+ * @return Processor
*/
Processor conditions(Operators.Whether whether);
/**
* parallel
*
- * @param mode mode
- * @param whether whether
- * @return Processor
+ * @param mode 并行模式
+ * @param whether 判定条件
+ * @return Processor
*/
Processor parallel(ParallelMode mode, Operators.Whether whether);
/**
* join
*
- * @param processor processor
- * @param whether whether
- * @return Processor
+ * @param processor 数据处理器
+ * @param whether 判定条件
+ * @param 返回值类型
+ * @return Processor
*/
Processor join(Operators.Map, O> processor, Operators.Whether whether);
/**
* just
*
- * @param processor processor
- * @param whether whether
- * @return Processor
+ * @param processor 数据处理器
+ * @param whether 判定条件
+ * @return Processor
*/
Processor just(Operators.Just> processor, Operators.Whether whether);
/**
* map
*
- * @param processor processor
- * @param whether whether
- * @return Processor
+ * @param processor 数据处理器
+ * @param whether 判定条件
+ * @param 返回值类型
+ * @return Processor
*/
Processor map(Operators.Map, O> processor, Operators.Whether whether);
@@ -102,40 +104,45 @@ default void handle(I data, FlowSession flowSession) {
* process处理,并往下发射新的数据,支持操作 session KV状态数据
*
* @param processor 携带数据、KV下文和发射器的处理器
- * @param whether whether
- * @return Processor
+ * @param whether 判定条件
+ * @param 返回值类型
+ * @return Processor
*/
Processor process(Operators.Process, O> processor, Operators.Whether whether);
/**
* subscribe
*
- * @param subscriber subscriber
+ * @param subscriber 订阅者
+ * @param 订阅者处理的数据类型
*/
void subscribe(Subscriber subscriber);
/**
* subscribe
*
- * @param subscriber subscriber
- * @param whether whether
+ * @param subscriber 订阅者
+ * @param whether 判定条件
+ * @param 订阅者处理的数据类型
*/
void subscribe(Subscriber subscriber, Operators.Whether whether);
/**
* subscribe
*
- * @param eventId eventId
- * @param subscriber subscriber
+ * @param eventId 事件ID
+ * @param subscriber 订阅者
+ * @param 订阅者处理的数据类型
*/
void subscribe(String eventId, Subscriber subscriber);
/**
* subscribe
*
- * @param eventId eventId
- * @param subscriber subscriber
- * @param whether whether
+ * @param eventId 事件ID
+ * @param subscriber 订阅者
+ * @param whether 判定条件
+ * @param 订阅者处理的数据类型
*/
void subscribe(String eventId, Subscriber subscriber, Operators.Whether whether);
@@ -190,7 +197,7 @@ default void handle(I data, FlowSession flowSession) {
/**
* getSubscriptions
*
- * @return List>
+ * @return 订阅者列表
*/
List> getSubscriptions();
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscriber.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscriber.java
index 7b71ac7cc..2e1edd2c6 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscriber.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscriber.java
@@ -130,7 +130,7 @@ public interface Subscriber extends StreamIdentity, Emitter>
+ * @return {@link List}{@code <}{@link FlowContext}{@code <}{@link O}{@code >>}
*/
List> nextContexts(String batchId);
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscription.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscription.java
index 39a2a769a..39da6646d 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscription.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscription.java
@@ -30,14 +30,15 @@ public interface Subscription extends StreamIdentity {
/**
* getWhether
*
- * @return Whether
+ * @return Whether
*/
Operators.Whether getWhether();
/**
* getTo
*
- * @return Subscriber
+ * @param R
+ * @return Subscriber
*/
Subscriber getTo();
}
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/When.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/When.java
index 6db2155f5..4b05d99f9 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/When.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/When.java
@@ -51,6 +51,7 @@ public class When extends IdGenerator implements Subscription {
/**
* When
*
+ * @param R
* @param streamId streamId
* @param to to
* @param whether whether
@@ -70,6 +71,7 @@ public When(String streamId, Subscriber to, Operators.Whether wheth
/**
* When
*
+ * @param R
* @param streamId streamId
* @param eventId eventId
* @param to to
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/utils/IdGenerator.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/utils/IdGenerator.java
index b81b29681..7f03a0ee7 100644
--- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/utils/IdGenerator.java
+++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/utils/IdGenerator.java
@@ -23,10 +23,18 @@ public abstract class IdGenerator implements Identity {
@Setter
protected String id;
+ /**
+ * 构造函数
+ */
public IdGenerator() {
this(UUIDUtil.uuid());
}
+ /**
+ * 构造函数
+ *
+ * @param id ID
+ */
public IdGenerator(String id) {
this.id = id;
}
From 1de8e417e470a8ef8003a2ed427af832e1e574ad Mon Sep 17 00:00:00 2001
From: CodeCaster
Date: Wed, 11 Jun 2025 23:21:57 +0800
Subject: [PATCH 3/4] optimize FEL javadoc and pom
---
framework/dependency/pom.xml | 34 ++-
.../fel/engine/activities/AiActivity.java | 5 +
.../fel/engine/activities/AiMatchHappen.java | 6 +
.../engine/activities/AiMatchToHappen.java | 10 +-
.../fel/engine/activities/AiParallel.java | 1 +
.../fel/engine/activities/AiStart.java | 5 +-
.../modelengine/fel/engine/flows/AiFlow.java | 5 +
.../fel/engine/flows/AiProcessFlow.java | 5 +
.../fel/engine/flows/ConverseLatch.java | 25 +++
.../operators/models/ChatBlockModel.java | 13 +-
.../engine/operators/models/ChatChunk.java | 7 +-
.../operators/models/ChatFlowModel.java | 6 +
.../operators/patterns/AbstractAgent.java | 8 +-
.../patterns/AbstractFlowPattern.java | 5 +-
.../operators/patterns/SimplePattern.java | 5 +
.../engine/operators/patterns/SyncTipper.java | 5 +
.../patterns/support/DefaultAgent.java | 7 +
.../fel/engine/util/AiFlowSession.java | 4 +
.../pipeline/huggingface/GeneralPipeline.java | 3 +-
.../pipeline/huggingface/PipelineTask.java | 111 +++++++++-
framework/fel/java/pom.xml | 198 +++---------------
.../pipeline/HuggingFacePipelineService.java | 3 +-
framework/ohscript/pom.xml | 4 +-
framework/pom.xml | 37 ++++
framework/waterflow/java/pom.xml | 3 +-
25 files changed, 326 insertions(+), 189 deletions(-)
diff --git a/framework/dependency/pom.xml b/framework/dependency/pom.xml
index bee069c2a..af3706461 100644
--- a/framework/dependency/pom.xml
+++ b/framework/dependency/pom.xml
@@ -56,7 +56,7 @@
1.2.20
1.2.83
32.0.1-jre
- 2.3.232
+ portable-1.8.4
1.18.36
2.18.2
3.5.13
@@ -415,6 +415,33 @@
${fit.version}
+
+
+ org.fitframework.fel
+ fel-core
+ ${fit.version}
+
+
+ org.fitframework.fel
+ tool-service
+ ${fit.version}
+
+
+ org.fitframework.fel
+ tool-info
+ ${fit.version}
+
+
+ org.fitframework.fel
+ tool-mcp-common
+ ${fit.version}
+
+
+ org.fitframework.fel
+ tool-mcp-client-service
+ ${fit.version}
+
+
org.fitframework.ohscript
@@ -453,6 +480,11 @@
guava
${guava.version}
+
+ com.hankcs
+ hanlp
+ ${hanlp.version}
+
net.bytebuddy
byte-buddy
diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiActivity.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiActivity.java
index ee30f9931..4288e5c92 100644
--- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiActivity.java
+++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiActivity.java
@@ -32,6 +32,11 @@ protected AiActivity(F flow) {
this.flow = Validation.notNull(flow, "Flow cannot be null.");
}
+ /**
+ * 获取 AI 节点所对应的流程对象。
+ *
+ * @return AI 节点所对应的流程对象。
+ */
public F flow() {
return this.flow;
}
diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiMatchHappen.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiMatchHappen.java
index cd0cf9bae..f2f56e412 100644
--- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiMatchHappen.java
+++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiMatchHappen.java
@@ -30,6 +30,12 @@ public class AiMatchHappen, F extends AiFlow>
private final F flow;
+ /**
+ * 创建一个 AI 流程匹配发生器。
+ *
+ * @param matchHappen 匹配发生器。
+ * @param flow AI 流程。
+ */
public AiMatchHappen(MatchHappen matchHappen, F flow) {
this.matchHappen = Validation.notNull(matchHappen, "MatchHappen cannot be null.");
this.flow = Validation.notNull(flow, "Flow cannot be null.");
diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiMatchToHappen.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiMatchToHappen.java
index 16d53b3c3..ccad0f2fd 100644
--- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiMatchToHappen.java
+++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiMatchToHappen.java
@@ -31,6 +31,12 @@ public class AiMatchToHappen, F extends AiFlow>
private final F flow;
+ /**
+ * 创建一个 {@link AiMatchToHappen} 对象。
+ *
+ * @param matchToHappen 匹配条件。
+ * @param flow AI 流程。
+ */
public AiMatchToHappen(MatchToHappen matchToHappen, F flow) {
this.matchToHappen = Validation.notNull(matchToHappen, "matchToHappen cannot be null.");
this.flow = Validation.notNull(flow, "Flow cannot be null.");
@@ -75,8 +81,8 @@ public AiMatchHappen match(Operators.Whether whether,
/**
* 提供一个默认的处理逻辑,并结束条件节点。
*
- * @param processor 表示分支处理器的 {@link AiBranchProcessor}{@code <}{@link O}{@code , }{@link D}{@code ,
- * }{@link ?}{@code , }{@link RF}{@code , }{@link F}{@code >}。
+ * @param processor 表示分支处理器的 {@link AiBranchProcessor}{@code <}{@link O}{@code , }{@link D}{@code ,?,
+ * }{@link RF}{@code , }{@link F}{@code >}。
* @param 表示第一个条件分支指定的返回类型。
* @return 表示条件节点的 {@link AiState}{@code <}{@link O}{@code , }{@link D}{@code , }{@link O}{@code ,
* }{@link RF}{@code , }{@link F}{@code >}。
diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiParallel.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiParallel.java
index 7841ca22e..033c1596e 100644
--- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiParallel.java
+++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiParallel.java
@@ -40,6 +40,7 @@ public AiParallel(Parallel parallel, F flow) {
/**
* 生成平行节点的子分支。
*
+ * @param 表示分支处理器的输入参数的类型。
* @param processor 表示分支处理器的 {@link AiBranchProcessor}{@code <}{@link O}{@code , }{@link D}{@code ,
* }{@link I}{@code , }{@link RF}{@code , }{@link F}{@code >}。
* @return 表示平行节点子分支的 {@link AiFork}{@code <}{@link O}{@code , }{@link D}{@code , }{@link I}{@code ,
diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiStart.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiStart.java
index 0bd7962c5..19b32abe5 100644
--- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiStart.java
+++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiStart.java
@@ -397,6 +397,7 @@ public AiState synthesize(Synthesizer synthesizer) {
/**
* 将模型处理返回值的格式化解析。
*
+ * @param 表示解析器的返回值类型。
* @param parser 表示格式化解析器的 {@link Parser}{@code <}{@link R}{@code >}。
* @return 表示格式化解析节点的 {@link AiState}{@code <}{@link O}{@code , }{@link D}{@code , }{@link O}{@code ,
* }{@link RF}{@code , }{@link F}{@code >}。
@@ -505,8 +506,8 @@ public AiState delegate(AiProcessFlow aiFlow) {
* @return 表示委托节点的 {@link AiState}{@code <}{@link R}{@code , }{@link D}{@code , }{@link O}{@code ,
* }{@link RF}{@code , }{@link F}{@code >}。
* @throws IllegalArgumentException
- * - 当 {@code aiFlow} 为 {@code null}时。
- *
- 当 {@code nodeId} 为 {@code null} 、空字符串或只有空白字符的字符串时。
+ *
- 当 {@code aiFlow} 为 {@code null}时。
+ * - 当 {@code nodeId} 为 {@code null} 、空字符串或只有空白字符的字符串时。
*
*/
public AiState delegate(AiProcessFlow aiFlow, String nodeId) {
diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/AiFlow.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/AiFlow.java
index ff35a5357..2cc984bae 100644
--- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/AiFlow.java
+++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/AiFlow.java
@@ -23,6 +23,11 @@
public class AiFlow> extends IdGenerator {
private final F flow;
+ /**
+ * 创建一个 AI 流程。
+ *
+ * @param flow 流程。
+ */
public AiFlow(F flow) {
this.flow = Validation.notNull(flow, "Flow cannot be null.");
}
diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/AiProcessFlow.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/AiProcessFlow.java
index c7d132e90..ce53b1cef 100644
--- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/AiProcessFlow.java
+++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/AiProcessFlow.java
@@ -32,6 +32,11 @@ public class AiProcessFlow extends AiFlow>
private final Map, EmitterListener> listeners =
new ConcurrentHashMap<>();
+ /**
+ * 创建一个流程。
+ *
+ * @param flow 表示流程。
+ */
public AiProcessFlow(ProcessFlow flow) {
super(flow);
}
diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/ConverseLatch.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/ConverseLatch.java
index b1d8b409d..e28b226cb 100644
--- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/ConverseLatch.java
+++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/ConverseLatch.java
@@ -73,22 +73,47 @@ public T await() {
return this.data;
}
+ /**
+ * Returns the data.
+ *
+ * @return The data.
+ */
public T data() {
return this.data;
}
+ /**
+ * Sets the data.
+ *
+ * @param data The data.
+ */
public void data(T data) {
this.data = data;
}
+ /**
+ * Returns the throwable.
+ *
+ * @return The throwable.
+ */
public Throwable throwable() {
return this.throwable;
}
+ /**
+ * Sets the throwable.
+ *
+ * @param throwable The throwable.
+ */
public void throwable(Throwable throwable) {
this.throwable = throwable;
}
+ /**
+ * Returns the count-down latch.
+ *
+ * @return The count-down latch.
+ */
public CountDownLatch countDownLatch() {
return this.countDownLatch;
}
diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/models/ChatBlockModel.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/models/ChatBlockModel.java
index 84da4294d..dceb5d0c4 100644
--- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/models/ChatBlockModel.java
+++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/models/ChatBlockModel.java
@@ -7,11 +7,11 @@
package modelengine.fel.engine.operators.models;
import modelengine.fel.core.chat.ChatMessage;
+import modelengine.fel.core.chat.ChatModel;
import modelengine.fel.core.chat.ChatOption;
import modelengine.fel.core.chat.MessageType;
import modelengine.fel.core.chat.Prompt;
import modelengine.fel.core.chat.support.AiMessage;
-import modelengine.fel.core.chat.ChatModel;
import modelengine.fel.core.memory.Memory;
import modelengine.fel.core.model.BlockModel;
import modelengine.fel.engine.util.AiFlowSession;
@@ -32,10 +32,21 @@ public class ChatBlockModel implements BlockModel {
private final ChatModel provider;
private final ChatOption option;
+ /**
+ * 创建一个阻塞对话模型。
+ *
+ * @param provider 聊天模型提供者。
+ */
public ChatBlockModel(ChatModel provider) {
this(provider, ChatOption.custom().build());
}
+ /**
+ * 创建一个阻塞对话模型。
+ *
+ * @param provider 聊天模型提供者。
+ * @param option 聊天模型选项。
+ */
public ChatBlockModel(ChatModel provider, ChatOption option) {
this.provider = Validation.notNull(provider, "The model provider cannot be null.");
this.option = Validation.notNull(option, "The chat options cannot be null.");
diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/models/ChatChunk.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/models/ChatChunk.java
index e75e21e12..70a3261fb 100644
--- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/models/ChatChunk.java
+++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/models/ChatChunk.java
@@ -31,8 +31,11 @@ public class ChatChunk implements ChatMessage {
private final StringBuilder text = new StringBuilder();
private final List toolCalls = new ArrayList<>();
- public ChatChunk() {
- }
+ /**
+ * 创建一个空的 {@link ChatChunk}。
+ */
+ public ChatChunk() {}
+
/**
* 使用文本数据、媒体数据和工具请求初始化 {@link ChatChunk}。
*
diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/models/ChatFlowModel.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/models/ChatFlowModel.java
index 1a00e8273..bcc31a0bd 100644
--- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/models/ChatFlowModel.java
+++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/models/ChatFlowModel.java
@@ -31,6 +31,12 @@ public class ChatFlowModel implements FlowModel {
private final ChatOption option;
+ /**
+ * 创建一个流式对话模型。
+ *
+ * @param chatModel 对话模型。
+ * @param option 对话参数。
+ */
public ChatFlowModel(ChatModel chatModel, ChatOption option) {
this.chatModel = notNull(chatModel, "The model provider can not be null.");
this.option = option;
diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/patterns/AbstractAgent.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/patterns/AbstractAgent.java
index f12bcce1a..c2ff3111e 100644
--- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/patterns/AbstractAgent.java
+++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/patterns/AbstractAgent.java
@@ -9,7 +9,6 @@
import static modelengine.fitframework.inspection.Validation.notBlank;
import static modelengine.fitframework.inspection.Validation.notNull;
-import lombok.Getter;
import modelengine.fel.core.chat.ChatMessage;
import modelengine.fel.core.chat.Prompt;
import modelengine.fel.core.chat.support.AiMessage;
@@ -53,10 +52,11 @@ protected AbstractAgent(ChatFlowModel flowModel) {
*
* @param flowModel 表示模型推理服务的 {@link ChatFlowModel}。
* @param memoryId agentMsgKey 表示 Agent 响应的所在自定义键的 {@link String}。
- * @throws IllegalArgumentException
+ * @throws IllegalArgumentException
+ *
* - 当 {@code toolProvider} 、 {@code chatStreamModel} 和 {@code options} 任一个为 {@code null} 时。
* - 当 {@code agentMsgKey} 为 {@code null} 、空字符串或只有空白字符的字符串时。
- *
+ *
*/
protected AbstractAgent(ChatFlowModel flowModel, String memoryId) {
this.model = notNull(flowModel, "The flow model cannot be null.");
@@ -69,7 +69,7 @@ protected AbstractAgent(ChatFlowModel flowModel, String memoryId) {
* @return 配置的模型对象。
*/
public ChatFlowModel getModel() {
- return model;
+ return this.model;
}
/**
diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/patterns/AbstractFlowPattern.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/patterns/AbstractFlowPattern.java
index ec475432c..7de5954bf 100644
--- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/patterns/AbstractFlowPattern.java
+++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/patterns/AbstractFlowPattern.java
@@ -39,6 +39,9 @@ public abstract class AbstractFlowPattern implements FlowPattern {
resultAction.process(data, session);
};
+ /**
+ * Constructor.
+ */
protected AbstractFlowPattern() {
this.flowSupplier = LazyLoader.of(() -> {
AiProcessFlow flow = buildFlow();
@@ -108,8 +111,8 @@ public Flow origin() {
* Built the flow session for starting the conversation.
*
* @param emitter The {@link FlowEmitter}{@code <}{@link O}{@code >} representing output emitter.
- * @return The new {@link FlowSession}.
* @param The output data type.
+ * @return The new {@link FlowSession}.
*/
protected static FlowSession buildFlowSession(FlowEmitter emitter) {
FlowSession mainSession = AiFlowSession.require();
diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/patterns/SimplePattern.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/patterns/SimplePattern.java
index df4b2c9dd..066afeb08 100644
--- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/patterns/SimplePattern.java
+++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/patterns/SimplePattern.java
@@ -20,6 +20,11 @@
public class SimplePattern implements Pattern {
private final Function func;
+ /**
+ * 创建一个委托单元。
+ *
+ * @param func 委托函数。
+ */
public SimplePattern(Function func) {
this.func = Validation.notNull(func, "The action function cannot be null.");
}
diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/patterns/SyncTipper.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/patterns/SyncTipper.java
index c637bf5ce..561427fbc 100644
--- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/patterns/SyncTipper.java
+++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/patterns/SyncTipper.java
@@ -88,6 +88,7 @@ static Pattern value(String key, AiProcessFlow flow) {
/**
* 简单键值对分支。
*
+ * @param