Skip to content

<fix>[cloudbus]: support send confirm#3521

Open
ZStack-Robot wants to merge 1 commit intozsv_5.0.0from
sync/wenhao.zhang/c-1@@2
Open

<fix>[cloudbus]: support send confirm#3521
ZStack-Robot wants to merge 1 commit intozsv_5.0.0from
sync/wenhao.zhang/c-1@@2

Conversation

@ZStack-Robot
Copy link
Collaborator

  1. send in CloudBus means submit a message send task.
    function return do not means sending completion.
    so there is no way to set a limition in sender.
    add a FutureCompletion to confirm message send.
    provide a way to limited send.

  2. fix mn heartbeat thread interrupt handle

  3. refactor duplicate code for getChainInfo, fix synchronized target.

This patch is cherry-picked from ZSTAC-71213
(commit: af58834)

Resolves: ZSV-11510

Change-Id: I686361627566737a7775706263796b736967666c

sync from gitlab !9380

@coderabbitai
Copy link

coderabbitai bot commented Mar 17, 2026

Walkthrough

此PR将CloudBus的send接口从同步void改为返回FutureCompletion,传播并发控制流变更至多个实现与测试;同时调整平台IP选择、线程队列/任务信息展示、心跳中断处理、ChainInfo结构及新增错误码,并扩展/更新相关集成测试。

Changes

Cohort / File(s) Summary
CloudBus核心改动
core/src/main/java/org/zstack/core/cloudbus/CloudBus.java, core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl2.java, core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java
send(Message)send(NeedReplyMessage, CloudBusCallBack) 签名从void改为返回FutureCompletion,引入SEND_CONFIRMED成功哨兵,修改内部发送/重试/错误路径以返回或创建对应的FutureCompletion,并将发送失败映射为CLOUD_BUS_SEND_ERROR
测试与辅助类
test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBus3Case.groovy, test/src/test/groovy/org/zstack/test/integration/core/cloudbus/ManagementNodeNotFoundHandlerCase.groovy, test/src/test/groovy/org/zstack/test/integration/core/cloudbus/MessageSender.java, test/src/test/groovy/org/zstack/test/integration/portal/ManagementNodeHeartbeatCase.groovy
新增/改写多项集成测试以验证FutureCompletion行为(成功、超时、缺失节点、队列并发等);添加MessageSender测试工具类;调整测试时序与心跳模拟;改为在发送后await完成再断言。
平台网络选择
core/src/main/java/org/zstack/core/Platform.java
在选取默认路由接口IP时优先选择IPv4:遍历接口地址并首次选中Inet4Address,替代原先直接取首个地址的逻辑。
线程队列与任务信息重构
core/src/main/java/org/zstack/core/thread/DispatchQueueImpl.java, core/src/main/java/org/zstack/core/thread/TaskInfoBuilder.java, core/src/main/java/org/zstack/core/thread/ThreadFacadeImpl.java
重命名dump方法为dumpSignatureNameThread,将队列信息组装委托给ChainTaskQueueWrapper.getTaskQueueInfo()/ChainInfo,使pendingQueuefinal并调整同步策略;TaskInfoBuilder改为接收AbstractChainTaskThreadFacadeImpl.submit添加trace日志。
进度信息与错误码
header/src/main/java/org/zstack/header/core/progress/ChainInfo.java, header/src/main/java/org/zstack/header/errorcode/SysErrors.java
ChainInfo新增maxThreadNum字段及getter/setter,重写toString()以包含异步级别统计;新增错误码CLOUD_BUS_SEND_ERROR(2111)
管理节点心跳中断处理
portal/src/main/java/org/zstack/portal/managementnode/ManagementNodeManagerImpl.java
心跳循环增加线程中断检测与调试日志,sleepAHeartbeatInterval改为返回boolean以区分正常睡眠和中断,调整失败处理在被中断时退出。

Sequence Diagram

