Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 150 additions & 7 deletions compute/src/main/java/org/zstack/compute/vm/MigrateVmLongJob.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,34 @@
package org.zstack.compute.vm;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Configurable;
import org.zstack.core.Platform;
import org.zstack.core.cloudbus.CloudBus;
import org.zstack.core.cloudbus.CloudBusCallBack;
import org.zstack.core.db.DatabaseFacade;
import org.zstack.core.db.Q;
import org.zstack.core.thread.ThreadFacade;
import org.zstack.header.Constants;
import org.zstack.header.core.Completion;
import org.zstack.header.core.ReturnValueCompletion;
import org.zstack.header.longjob.LongJobErrors;
import org.zstack.header.longjob.LongJobFor;
import org.zstack.header.longjob.LongJobVO;
import org.zstack.header.errorcode.ErrorCode;
import org.zstack.header.longjob.*;
import org.zstack.header.message.APIEvent;
import org.zstack.header.message.MessageReply;
import org.zstack.header.vm.*;
import org.zstack.header.longjob.LongJob;
import org.zstack.longjob.LongJobUtils;
import org.zstack.header.volume.GetVolumeTaskMsg;
import org.zstack.header.volume.GetVolumeTaskReply;
import org.zstack.header.volume.VolumeConstant;
import org.zstack.header.volume.VolumeVO;
import org.zstack.header.volume.VolumeVO_;
import org.zstack.utils.gson.JSONObjectUtil;

import static org.zstack.core.Platform.err;
import java.util.*;
import java.util.concurrent.TimeUnit;

import static org.zstack.core.Platform.operr;


