<fix>[cloudbus]: support send confirm #3521
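Walkthrough
This PR changes the CloudBus send interface from a synchronous void method to one that returns a FutureCompletion, and propagates the resulting concurrency and control-flow changes to several implementations and tests. It also adjusts platform IP selection, the display of thread queue and task information, heartbeat interrupt handling, and the ChainInfo structure, adds a new error code, and extends or updates the related integration tests.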
Sequence Diagram
sequenceDiagram
participant Caller
participant CloudBus as CloudBusImpl3
participant ThreadPool
participant Handler as MessageHandler
participant FutureComp as FutureCompletion
Caller->>CloudBus: send(Message msg)
activate CloudBus
CloudBus->>ThreadPool: submit async send task
CloudBus-->>Caller: return SEND_CONFIRMED (FutureCompletion)
deactivate CloudBus
Caller->>FutureComp: await(timeout)
activate ThreadPool
ThreadPool->>Handler: deliver/process message
Handler-->>CloudBus: reply/process result
CloudBus->>FutureComp: complete(success/fail)
FutureComp-->>Caller: notify completion
deactivate ThreadPool
sequenceDiagram
participant Caller
participant CloudBus as CloudBusImpl3
participant HTTPClient
participant RemoteNode
participant FutureComp as FutureCompletion
Caller->>CloudBus: send(NeedReplyMessage msg, callback)
activate CloudBus
CloudBus->>HTTPClient: httpSend(msg)
CloudBus-->>Caller: return SEND_CONFIRMED (FutureCompletion)
deactivate CloudBus
activate HTTPClient
HTTPClient->>RemoteNode: POST message
alt Node responds
RemoteNode-->>HTTPClient: 200/response
HTTPClient->>Caller: invoke callback(reply)
HTTPClient->>FutureComp: complete(success)
else ManagementNodeNotFound
HTTPClient-->>CloudBus: throw ManagementNodeNotFoundException
CloudBus->>HTTPClient: attempt resend to alternate node
HTTPClient->>RemoteNode: POST to alternate
RemoteNode-->>HTTPClient: response
HTTPClient->>Caller: invoke callback(reply)
HTTPClient->>FutureComp: complete(success)
end
deactivate HTTPClient
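The flow in the first diagram can be sketched in plain Java. This is only an illustration under assumptions: `CompletableFuture` stands in for ZStack's `FutureCompletion`, and `SendConfirmSketch`, `send`, and `confirmed` are hypothetical names, not the CloudBus API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for a bus whose send() returns a confirmation future.
final class SendConfirmSketch {
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    // Submits the send task and returns at once; the future completes only
    // after the handler has actually processed the message.
    CompletableFuture<Void> send(String msg, Consumer<String> handler) {
        return CompletableFuture.runAsync(() -> handler.accept(msg), pool);
    }

    // Returns true only if the send was confirmed within the timeout.
    static boolean confirmed(CompletableFuture<Void> f, long timeoutMillis) {
        try {
            f.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (Exception e) {
            return false; // timed out, or the handler failed
        }
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

A caller that wants to throttle itself can wait on the returned future before submitting the next message, which a `void` send would not allow.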
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
test/src/test/groovy/org/zstack/test/integration/portal/ManagementNodeHeartbeatCase.groovy (1)
60-65: ⚠️ Potential issue | 🔴 Critical
Bug: prepareInvalidRecords() is called twice, which will make the test fail
Both line 61 and line 65 call prepareInvalidRecords(), which creates 4 records instead of the expected 2. The later assertions assert count == 2 (lines 75 and 81) will then fail. According to the PR description, the intent is to call this method at the start of the test, so the duplicate call on line 65 should be removed.
🐛 Suggested fix: remove the duplicate call
  void testUnexpectedManagementNodeRecord() {
      prepareInvalidRecords()
      ManagementNodeGlobalConfig.NODE_HEARTBEAT_INTERVAL.updateValue(1)
      PortalGlobalProperty.MAX_HEARTBEAT_FAILURE = 2
-     prepareInvalidRecords()
      int heartbeatFailureTimeout = ManagementNodeGlobalConfig.NODE_HEARTBEAT_INTERVAL.value(Integer.class) * PortalGlobalProperty.MAX_HEARTBEAT_FAILURE
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@test/src/test/groovy/org/zstack/test/integration/portal/ManagementNodeHeartbeatCase.groovy` around lines 60 - 65, The test method testUnexpectedManagementNodeRecord erroneously calls prepareInvalidRecords() twice, creating 4 records instead of the intended 2 and causing the subsequent assertions (assert count == 2) to fail; fix it by removing the second call to prepareInvalidRecords() inside testUnexpectedManagementNodeRecord so that prepareInvalidRecords() is only invoked once at test start, leaving the subsequent heartbeat config lines (ManagementNodeGlobalConfig.NODE_HEARTBEAT_INTERVAL.updateValue and PortalGlobalProperty.MAX_HEARTBEAT_FAILURE) and the rest of the test unchanged.
🧹 Nitpick comments (1)
core/src/main/java/org/zstack/core/thread/TaskInfoBuilder.java (1)
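35-53: The refactoring is clear, and the new setContext call completes the info population.
The method signature changes from AbstractTimeStatisticFuture to AbstractChainTask, which removes the repeated internal .getTask() calls and makes the code more concise.
Empty-list handling on line 52: when contextList is empty, the context is set to "\n". That is acceptable if it is meant to keep the output format consistent. If an empty string is more appropriate, consider this change:
- info.setContext(info.getContextList().isEmpty() ? "\n" : String.format("\nCONTEXT: %s\n", StringUtils.join(info.getContextList(), "\n")));
+ info.setContext(info.getContextList().isEmpty() ? "" : String.format("\nCONTEXT: %s\n", StringUtils.join(info.getContextList(), "\n")));
🤖 Prompt for AI Agents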
Verify each finding against the current code and only fix it if needed. In `@core/src/main/java/org/zstack/core/thread/TaskInfoBuilder.java` around lines 35 - 53, The current loadTaskInfo(TaskInfo info, AbstractChainTask task, int index) sets info.setContext to "\n" when contextList is empty; change this to set an empty string ("" ) instead to avoid emitting a lone newline—update the final assignment that uses info.getContextList().isEmpty() ? "\n" : String.format("\nCONTEXT: %s\n", ...) to use "" for the empty-case so callers of TaskInfo.getContext() receive an empty value when there is no context; touch the loadTaskInfo method and the TaskInfo.setContext/getContext usage if any consumers expect non-empty formatting.
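The effect of the suggested variant can be shown with a small standalone sketch. `ContextFormatSketch` and `formatContext` are hypothetical names, and `String.join` is used here in place of `StringUtils.join` so the example needs no extra library.

```java
import java.util.List;

final class ContextFormatSketch {
    // Mirrors the suggested variant: no context yields "", not a lone "\n".
    static String formatContext(List<String> contextList) {
        if (contextList.isEmpty()) {
            return "";
        }
        return String.format("\nCONTEXT: %s\n", String.join("\n", contextList));
    }
}
```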
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java`:
- Around line 594-614: The code currently calls httpSendInQueue(ip) twice in
httpSend(); remove the first call inside the try block and only invoke
httpSendInQueue(ip) once at the end. Concretely, in method httpSend() keep the
destMaker.getNodeInfo(managementNodeId).getNodeIP() assignment in the try, but
do not call httpSendInQueue(ip) there; preserve the catch behavior that uses
deadMessageManager.handleManagementNodeNotFoundError(...) and returns
SEND_CONFIRMED when handled, otherwise rethrow, and then let the single final
return call httpSendInQueue(ip) execute. This ensures httpSendInQueue(...) is
only invoked once.
In `@core/src/main/java/org/zstack/core/Platform.java`:
- Around line 821-828: The loop that sets ip for each InetAddress (using iface
and ip variables with InetAddress/Inet4Address) currently overwrites ip on every
iteration and breaks only when encountering an IPv4, which makes the fallback
address unpredictable when no IPv4 exists; change the logic so that you
immediately break and use the IPv4 address when found (ia instanceof
Inet4Address), but otherwise only assign a fallback ip once (e.g. first
non-link-local address or first global-scope address) and never overwrite that
fallback on subsequent iterations—skip link-local addresses when selecting the
fallback so the chosen non-IPv4 address is stable and usable if no IPv4 is
present.
In `@core/src/main/java/org/zstack/core/thread/DispatchQueueImpl.java`:
- Around line 948-954: The comment and code disagree: the comment says
"pendingQueue is synchronized with chainTasks, do not synchronize itself" but
the block synchronizes on pendingQueue; update them to match by either (a)
keeping synchronization on pendingQueue and changing the comment to state that
pendingQueue is independently synchronized (modify the comment near pendingQueue
in DispatchQueueImpl to reflect independent locking), or (b) synchronizing on
chainTasks instead of pendingQueue (replace synchronized (pendingQueue) with
synchronized (chainTasks)) so the code actually synchronizes with chainTasks;
ensure any references to TaskInfoBuilder.buildPendingTaskInfo(cf, now, index++)
remain inside the chosen synchronized block.
In
`@portal/src/main/java/org/zstack/portal/managementnode/ManagementNodeManagerImpl.java`:
- Around line 911-918: sleepAHeartbeatInterval() currently swallows an
InterruptedException by returning false but its callers in
ManagementNodeManagerImpl (the heartbeat / DB failure recovery loop) ignore that
return value, causing an interrupt to be treated as a DB failure and potentially
stopping the node; update the caller(s) of sleepAHeartbeatInterval() in
ManagementNodeManagerImpl (the heartbeat/DB recovery loop) to check the boolean
result and on false immediately exit the heartbeat thread (break/return) to
perform a graceful stop, or alternatively change sleepAHeartbeatInterval() to
propagate the interruption (throw InterruptedException) and let the caller
handle it—ensure the interrupt flag remains set
(Thread.currentThread().interrupt()) and do not count an interrupt as a DB
failure retry.
In
`@test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBus3Case.groovy`:
- Around line 398-400: The test currently ignores whether FutureCompletion
actually finished within 5000ms — it calls completion.await(5000L) but only
asserts !completion.isSuccess(), which will also pass if the future never
completed; update the assertion to verify the await returned (i.e. completion
completed) and then assert the error state like the callback branch: call
completion.await(5000L) and assert that it indicates completion (not timed out)
and that completion.isSuccess() is false (or check
completion.getError()/hasError), tying this to the same FutureCompletion
returned by bus.send(msg) so the test fails if send never completes.
- Around line 425-439: The test currently spawns 50 threads that call
MessageSender.sendMsg() and immediately samples
ThreadFacade.getChainTaskInfo("http-send-in-queue"), which can produce a false
pass because messages may not have been enqueued yet; modify testSendQueue to
(1) add a synchronization barrier so all sender threads start at the same time
(e.g., use a CountDownLatch or CyclicBarrier before calling
MessageSender.sendMsg()), (2) after releasing threads, poll
ThreadFacade.getChainTaskInfo("http-send-in-queue") with a short sleep loop
until the chain appears or runningTask.size() > 0 (with a timeout), and only
then perform the assertion that info.runningTask.size() <= 1; reference
MessageSender.sendMsg(), ThreadFacade.getChainTaskInfo("http-send-in-queue") and
the test method that launches the threads to implement these changes.
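The ManagementNodeManagerImpl item above asks that an interrupt end the heartbeat loop instead of being counted as a DB failure. A minimal sketch of that shape follows, assuming a loop driven by the sleep result. Only the name `sleepAHeartbeatInterval` comes from the review; its signature here, `HeartbeatLoopSketch`, and `runHeartbeat` are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class HeartbeatLoopSketch {
    // Sleeps one interval; returns false if interrupted, restoring the flag.
    static boolean sleepAHeartbeatInterval(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Runs until interrupted; an interrupt ends the loop and is never
    // counted as a heartbeat (DB) failure.
    static void runHeartbeat(long intervalMillis, AtomicInteger beats) {
        while (sleepAHeartbeatInterval(intervalMillis)) {
            beats.incrementAndGet();
        }
    }
}
```

Because the interrupt flag is restored, code further up the call stack can still observe that the thread was asked to stop.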
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 33f8364e-5f95-493f-a86a-7f10f677b8c4
⛔ Files ignored due to path filters (1)
conf/errorCodes/sys.xml is excluded by !**/*.xml
📒 Files selected for processing (14)
- core/src/main/java/org/zstack/core/Platform.java
- core/src/main/java/org/zstack/core/cloudbus/CloudBus.java
- core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl2.java
- core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java
- core/src/main/java/org/zstack/core/thread/DispatchQueueImpl.java
- core/src/main/java/org/zstack/core/thread/TaskInfoBuilder.java
- core/src/main/java/org/zstack/core/thread/ThreadFacadeImpl.java
- header/src/main/java/org/zstack/header/core/progress/ChainInfo.java
- header/src/main/java/org/zstack/header/errorcode/SysErrors.java
- portal/src/main/java/org/zstack/portal/managementnode/ManagementNodeManagerImpl.java
- test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBus3Case.groovy
- test/src/test/groovy/org/zstack/test/integration/core/cloudbus/ManagementNodeNotFoundHandlerCase.groovy
- test/src/test/groovy/org/zstack/test/integration/core/cloudbus/MessageSender.java
- test/src/test/groovy/org/zstack/test/integration/portal/ManagementNodeHeartbeatCase.groovy
for (InetAddress ia : Collections.list(iface.getInetAddresses())) {
    ip = ia.getHostAddress();
    if (ia instanceof Inet4Address) {
        // we prefer IPv4 address
        ip = ia.getHostAddress();
        break;
    }
}
Fallback address selection is unstable when no IPv4 address exists.
On line 822, ip is overwritten on every loop iteration. If the interface has no IPv4 address, the loop ends up returning the "last" address, which may be an unusable link-local IPv6 address, so the management address selection is unstable.
🔧 Suggested fix
  if (defaultLine.contains(name)) {
-     for (InetAddress ia : Collections.list(iface.getInetAddresses())) {
-         ip = ia.getHostAddress();
-         if (ia instanceof Inet4Address) {
-             // we prefer IPv4 address
-             ip = ia.getHostAddress();
-             break;
-         }
-     }
+     String selectedIp = null;
+     for (InetAddress ia : Collections.list(iface.getInetAddresses())) {
+         String candidate = ia.getHostAddress();
+         if (ia instanceof Inet4Address) {
+             // we prefer IPv4 address
+             selectedIp = candidate;
+             break;
+         }
+         if (selectedIp == null && !ia.isLoopbackAddress() && !ia.isLinkLocalAddress()) {
+             selectedIp = candidate;
+         }
+     }
+     if (selectedIp != null) {
+         ip = selectedIp;
+         break;
+     }
  }
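The selection rule the reviewer describes (prefer IPv4, otherwise keep the first address that is neither loopback nor link-local) can be tried in isolation. This is a sketch, not the Platform.java code: `AddressPickSketch`, `pick`, and `literal` are hypothetical helpers, and the addresses in the usage note are documentation-range examples.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;

final class AddressPickSketch {
    // Prefers the first IPv4 address; otherwise keeps the first address that
    // is neither loopback nor link-local. Returns null if nothing qualifies.
    static String pick(List<InetAddress> addresses) {
        String fallback = null;
        for (InetAddress ia : addresses) {
            if (ia instanceof Inet4Address) {
                return ia.getHostAddress();
            }
            if (fallback == null && !ia.isLoopbackAddress() && !ia.isLinkLocalAddress()) {
                fallback = ia.getHostAddress();
            }
        }
        return fallback;
    }

    // Parses an IP literal (no DNS lookup is done for literals).
    static InetAddress literal(String ip) {
        try {
            return InetAddress.getByName(ip);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

For an interface holding `fe80::1`, `2001:db8::1`, and `fe80::2`, `pick` settles on the `2001:db8::1` entry regardless of what follows it, which is the stability the comment asks for.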
portal/src/main/java/org/zstack/portal/managementnode/ManagementNodeManagerImpl.java (resolved)
FutureCompletion completion = bus.send(msg)
completion.await(5000L)
assert !completion.isSuccess()
This does not distinguish "send failed" from "never completed at all".
The result of await(5000L) is ignored, and only !completion.isSuccess() is asserted afterwards. If the FutureCompletion does not complete within 5 seconds, the assertion still passes, so a regression in send confirm would slip through. Consider adding at least one assertion on the error state after completion, consistent with the callback branch below.
Suggested change
  FutureCompletion completion = bus.send(msg)
  completion.await(5000L)
  assert !completion.isSuccess()
+ assert completion.getErrorCode() != null
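The distinction the comment draws has three outcomes, not two. The sketch below makes them explicit, assuming `CompletableFuture` as a stand-in for `FutureCompletion`; `AwaitOutcomeSketch` and its `Outcome` enum are hypothetical and say nothing about what `FutureCompletion.await` actually returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AwaitOutcomeSketch {
    enum Outcome { SUCCEEDED, FAILED, TIMED_OUT }

    // Waits up to timeoutMillis and reports which of the three states was reached.
    static Outcome await(CompletableFuture<?> future, long timeoutMillis) {
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return Outcome.SUCCEEDED;
        } catch (TimeoutException e) {
            return Outcome.TIMED_OUT;
        } catch (ExecutionException e) {
            return Outcome.FAILED;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Outcome.TIMED_OUT;
        }
    }
}
```

A test that expects a failed send would assert `FAILED` specifically, so a future that never completes shows up as `TIMED_OUT` and fails the test.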
def threads = []
1.upto(50) { i ->
    Thread t = Thread.start {
        new MessageSender().sendMsg()
    }
    threads << t
}

ThreadFacade thdf = bean(ThreadFacade.class)

ChainInfo info = thdf.getChainTaskInfo("http-send-in-queue")
assert info.runningTask.size() <= 1
sleep(100)
info = thdf.getChainTaskInfo("http-send-in-queue")
assert info.runningTask.size() <= 1
testSendQueue() can now pass vacuously.
After the 50 threads start, http-send-in-queue is read immediately, with no synchronization point guaranteeing that any send task has been enqueued. Even if queue serialization regresses, the assertion may pass before the chain has been created, which makes it a false positive. Align the thread start first, then sample once the chain has appeared.
Suggested change
  void testSendQueue() {
      def threads = []
+     CountDownLatch ready = new CountDownLatch(50)
+     CountDownLatch start = new CountDownLatch(1)
      1.upto(50) { i ->
          Thread t = Thread.start {
+             ready.countDown()
+             start.await()
              new MessageSender().sendMsg()
          }
          threads << t
      }
+     assert ready.await(5, TimeUnit.SECONDS)
+     start.countDown()
      ThreadFacade thdf = bean(ThreadFacade.class)
-
-     ChainInfo info = thdf.getChainTaskInfo("http-send-in-queue")
-     assert info.runningTask.size() <= 1
-     sleep(100)
-     info = thdf.getChainTaskInfo("http-send-in-queue")
-     assert info.runningTask.size() <= 1
+     retryInSecs {
+         ChainInfo info = thdf.getChainTaskInfo("http-send-in-queue")
+         assert info != null
+         assert info.runningTask.size() <= 1
+     }
      threads.each { it.join() }
  }
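The start-gate idea can be exercised outside the ZStack test harness. The sketch below is an assumption-laden stand-in: a single-threaded executor plays the role of the serialized `http-send-in-queue` chain, and `StartGateSketch` and `maxConcurrent` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class StartGateSketch {
    // Releases `senders` threads at once into a single-threaded queue and
    // returns the highest number of tasks observed running concurrently.
    static int maxConcurrent(int senders) {
        ExecutorService queue = Executors.newSingleThreadExecutor();
        CountDownLatch ready = new CountDownLatch(senders);
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(senders);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        try {
            for (int i = 0; i < senders; i++) {
                Thread t = new Thread(() -> {
                    ready.countDown();
                    awaitQuietly(start);
                    queue.submit(() -> {
                        max.accumulateAndGet(running.incrementAndGet(), Math::max);
                        running.decrementAndGet();
                        done.countDown();
                    });
                });
                t.setDaemon(true);
                t.start();
            }
            awaitQuietly(ready);   // every sender is parked at the gate
            start.countDown();     // open the gate for all of them together
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
            return max.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            queue.shutdownNow();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Waiting on `done` before reading `max` is what keeps the check from passing vacuously: the measurement is taken only after every task has actually run.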
Force-pushed: cedb855 to 79e1cff
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java (1)
566-591: ⚠️ Potential issue | 🟠 Major
Exception handling in httpSendInQueue is incomplete and may leave callers hanging.
If httpSend(ip) throws, sendCompletion.success() is never called and the FutureCompletion never completes. Callers of await() then block indefinitely.
Suggested fix: add exception handling
  private FutureCompletion httpSendInQueue(String ip) {
      FutureCompletion sendCompletion = new FutureCompletion(null);
      thdf.chainSubmit(new ChainTask(sendCompletion) {
          @Override
          public String getSyncSignature() {
              return "http-send-in-queue";
          }

          @Override
          public void run(SyncTaskChain chain) {
-             httpSend(ip);
-             sendCompletion.success();
+             try {
+                 httpSend(ip);
+                 sendCompletion.success();
+             } catch (Throwable t) {
+                 sendCompletion.fail(err(CLOUD_BUS_SEND_ERROR, t.getMessage()));
+             }
              chain.next();
          }

          @Override
          protected int getSyncLevel() {
              return CloudBusGlobalProperty.HTTP_MAX_CONN;
          }

          @Override
          public String getName() {
              return getSyncSignature();
          }
      });
      return sendCompletion;
  }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java` around lines 566 - 591, The httpSendInQueue task currently calls httpSend(ip) without catching exceptions, so if httpSend throws the FutureCompletion sendCompletion is never completed and callers block; wrap the httpSend(ip) invocation in try/catch/finally inside the ChainTask.run: on success call sendCompletion.success(), on exception complete the FutureCompletion with failure (e.g. sendCompletion.fail(...) or equivalent) passing the exception/error details, and ensure chain.next() is always invoked in a finally block so the sync chain continues.
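The pattern requested here (complete the future on every path, and advance the chain in a `finally` block) can be sketched without the ZStack classes. `AlwaysCompleteSketch` and `runInQueue` are hypothetical; `CompletableFuture` stands in for `FutureCompletion` and the `next` callback stands in for `chain.next()`.

```java
import java.util.concurrent.CompletableFuture;

final class AlwaysCompleteSketch {
    // Runs `send`, completes the future on both success and failure, and
    // always runs `next` so the queue behind this task is never stalled.
    static CompletableFuture<Void> runInQueue(Runnable send, Runnable next) {
        CompletableFuture<Void> completion = new CompletableFuture<>();
        try {
            send.run();
            completion.complete(null);
        } catch (Throwable t) {
            completion.completeExceptionally(t);
        } finally {
            next.run();
        }
        return completion;
    }
}
```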
♻️ Duplicate comments (1)
test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBus3Case.groovy (1)
425-443: ⚠️ Potential issue | 🟡 Minor
testSendQueue() is at risk of a race condition.
After the 50 threads start, the state of http-send-in-queue is read immediately, with no synchronization guaranteeing that any send task has been enqueued. The test may therefore pass its assertions before any task is enqueued, which is a false positive. Consider adding a synchronization barrier so that all threads start sending together, and sample only once the queue contains tasks:
Suggested change
  void testSendQueue() {
      def threads = []
+     CountDownLatch ready = new CountDownLatch(50)
+     CountDownLatch start = new CountDownLatch(1)
      1.upto(50) { i ->
          Thread t = Thread.start {
+             ready.countDown()
+             start.await()
              new MessageSender().sendMsg()
          }
          threads << t
      }
+     ready.await(5, TimeUnit.SECONDS)
+     start.countDown()
      ThreadFacade thdf = bean(ThreadFacade.class)
-     ChainInfo info = thdf.getChainTaskInfo("http-send-in-queue")
-     assert info.runningTask.size() <= 1
-     sleep(100)
-     info = thdf.getChainTaskInfo("http-send-in-queue")
-     assert info.runningTask.size() <= 1
+     retryInSecs {
+         ChainInfo info = thdf.getChainTaskInfo("http-send-in-queue")
+         assert info != null
+         assert info.runningTask.size() <= 1
+     }
      threads.each { it.join() }
  }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBus3Case.groovy` around lines 425 - 443, testSendQueue() has a race: it samples ThreadFacade.getChainTaskInfo("http-send-in-queue") immediately after starting 50 threads so the messages may not have been enqueued yet; modify testSendQueue() to coordinate thread start and ensure the queue contains tasks before asserting by using a synchronization barrier (e.g., CountDownLatch or CyclicBarrier) so all MessageSender.sendMsg() threads wait to start together, then wait/poll ThreadFacade.getChainTaskInfo("http-send-in-queue") until runningTask.size() reflects enqueued work (or a short timeout) before performing the assert; reference the testSendQueue(), MessageSender.sendMsg(), ThreadFacade and getChainTaskInfo("http-send-in-queue") symbols when making the change.
🧹 Nitpick comments (1)
test/src/test/groovy/org/zstack/test/integration/core/cloudbus/ManagementNodeNotFoundHandlerCase.groovy (1)
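100-114: Using sleep can make the test flaky.
Calling sleep(1000) after c.await() to wait for the asynchronous operation to finish can make the test flaky in CI or under high system load. Consider retryInSecs or another synchronization mechanism to make the assertion reliable.
That said, since this is test code and the original retryInSecs logic was simplified, the current implementation is acceptable.
🤖 Prompt for AI Agents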
Verify each finding against the current code and only fix it if needed. In `@test/src/test/groovy/org/zstack/test/integration/core/cloudbus/ManagementNodeNotFoundHandlerCase.groovy` around lines 100 - 114, The test uses sleep(1000) after c.await(), which can make it flaky; replace the sleep-based wait with a deterministic retry/polling loop (e.g., retryInSecs or polling with a timeout) that checks the same condition you're asserting against (mgr.managementNodeNotFoundHandlers.asMap().values().message does not contain reply) until it becomes true or a timeout is reached; update the block around c.await(), bus.registerService(secondNodeService), mockAManagementNode(secondNodeUuid) and cleanup()/bus.unregisterService(secondNodeService) to remove sleep and use the retry/poll mechanism to ensure the assertion is stable.
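A polling wait of the kind suggested here might look like the sketch below. It is not the implementation of ZStack's `retryInSecs` test helper; `RetrySketch` and `retryUntil` are hypothetical names chosen for illustration.

```java
import java.util.function.BooleanSupplier;

final class RetrySketch {
    // Polls `condition` every `intervalMillis` until it holds or the timeout elapses.
    static boolean retryUntil(long timeoutMillis, long intervalMillis, BooleanSupplier condition) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

Compared with a fixed `sleep(1000)`, this returns as soon as the condition holds and fails explicitly when it never does.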
✅ Files skipped from review due to trivial changes (2)
- core/src/main/java/org/zstack/core/thread/ThreadFacadeImpl.java
- header/src/main/java/org/zstack/header/errorcode/SysErrors.java
🚧 Files skipped from review as they are similar to previous changes (4)
- test/src/test/groovy/org/zstack/test/integration/portal/ManagementNodeHeartbeatCase.groovy
- test/src/test/groovy/org/zstack/test/integration/core/cloudbus/MessageSender.java
- core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl2.java
- core/src/main/java/org/zstack/core/thread/TaskInfoBuilder.java
1. In CloudBus, send only submits a message-send task; the function returning does not mean the message has been sent, so the sender has no way to limit its own sending. Add a FutureCompletion to confirm that a message was sent, which gives senders a way to throttle.
2. Fix interrupt handling in the MN heartbeat thread.
3. Refactor duplicated code for getChainInfo and fix the synchronized target.

This patch is cherry-picked from ZSTAC-71213 (commit: af58834)
Resolves: ZSV-11510
Change-Id: I686361627566737a7775706263796b736967666c
Force-pushed: 79e1cff to 19fa46e
sync from gitlab !9380