sequenceDiagram
    participant Caller
    participant CloudBus as CloudBusImpl3
    participant ThreadPool
    participant Handler as MessageHandler
    participant FutureComp as FutureCompletion

    Caller->>CloudBus: send(Message msg)
    activate CloudBus
    CloudBus->>ThreadPool: submit async send task
    CloudBus-->>Caller: return SEND_CONFIRMED (FutureCompletion)
    deactivate CloudBus
    Caller->>FutureComp: await(timeout)
    activate ThreadPool
    ThreadPool->>Handler: deliver/process message
    Handler-->>CloudBus: reply/process result
    CloudBus->>FutureComp: complete(success/fail)
    FutureComp-->>Caller: notify completion
    deactivate ThreadPool
Loading
sequenceDiagram
    participant Caller
    participant CloudBus as CloudBusImpl3
    participant HTTPClient
    participant RemoteNode
    participant FutureComp as FutureCompletion

    Caller->>CloudBus: send(NeedReplyMessage msg, callback)
    activate CloudBus
    CloudBus->>HTTPClient: httpSend(msg)
    CloudBus-->>Caller: return SEND_CONFIRMED (FutureCompletion)
    deactivate CloudBus
    activate HTTPClient
    HTTPClient->>RemoteNode: POST message
    alt Node responds
        RemoteNode-->>HTTPClient: 200/response
        HTTPClient->>Caller: invoke callback(reply)
        HTTPClient->>FutureComp: complete(success)
    else ManagementNodeNotFound
        HTTPClient-->>CloudBus: throw ManagementNodeNotFoundException
        CloudBus->>HTTPClient: attempt resend to alternate node
        HTTPClient->>RemoteNode: POST to alternate
        RemoteNode-->>HTTPClient: response
        HTTPClient->>Caller: invoke callback(reply)
        HTTPClient->>FutureComp: complete(success)
    end
    deactivate HTTPClient
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰 我在代码林中跳跃歌唱,
未来完成带来新希望,
消息飞出又归来,回声轻响,
线程与心跳共舞忙,
我啃着萝卜庆改良✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 1.61% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title '[cloudbus]: support send confirm' clearly and concisely describes the main change: adding FutureCompletion to CloudBus send operations for send confirmation.
Description check ✅ Passed The description is related to the changeset and explains the purpose: adding FutureCompletion for send confirmation, fixing MN heartbeat handling, and refactoring duplicate code.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch sync/wenhao.zhang/c-1@@2
📝 Coding Plan
  • Generate coding plan for human review comments

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
test/src/test/groovy/org/zstack/test/integration/portal/ManagementNodeHeartbeatCase.groovy (1)

60-65: ⚠️ Potential issue | 🔴 Critical

Bug: prepareInvalidRecords() 被调用了两次,将导致测试失败

第 61 行和第 65 行都调用了 prepareInvalidRecords(),这会创建 4 条记录而不是预期的 2 条。后续断言 assert count == 2(第 75、81 行)将会失败。

根据 PR 描述,意图是在测试开始时调用该方法,因此应删除第 65 行的重复调用。