Expand All @@ -31,16 +38,152 @@
@LongJobFor(APIMigrateVmMsg.class)
@Configurable(preConstruction = true, autowire = Autowire.BY_TYPE)
public class MigrateVmLongJob implements LongJob {
private static final Logger logger = LogManager.getLogger(MigrateVmLongJob.class);
private static final int WAIT_CHAIN_TASK_EXIT_MAX_RETRIES = 30;
private static final long WAIT_CHAIN_TASK_EXIT_INTERVAL_SECS = 1;

@Autowired
protected CloudBus bus;
@Autowired
protected DatabaseFacade dbf;
@Autowired
private ThreadFacade thdf;

protected String auditResourceUuid;

@Override
public void start(LongJobVO job, ReturnValueCompletion<APIEvent> completion) {
MigrateVmInnerMsg msg = JSONObjectUtil.toObject(job.getJobData(), MigrateVmInnerMsg.class);

List<String> backupTaskLongJobUuids = getBackupTaskLongJobUuids(job.getJobData());
if (backupTaskLongJobUuids != null && !backupTaskLongJobUuids.isEmpty()) {
logger.info(String.format("migrate vm[uuid:%s] longjob has %d backup longjobs to cancel first",
msg.getVmInstanceUuid(), backupTaskLongJobUuids.size()));
cancelBackupLongJobsThenMigrate(backupTaskLongJobUuids, msg, completion);
} else {
doMigrate(msg, completion);
}
}

private List<String> getBackupTaskLongJobUuids(String jobData) {
Map<String, Object> raw = JSONObjectUtil.toObject(jobData, LinkedHashMap.class);
Object uuids = raw == null ? null : raw.get("backupTaskLongJobUuids");
if (!(uuids instanceof List<?>)) {
return null;
}

List<String> result = new ArrayList<>();
for (Object item : (List<?>) uuids) {
if (item == null) {
continue;
}
String uuid = String.valueOf(item).trim();
if (!uuid.isEmpty()) {
result.add(uuid);
}
}
return result.isEmpty() ? null : result;
}

private void cancelBackupLongJobsThenMigrate(List<String> backupTaskLongJobUuids,
MigrateVmInnerMsg msg,
ReturnValueCompletion<APIEvent> completion) {
cancelBackupLongJobs(backupTaskLongJobUuids.iterator(), new Completion(completion) {
@Override
public void success() {
waitForVolumeChainTasksExit(msg.getVmInstanceUuid(), WAIT_CHAIN_TASK_EXIT_MAX_RETRIES,
new Completion(completion) {
@Override
public void success() {
doMigrate(msg, completion);
}
Comment on lines +88 to +99
Copy link

@coderabbitai coderabbitai bot Mar 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

新增预处理阶段缺少“已取消”门禁,存在取消后仍迁移的风险。

现在 start() 会先执行“取消备份 + 轮询等待”再 doMigrate(),但这条异步链路里没有任何 longjob 取消态检查;cancel() 仅发送 CancelMigrateVmMsg。如果用户在预处理阶段点击取消,后续仍可能进入 doMigrate()。建议在每次异步回调继续前检查 longjob 状态,已取消则立即终止。

Also applies to: 166-171, 197-212

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@compute/src/main/java/org/zstack/compute/vm/MigrateVmLongJob.java` around
lines 78 - 89, The async chain in start() (specifically in
cancelBackupLongJobsThenMigrate -> cancelBackupLongJobs callback ->
waitForVolumeChainTasksExit callback -> doMigrate) lacks a check for the LongJob
being canceled, so a user cancel can still allow doMigrate() to run; update each
asynchronous continuation (the anonymous Completion.success() handlers in
cancelBackupLongJobsThenMigrate, the similar callbacks at the other locations
(around lines 166-171 and 197-212)) to first query the long job status (via the
job UUID accessible in this class) and if the status is CANCELED/TERMINATED,
short-circuit by calling completion.success()/fail() as appropriate and do not
call doMigrate(); also ensure cancel() (which sends CancelMigrateVmMsg) sets the
job status so that these checks observe cancellation before proceeding.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment from ye.zou:

这是 LongJob 框架层面的通用缺陷——start() 和 cancel() 跑在不同线程/节点,框架没有提供取消信号传递机制。在单个 LongJob bugfix MR 里加 volatile 标志不是正确的修法,且多节点场景下内存标志也不够。

已开 ZSTAC-83470 跟踪框架改进:由框架提供 isCancelled(jobUuid) 查询(走 DB 保证多节点一致),所有 LongJob 实现在异步回调 checkpoint 统一调用。

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zstack-robot-1,感谢 ye.zou 的说明。理解了——start()cancel() 运行于不同线程/节点,框架层面缺乏统一的取消信号传递机制,在单个 bugfix PR 里添加内存标志(volatile)并不是正确的解法,多节点场景下也不具备一致性保证。在 ZSTAC-83470 中由框架统一提供基于 DB 的 isCancelled(jobUuid) 查询、并在所有 LongJob 实现的异步回调 checkpoint 处统一调用,是更合理的系统性修复方向。此 PR 范围内无需处理,感谢跟踪。


✏️ Learnings added
Learnt from: zstack-robot-1
Repo: MatheMatrix/zstack PR: 3530
File: compute/src/main/java/org/zstack/compute/vm/MigrateVmLongJob.java:78-89
Timestamp: 2026-03-19T07:19:58.791Z
Learning: In the ZStack codebase (MigrateVmLongJob.java and LongJob framework in general), start() and cancel() run on different threads/nodes. The framework provides no built-in cancellation signal propagation mechanism between them. Adding an in-memory volatile flag in a single LongJob implementation is insufficient for multi-node scenarios. The correct fix is a framework-level isCancelled(jobUuid) query backed by the DB (for multi-node consistency), to be called at async callback checkpoints in all LongJob implementations. This is tracked under ZSTAC-83470.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


@Override
public void fail(ErrorCode errorCode) {
completion.fail(errorCode);
}
});
}

@Override
public void fail(ErrorCode errorCode) {
logger.warn(String.format("failed to cancel backup longjobs for vm[uuid:%s], " +
"attempting migration anyway: %s", msg.getVmInstanceUuid(), errorCode));
doMigrate(msg, completion);
}
});
}

private void cancelBackupLongJobs(Iterator<String> it, Completion completion) {
if (!it.hasNext()) {
completion.success();
return;
}

String longJobUuid = it.next();
CancelLongJobMsg cmsg = new CancelLongJobMsg();
cmsg.setUuid(longJobUuid);
bus.makeLocalServiceId(cmsg, LongJobConstants.SERVICE_ID);
bus.send(cmsg, new CloudBusCallBack(completion) {
@Override
public void run(MessageReply reply) {
if (!reply.isSuccess()) {
logger.warn(String.format("failed to cancel backup longjob[uuid:%s]: %s",
longJobUuid, reply.getError()));
}
cancelBackupLongJobs(it, completion);
}
Comment on lines +117 to +135
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

单个备份 LongJob 的取消失败现在被吞掉了。

Line 130-135 只记日志后继续递归,所以只要迭代器走完,外层就一定会进入后续等待流程。这样一来,单个取消失败不会走到 Line 109-113 的“继续迁移”兜底,反而更可能在等待阶段超时并把迁移直接打失败。建议把任一取消失败汇总后显式回传给调用方,让外层按既有兜底路径处理。

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@compute/src/main/java/org/zstack/compute/vm/MigrateVmLongJob.java` around
lines 117 - 135, The cancelBackupLongJobs recursion currently swallows
individual CancelLongJobMsg failures (in the CloudBusCallBack for
cancelBackupLongJobs) by only logging and continuing, which prevents the caller
from seeing cancellation errors; modify cancelBackupLongJobs (and its
CloudBusCallBack) so that on a non-successful reply you propagate the error to
the provided Completion (e.g., call completion.fail(reply.getError()) or
otherwise record the failure and stop further recursion), and only call
completion.success() when all cancels succeed; reference cancelBackupLongJobs,
CancelLongJobMsg, CloudBusCallBack, and the completion parameter when making
this change.

});
}

private void waitForVolumeChainTasksExit(String vmUuid, int retriesLeft, Completion completion) {
List<String> volUuids = Q.New(VolumeVO.class)
.eq(VolumeVO_.vmInstanceUuid, vmUuid)
.select(VolumeVO_.uuid)
.listValues();

if (volUuids.isEmpty()) {
completion.success();
return;
}

GetVolumeTaskMsg gmsg = new GetVolumeTaskMsg();
gmsg.setVolumeUuids(volUuids);
bus.makeLocalServiceId(gmsg, VolumeConstant.SERVICE_ID);
bus.send(gmsg, new CloudBusCallBack(completion) {
@Override
public void run(MessageReply reply) {
if (!reply.isSuccess()) {
completion.fail(reply.getError());
return;
}

GetVolumeTaskReply gr = reply.castReply();
boolean hasRunningTasks = gr.getResults().values().stream()
.anyMatch(info -> !info.getRunningTask().isEmpty());

if (!hasRunningTasks) {
completion.success();
return;
}

if (retriesLeft <= 0) {
completion.fail(operr(
"timeout waiting for volume backup chain tasks to exit for vm[uuid:%s]", vmUuid));
return;
}

logger.debug(String.format(
"volumes of vm[uuid:%s] still have running tasks, retry in %ds (retries left: %d)",
vmUuid, WAIT_CHAIN_TASK_EXIT_INTERVAL_SECS, retriesLeft));
Comment on lines +161 to +178
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

这里等待的是“任意卷任务”,不是“backup chain task”。

Line 162-163 只判断 runningTask 是否非空,没有按任务类型或上下文筛选。只要 VM 上还有其他合法卷任务,这条新链路就会额外阻塞 30 秒,甚至把原本可迁移的请求直接判成超时失败。建议把判断范围收窄到这次取消动作关联的 backup chain task。

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@compute/src/main/java/org/zstack/compute/vm/MigrateVmLongJob.java` around
lines 161 - 178, The current check using GetVolumeTaskReply gr ->
getResults().values().stream().anyMatch(info ->
!info.getRunningTask().isEmpty()) treats any running volume task as blocking;
narrow the predicate to only consider the backup-chain tasks relevant to this
cancel operation (inspect the running task entries returned by
GetVolumeTaskReply for a task type/name or context field and match that to the
backup-chain identifier or context used by the cancel operation), e.g. replace
the anyMatch condition to detect only tasks whose type/name/context indicates
the backup chain for this VM (use symbols like GetVolumeTaskReply,
info.getRunningTask(), vmUuid and the surrounding retry logic using
WAIT_CHAIN_TASK_EXIT_INTERVAL_SECS/retriesLeft/completion.fail/success to ensure
only backup-chain tasks trigger the retry/fail path).

thdf.submitTimeoutTask(
() -> waitForVolumeChainTasksExit(vmUuid, retriesLeft - 1, completion),
TimeUnit.SECONDS, WAIT_CHAIN_TASK_EXIT_INTERVAL_SECS);
}
});
}

private void doMigrate(MigrateVmInnerMsg msg, ReturnValueCompletion<APIEvent> completion) {
bus.makeTargetServiceIdByResourceUuid(msg, VmInstanceConstant.SERVICE_ID, msg.getVmInstanceUuid());
bus.send(msg, new CloudBusCallBack(completion) {
@Override
Expand Down