🐛 建议修复:删除重复调用
 void testUnexpectedManagementNodeRecord() {
     prepareInvalidRecords()
     ManagementNodeGlobalConfig.NODE_HEARTBEAT_INTERVAL.updateValue(1)
     PortalGlobalProperty.MAX_HEARTBEAT_FAILURE = 2

-    prepareInvalidRecords()
     int heartbeatFailureTimeout = ManagementNodeGlobalConfig.NODE_HEARTBEAT_INTERVAL.value(Integer.class) * PortalGlobalProperty.MAX_HEARTBEAT_FAILURE
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@test/src/test/groovy/org/zstack/test/integration/portal/ManagementNodeHeartbeatCase.groovy`
around lines 60 - 65, The test method testUnexpectedManagementNodeRecord
erroneously calls prepareInvalidRecords() twice, creating 4 records instead of
the intended 2 and causing the subsequent assertions (assert count == 2) to
fail; fix it by removing the second call to prepareInvalidRecords() inside
testUnexpectedManagementNodeRecord so that prepareInvalidRecords() is only
invoked once at test start, leaving the subsequent heartbeat config lines
(ManagementNodeGlobalConfig.NODE_HEARTBEAT_INTERVAL.updateValue and
PortalGlobalProperty.MAX_HEARTBEAT_FAILURE) and the rest of the test unchanged.
🧹 Nitpick comments (1)
core/src/main/java/org/zstack/core/thread/TaskInfoBuilder.java (1)

35-53: 重构清晰,新增 setContext 调用完善了信息填充。

方法签名从 AbstractTimeStatisticFuture 改为 AbstractChainTask,消除了内部重复的 .getTask() 调用,代码更简洁。

Line 52 的空列表处理:当 contextList 为空时设置为 "\n",若这是为了保持输出格式一致性则可以接受。如果空字符串更合适,可考虑调整:

-        info.setContext(info.getContextList().isEmpty() ? "\n" : String.format("\nCONTEXT: %s\n", StringUtils.join(info.getContextList(), "\n")));
+        info.setContext(info.getContextList().isEmpty() ? "" : String.format("\nCONTEXT: %s\n", StringUtils.join(info.getContextList(), "\n")));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/src/main/java/org/zstack/core/thread/TaskInfoBuilder.java` around lines
35 - 53, The current loadTaskInfo(TaskInfo info, AbstractChainTask task, int
index) sets info.setContext to "\n" when contextList is empty; change this to
set an empty string ("" ) instead to avoid emitting a lone newline—update the
final assignment that uses info.getContextList().isEmpty() ? "\n" :
String.format("\nCONTEXT: %s\n", ...) to use "" for the empty-case so callers of
TaskInfo.getContext() receive an empty value when there is no context; touch the
loadTaskInfo method and the TaskInfo.setContext/getContext usage if any
consumers expect non-empty formatting.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java`:
- Around line 594-614: The code currently calls httpSendInQueue(ip) twice in
httpSend(); remove the first call inside the try block and only invoke
httpSendInQueue(ip) once at the end. Concretely, in method httpSend() keep the
destMaker.getNodeInfo(managementNodeId).getNodeIP() assignment in the try, but
do not call httpSendInQueue(ip) there; preserve the catch behavior that uses
deadMessageManager.handleManagementNodeNotFoundError(...) and returns
SEND_CONFIRMED when handled, otherwise rethrow, and then let the single final
return call httpSendInQueue(ip) execute. This ensures httpSendInQueue(...) is
only invoked once.

In `@core/src/main/java/org/zstack/core/Platform.java`:
- Around line 821-828: The loop that sets ip for each InetAddress (using iface
and ip variables with InetAddress/Inet4Address) currently overwrites ip on every
iteration and breaks only when encountering an IPv4, which makes the fallback
address unpredictable when no IPv4 exists; change the logic so that you
immediately break and use the IPv4 address when found (ia instanceof
Inet4Address), but otherwise only assign a fallback ip once (e.g. first
non-link-local address or first global-scope address) and never overwrite that
fallback on subsequent iterations—skip link-local addresses when selecting the
fallback so the chosen non-IPv4 address is stable and usable if no IPv4 is
present.

In `@core/src/main/java/org/zstack/core/thread/DispatchQueueImpl.java`:
- Around line 948-954: The comment and code disagree: the comment says
"pendingQueue is synchronized with chainTasks, do not synchronize itself" but
the block synchronizes on pendingQueue; update them to match by either (a)
keeping synchronization on pendingQueue and changing the comment to state that
pendingQueue is independently synchronized (modify the comment near pendingQueue
in DispatchQueueImpl to reflect independent locking), or (b) synchronizing on
chainTasks instead of pendingQueue (replace synchronized (pendingQueue) with
synchronized (chainTasks)) so the code actually synchronizes with chainTasks;
ensure any references to TaskInfoBuilder.buildPendingTaskInfo(cf, now, index++)
remain inside the chosen synchronized block.

In
`@portal/src/main/java/org/zstack/portal/managementnode/ManagementNodeManagerImpl.java`:
- Around line 911-918: sleepAHeartbeatInterval() currently swallows an
InterruptedException by returning false but its callers in
ManagementNodeManagerImpl (the heartbeat / DB failure recovery loop) ignore that
return value, causing an interrupt to be treated as a DB failure and potentially
stopping the node; update the caller(s) of sleepAHeartbeatInterval() in
ManagementNodeManagerImpl (the heartbeat/DB recovery loop) to check the boolean
result and on false immediately exit the heartbeat thread (break/return) to
perform a graceful stop, or alternatively change sleepAHeartbeatInterval() to
propagate the interruption (throw InterruptedException) and let the caller
handle it—ensure the interrupt flag remains set
(Thread.currentThread().interrupt()) and do not count an interrupt as a DB
failure retry.

In
`@test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBus3Case.groovy`:
- Around line 398-400: The test currently ignores whether FutureCompletion
actually finished within 5000ms — it calls completion.await(5000L) but only
asserts !completion.isSuccess(), which will also pass if the future never
completed; update the assertion to verify the await returned (i.e. completion
completed) and then assert the error state like the callback branch: call
completion.await(5000L) and assert that it indicates completion (not timed out)
and that completion.isSuccess() is false (or check
completion.getError()/hasError), tying this to the same FutureCompletion
returned by bus.send(msg) so the test fails if send never completes.
- Around line 425-439: The test currently spawns 50 threads that call
MessageSender.sendMsg() and immediately samples
ThreadFacade.getChainTaskInfo("http-send-in-queue"), which can produce a false
pass because messages may not have been enqueued yet; modify testSendQueue to
(1) add a synchronization barrier so all sender threads start at the same time
(e.g., use a CountDownLatch or CyclicBarrier before calling
MessageSender.sendMsg()), (2) after releasing threads, poll
ThreadFacade.getChainTaskInfo("http-send-in-queue") with a short sleep loop
until the chain appears or runningTask.size() > 0 (with a timeout), and only
then perform the assertion that info.runningTask.size() <= 1; reference
MessageSender.sendMsg(), ThreadFacade.getChainTaskInfo("http-send-in-queue") and
the test method that launches the threads to implement these changes.

---

Outside diff comments:
In
`@test/src/test/groovy/org/zstack/test/integration/portal/ManagementNodeHeartbeatCase.groovy`:
- Around line 60-65: The test method testUnexpectedManagementNodeRecord
erroneously calls prepareInvalidRecords() twice, creating 4 records instead of
the intended 2 and causing the subsequent assertions (assert count == 2) to
fail; fix it by removing the second call to prepareInvalidRecords() inside
testUnexpectedManagementNodeRecord so that prepareInvalidRecords() is only
invoked once at test start, leaving the subsequent heartbeat config lines
(ManagementNodeGlobalConfig.NODE_HEARTBEAT_INTERVAL.updateValue and
PortalGlobalProperty.MAX_HEARTBEAT_FAILURE) and the rest of the test unchanged.

---

Nitpick comments:
In `@core/src/main/java/org/zstack/core/thread/TaskInfoBuilder.java`:
- Around line 35-53: The current loadTaskInfo(TaskInfo info, AbstractChainTask
task, int index) sets info.setContext to "\n" when contextList is empty; change
this to set an empty string ("" ) instead to avoid emitting a lone
newline—update the final assignment that uses info.getContextList().isEmpty() ?
"\n" : String.format("\nCONTEXT: %s\n", ...) to use "" for the empty-case so
callers of TaskInfo.getContext() receive an empty value when there is no
context; touch the loadTaskInfo method and the TaskInfo.setContext/getContext
usage if any consumers expect non-empty formatting.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 33f8364e-5f95-493f-a86a-7f10f677b8c4

📥 Commits

Reviewing files that changed from the base of the PR and between 421e9c8 and cedb855.

⛔ Files ignored due to path filters (1)
  • conf/errorCodes/sys.xml is excluded by !**/*.xml
📒 Files selected for processing (14)
  • core/src/main/java/org/zstack/core/Platform.java
  • core/src/main/java/org/zstack/core/cloudbus/CloudBus.java
  • core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl2.java
  • core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java
  • core/src/main/java/org/zstack/core/thread/DispatchQueueImpl.java
  • core/src/main/java/org/zstack/core/thread/TaskInfoBuilder.java
  • core/src/main/java/org/zstack/core/thread/ThreadFacadeImpl.java
  • header/src/main/java/org/zstack/header/core/progress/ChainInfo.java
  • header/src/main/java/org/zstack/header/errorcode/SysErrors.java
  • portal/src/main/java/org/zstack/portal/managementnode/ManagementNodeManagerImpl.java
  • test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBus3Case.groovy
  • test/src/test/groovy/org/zstack/test/integration/core/cloudbus/ManagementNodeNotFoundHandlerCase.groovy
  • test/src/test/groovy/org/zstack/test/integration/core/cloudbus/MessageSender.java
  • test/src/test/groovy/org/zstack/test/integration/portal/ManagementNodeHeartbeatCase.groovy

Comment on lines +821 to +828
for (InetAddress ia : Collections.list(iface.getInetAddresses())) {
ip = ia.getHostAddress();
if (ia instanceof Inet4Address) {
// we prefer IPv4 address
ip = ia.getHostAddress();
break;
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

IPv4 不存在时的回退地址选择存在不稳定行为。

在 Line [822] 每次循环都会覆盖 ip。如果该网卡没有 IPv4,最终会返回“最后一个”地址,可能是不可用的链路本地 IPv6,导致管理地址选取不稳定。

🔧 建议修复
                 if (defaultLine.contains(name)) {
-                    for (InetAddress ia : Collections.list(iface.getInetAddresses())) {
-                        ip = ia.getHostAddress();
-                        if (ia instanceof Inet4Address) {
-                            // we prefer IPv4 address
-                            ip = ia.getHostAddress();
-                            break;
-                        }
-                    }
+                    String selectedIp = null;
+                    for (InetAddress ia : Collections.list(iface.getInetAddresses())) {
+                        String candidate = ia.getHostAddress();
+                        if (ia instanceof Inet4Address) {
+                            // we prefer IPv4 address
+                            selectedIp = candidate;
+                            break;
+                        }
+                        if (selectedIp == null && !ia.isLoopbackAddress() && !ia.isLinkLocalAddress()) {
+                            selectedIp = candidate;
+                        }
+                    }
+                    if (selectedIp != null) {
+                        ip = selectedIp;
+                        break;
+                    }
                 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/src/main/java/org/zstack/core/Platform.java` around lines 821 - 828, The
loop that sets ip for each InetAddress (using iface and ip variables with
InetAddress/Inet4Address) currently overwrites ip on every iteration and breaks
only when encountering an IPv4, which makes the fallback address unpredictable
when no IPv4 exists; change the logic so that you immediately break and use the
IPv4 address when found (ia instanceof Inet4Address), but otherwise only assign
a fallback ip once (e.g. first non-link-local address or first global-scope
address) and never overwrite that fallback on subsequent iterations—skip
link-local addresses when selecting the fallback so the chosen non-IPv4 address
is stable and usable if no IPv4 is present.

Comment on lines +398 to +400
FutureCompletion completion = bus.send(msg)
completion.await(5000L)
assert !completion.isSuccess()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

这里没有区分“发送失败”和“根本没完成”。

await(5000L) 的结果被忽略,后面只断言 !completion.isSuccess()。如果 FutureCompletion 在 5 秒内根本没有完成,这里一样会通过,正好把 send confirm 的回归漏掉。建议至少补一个完成后的错误态断言,和下面回调分支保持一致。

建议修改
         FutureCompletion completion = bus.send(msg)
         completion.await(5000L)
         assert !completion.isSuccess()
+        assert completion.getErrorCode() != null
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
FutureCompletion completion = bus.send(msg)
completion.await(5000L)
assert !completion.isSuccess()
FutureCompletion completion = bus.send(msg)
completion.await(5000L)
assert !completion.isSuccess()
assert completion.getErrorCode() != null
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBus3Case.groovy`
around lines 398 - 400, The test currently ignores whether FutureCompletion
actually finished within 5000ms — it calls completion.await(5000L) but only
asserts !completion.isSuccess(), which will also pass if the future never
completed; update the assertion to verify the await returned (i.e. completion
completed) and then assert the error state like the callback branch: call
completion.await(5000L) and assert that it indicates completion (not timed out)
and that completion.isSuccess() is false (or check
completion.getError()/hasError), tying this to the same FutureCompletion
returned by bus.send(msg) so the test fails if send never completes.

Comment on lines +425 to +439
def threads = []
1.upto(50) { i ->
Thread t = Thread.start {
new MessageSender().sendMsg()
}
threads << t
}

ThreadFacade thdf = bean(ThreadFacade.class)

ChainInfo info = thdf.getChainTaskInfo("http-send-in-queue")
assert info.runningTask.size() <= 1
sleep(100)
info = thdf.getChainTaskInfo("http-send-in-queue")
assert info.runningTask.size() <= 1
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

testSendQueue() 现在可能会“空跑通过”。

50 个线程启动后立即读取 http-send-in-queue,没有任何同步点保证发送任务已经入队。这样即使队列串行化退化,这个断言也可能在链路尚未建立时直接通过,变成假阳性。建议先对齐起跑,再等待链路出现后采样。

建议修改
     void testSendQueue() {
         def threads = []
+        CountDownLatch ready = new CountDownLatch(50)
+        CountDownLatch start = new CountDownLatch(1)
         1.upto(50) { i ->
             Thread t = Thread.start {
+                ready.countDown()
+                start.await()
                 new MessageSender().sendMsg()
             }
             threads << t
         }
+        assert ready.await(5, TimeUnit.SECONDS)
+        start.countDown()
 
         ThreadFacade thdf = bean(ThreadFacade.class)
-
-        ChainInfo info = thdf.getChainTaskInfo("http-send-in-queue")
-        assert info.runningTask.size() <= 1
-        sleep(100)
-        info = thdf.getChainTaskInfo("http-send-in-queue")
-        assert info.runningTask.size() <= 1
+        retryInSecs {
+            ChainInfo info = thdf.getChainTaskInfo("http-send-in-queue")
+            assert info != null
+            assert info.runningTask.size() <= 1
+        }
 
         threads.each { it.join() }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBus3Case.groovy`
around lines 425 - 439, The test currently spawns 50 threads that call
MessageSender.sendMsg() and immediately samples
ThreadFacade.getChainTaskInfo("http-send-in-queue"), which can produce a false
pass because messages may not have been enqueued yet; modify testSendQueue to
(1) add a synchronization barrier so all sender threads start at the same time
(e.g., use a CountDownLatch or CyclicBarrier before calling
MessageSender.sendMsg()), (2) after releasing threads, poll
ThreadFacade.getChainTaskInfo("http-send-in-queue") with a short sleep loop
until the chain appears or runningTask.size() > 0 (with a timeout), and only
then perform the assertion that info.runningTask.size() <= 1; reference
MessageSender.sendMsg(), ThreadFacade.getChainTaskInfo("http-send-in-queue") and
the test method that launches the threads to implement these changes.

@MatheMatrix MatheMatrix force-pushed the sync/wenhao.zhang/c-1@@2 branch from cedb855 to 79e1cff Compare March 19, 2026 07:36
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java (1)

566-591: ⚠️ Potential issue | 🟠 Major

httpSendInQueue 中异常处理不完整,可能导致调用者挂起。

如果 httpSend(ip) 抛出异常,sendCompletion.success() 不会被调用,FutureCompletion 将永远不会完成。调用 await() 的调用者将无限期阻塞。

建议添加异常处理
 private FutureCompletion httpSendInQueue(String ip) {
     FutureCompletion sendCompletion = new FutureCompletion(null);
     thdf.chainSubmit(new ChainTask(sendCompletion) {
         `@Override`
         public String getSyncSignature() {
             return "http-send-in-queue";
         }

         `@Override`
         public void run(SyncTaskChain chain) {
-            httpSend(ip);
-            sendCompletion.success();
+            try {
+                httpSend(ip);
+                sendCompletion.success();
+            } catch (Throwable t) {
+                sendCompletion.fail(err(CLOUD_BUS_SEND_ERROR, t.getMessage()));
+            }
             chain.next();
         }

         `@Override`
         protected int getSyncLevel() {
             return CloudBusGlobalProperty.HTTP_MAX_CONN;
         }

         `@Override`
         public String getName() {
             return getSyncSignature();
         }
     });
     return sendCompletion;
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java` around lines
566 - 591, The httpSendInQueue task currently calls httpSend(ip) without
catching exceptions, so if httpSend throws the FutureCompletion sendCompletion
is never completed and callers block; wrap the httpSend(ip) invocation in
try/catch/finally inside the ChainTask.run: on success call
sendCompletion.success(), on exception complete the FutureCompletion with
failure (e.g. sendCompletion.fail(...) or equivalent) passing the
exception/error details, and ensure chain.next() is always invoked in a finally
block so the sync chain continues.
♻️ Duplicate comments (1)
test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBus3Case.groovy (1)

425-443: ⚠️ Potential issue | 🟡 Minor

testSendQueue() 存在竞态条件风险。

50 个线程启动后立即读取 http-send-in-queue 的状态,没有同步机制保证发送任务已经入队。这可能导致测试在任务尚未入队时就通过断言,产生假阳性结果。

建议添加同步屏障确保所有线程同时开始发送,并等待队列中有任务后再进行采样:

建议修改
 void testSendQueue() {
     def threads = []
+    CountDownLatch ready = new CountDownLatch(50)
+    CountDownLatch start = new CountDownLatch(1)
     1.upto(50) { i ->
         Thread t = Thread.start {
+            ready.countDown()
+            start.await()
             new MessageSender().sendMsg()
         }
         threads << t
     }
+    ready.await(5, TimeUnit.SECONDS)
+    start.countDown()

     ThreadFacade thdf = bean(ThreadFacade.class)

-    ChainInfo info = thdf.getChainTaskInfo("http-send-in-queue")
-    assert info.runningTask.size() <= 1
-    sleep(100)
-    info = thdf.getChainTaskInfo("http-send-in-queue")
-    assert info.runningTask.size() <= 1
+    retryInSecs {
+        ChainInfo info = thdf.getChainTaskInfo("http-send-in-queue")
+        assert info != null
+        assert info.runningTask.size() <= 1
+    }

     threads.each { it.join() }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBus3Case.groovy`
around lines 425 - 443, testSendQueue() has a race: it samples
ThreadFacade.getChainTaskInfo("http-send-in-queue") immediately after starting
50 threads so the messages may not have been enqueued yet; modify
testSendQueue() to coordinate thread start and ensure the queue contains tasks
before asserting by using a synchronization barrier (e.g., CountDownLatch or
CyclicBarrier) so all MessageSender.sendMsg() threads wait to start together,
then wait/poll ThreadFacade.getChainTaskInfo("http-send-in-queue") until
runningTask.size() reflects enqueued work (or a short timeout) before performing
the assert; reference the testSendQueue(), MessageSender.sendMsg(), ThreadFacade
and getChainTaskInfo("http-send-in-queue") symbols when making the change.
🧹 Nitpick comments (1)
test/src/test/groovy/org/zstack/test/integration/core/cloudbus/ManagementNodeNotFoundHandlerCase.groovy (1)

100-114: 使用 sleep 可能导致测试不稳定。

c.await() 之后使用 sleep(1000) 来等待异步操作完成,这种方式可能在 CI 环境或系统负载较高时导致测试不稳定。建议考虑使用 retryInSecs 或其他同步机制来确保断言的可靠性。

不过考虑到这是测试代码且原有的 retryInSecs 逻辑被简化,当前实现可以接受。

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@test/src/test/groovy/org/zstack/test/integration/core/cloudbus/ManagementNodeNotFoundHandlerCase.groovy`
around lines 100 - 114, The test uses sleep(1000) after c.await(), which can
make it flaky; replace the sleep-based wait with a deterministic retry/polling
loop (e.g., retryInSecs or polling with a timeout) that checks the same
condition you're asserting against
(mgr.managementNodeNotFoundHandlers.asMap().values().message does not contain
reply) until it becomes true or a timeout is reached; update the block around
c.await(), bus.registerService(secondNodeService),
mockAManagementNode(secondNodeUuid) and
cleanup()/bus.unregisterService(secondNodeService) to remove sleep and use the
retry/poll mechanism to ensure the assertion is stable.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Outside diff comments:
In `@core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java`:
- Around line 566-591: The httpSendInQueue task currently calls httpSend(ip)
without catching exceptions, so if httpSend throws the FutureCompletion
sendCompletion is never completed and callers block; wrap the httpSend(ip)
invocation in try/catch/finally inside the ChainTask.run: on success call
sendCompletion.success(), on exception complete the FutureCompletion with
failure (e.g. sendCompletion.fail(...) or equivalent) passing the
exception/error details, and ensure chain.next() is always invoked in a finally
block so the sync chain continues.

---

Duplicate comments:
In
`@test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBus3Case.groovy`:
- Around line 425-443: testSendQueue() has a race: it samples
ThreadFacade.getChainTaskInfo("http-send-in-queue") immediately after starting
50 threads so the messages may not have been enqueued yet; modify
testSendQueue() to coordinate thread start and ensure the queue contains tasks
before asserting by using a synchronization barrier (e.g., CountDownLatch or
CyclicBarrier) so all MessageSender.sendMsg() threads wait to start together,
then wait/poll ThreadFacade.getChainTaskInfo("http-send-in-queue") until
runningTask.size() reflects enqueued work (or a short timeout) before performing
the assert; reference the testSendQueue(), MessageSender.sendMsg(), ThreadFacade
and getChainTaskInfo("http-send-in-queue") symbols when making the change.

---

Nitpick comments:
In
`@test/src/test/groovy/org/zstack/test/integration/core/cloudbus/ManagementNodeNotFoundHandlerCase.groovy`:
- Around line 100-114: The test uses sleep(1000) after c.await(), which can make
it flaky; replace the sleep-based wait with a deterministic retry/polling loop
(e.g., retryInSecs or polling with a timeout) that checks the same condition
you're asserting against
(mgr.managementNodeNotFoundHandlers.asMap().values().message does not contain
reply) until it becomes true or a timeout is reached; update the block around
c.await(), bus.registerService(secondNodeService),
mockAManagementNode(secondNodeUuid) and
cleanup()/bus.unregisterService(secondNodeService) to remove sleep and use the
retry/poll mechanism to ensure the assertion is stable.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 543c5406-130b-4628-a923-7e3bc97d328d

📥 Commits

Reviewing files that changed from the base of the PR and between cedb855 and 79e1cff.

⛔ Files ignored due to path filters (1)
  • conf/errorCodes/sys.xml is excluded by !**/*.xml
📒 Files selected for processing (14)
  • core/src/main/java/org/zstack/core/Platform.java
  • core/src/main/java/org/zstack/core/cloudbus/CloudBus.java
  • core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl2.java
  • core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java
  • core/src/main/java/org/zstack/core/thread/DispatchQueueImpl.java
  • core/src/main/java/org/zstack/core/thread/TaskInfoBuilder.java
  • core/src/main/java/org/zstack/core/thread/ThreadFacadeImpl.java
  • header/src/main/java/org/zstack/header/core/progress/ChainInfo.java
  • header/src/main/java/org/zstack/header/errorcode/SysErrors.java
  • portal/src/main/java/org/zstack/portal/managementnode/ManagementNodeManagerImpl.java
  • test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBus3Case.groovy
  • test/src/test/groovy/org/zstack/test/integration/core/cloudbus/ManagementNodeNotFoundHandlerCase.groovy
  • test/src/test/groovy/org/zstack/test/integration/core/cloudbus/MessageSender.java
  • test/src/test/groovy/org/zstack/test/integration/portal/ManagementNodeHeartbeatCase.groovy
✅ Files skipped from review due to trivial changes (2)
  • core/src/main/java/org/zstack/core/thread/ThreadFacadeImpl.java
  • header/src/main/java/org/zstack/header/errorcode/SysErrors.java
🚧 Files skipped from review as they are similar to previous changes (4)
  • test/src/test/groovy/org/zstack/test/integration/portal/ManagementNodeHeartbeatCase.groovy
  • test/src/test/groovy/org/zstack/test/integration/core/cloudbus/MessageSender.java
  • core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl2.java
  • core/src/main/java/org/zstack/core/thread/TaskInfoBuilder.java

1. send in CloudBus means submit a message send task.
function return do not means sending completion.
so there is no way to set a limition in sender.
add a FutureCompletion to confirm message send.
provide a way to limited send.

2. fix mn heartbeat thread interrupt handle

3. refactor duplicate code for getChainInfo, fix synchronized target.

This patch is cherry-picked from ZSTAC-71213
(commit: af58834)

Resolves: ZSV-11510

Change-Id: I686361627566737a7775706263796b736967666c
@MatheMatrix MatheMatrix force-pushed the sync/wenhao.zhang/c-1@@2 branch from 79e1cff to 19fa46e Compare March 19, 2026 16:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant