-
Notifications
You must be signed in to change notification settings - Fork 0
fix(compute): cancel backup longjobs before migration when backupTaskLongJobUuids present #3530
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 5.5.12
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,27 +1,34 @@ | ||
| package org.zstack.compute.vm; | ||
|
|
||
| import org.apache.logging.log4j.LogManager; | ||
| import org.apache.logging.log4j.Logger; | ||
| import org.apache.logging.log4j.ThreadContext; | ||
| import org.springframework.beans.factory.annotation.Autowire; | ||
| import org.springframework.beans.factory.annotation.Autowired; | ||
| import org.springframework.beans.factory.annotation.Configurable; | ||
| import org.zstack.core.Platform; | ||
| import org.zstack.core.cloudbus.CloudBus; | ||
| import org.zstack.core.cloudbus.CloudBusCallBack; | ||
| import org.zstack.core.db.DatabaseFacade; | ||
| import org.zstack.core.db.Q; | ||
| import org.zstack.core.thread.ThreadFacade; | ||
| import org.zstack.header.Constants; | ||
| import org.zstack.header.core.Completion; | ||
| import org.zstack.header.core.ReturnValueCompletion; | ||
| import org.zstack.header.longjob.LongJobErrors; | ||
| import org.zstack.header.longjob.LongJobFor; | ||
| import org.zstack.header.longjob.LongJobVO; | ||
| import org.zstack.header.errorcode.ErrorCode; | ||
| import org.zstack.header.longjob.*; | ||
| import org.zstack.header.message.APIEvent; | ||
| import org.zstack.header.message.MessageReply; | ||
| import org.zstack.header.vm.*; | ||
| import org.zstack.header.longjob.LongJob; | ||
| import org.zstack.longjob.LongJobUtils; | ||
| import org.zstack.header.volume.GetVolumeTaskMsg; | ||
| import org.zstack.header.volume.GetVolumeTaskReply; | ||
| import org.zstack.header.volume.VolumeConstant; | ||
| import org.zstack.header.volume.VolumeVO; | ||
| import org.zstack.header.volume.VolumeVO_; | ||
| import org.zstack.utils.gson.JSONObjectUtil; | ||
|
|
||
| import static org.zstack.core.Platform.err; | ||
| import java.util.*; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| import static org.zstack.core.Platform.operr; | ||
|
|
||
|
|
||
|
|
@@ -31,16 +38,152 @@ | |
| @LongJobFor(APIMigrateVmMsg.class) | ||
| @Configurable(preConstruction = true, autowire = Autowire.BY_TYPE) | ||
| public class MigrateVmLongJob implements LongJob { | ||
| private static final Logger logger = LogManager.getLogger(MigrateVmLongJob.class); | ||
| private static final int WAIT_CHAIN_TASK_EXIT_MAX_RETRIES = 30; | ||
| private static final long WAIT_CHAIN_TASK_EXIT_INTERVAL_SECS = 1; | ||
|
|
||
| @Autowired | ||
| protected CloudBus bus; | ||
| @Autowired | ||
| protected DatabaseFacade dbf; | ||
| @Autowired | ||
| private ThreadFacade thdf; | ||
|
|
||
| protected String auditResourceUuid; | ||
|
|
||
| @Override | ||
| public void start(LongJobVO job, ReturnValueCompletion<APIEvent> completion) { | ||
| MigrateVmInnerMsg msg = JSONObjectUtil.toObject(job.getJobData(), MigrateVmInnerMsg.class); | ||
|
|
||
| List<String> backupTaskLongJobUuids = getBackupTaskLongJobUuids(job.getJobData()); | ||
| if (backupTaskLongJobUuids != null && !backupTaskLongJobUuids.isEmpty()) { | ||
| logger.info(String.format("migrate vm[uuid:%s] longjob has %d backup longjobs to cancel first", | ||
| msg.getVmInstanceUuid(), backupTaskLongJobUuids.size())); | ||
| cancelBackupLongJobsThenMigrate(backupTaskLongJobUuids, msg, completion); | ||
| } else { | ||
| doMigrate(msg, completion); | ||
| } | ||
| } | ||
|
|
||
| private List<String> getBackupTaskLongJobUuids(String jobData) { | ||
| Map<String, Object> raw = JSONObjectUtil.toObject(jobData, LinkedHashMap.class); | ||
| Object uuids = raw == null ? null : raw.get("backupTaskLongJobUuids"); | ||
| if (!(uuids instanceof List<?>)) { | ||
| return null; | ||
| } | ||
|
|
||
| List<String> result = new ArrayList<>(); | ||
| for (Object item : (List<?>) uuids) { | ||
| if (item == null) { | ||
| continue; | ||
| } | ||
| String uuid = String.valueOf(item).trim(); | ||
| if (!uuid.isEmpty()) { | ||
| result.add(uuid); | ||
| } | ||
| } | ||
| return result.isEmpty() ? null : result; | ||
| } | ||
|
|
||
| private void cancelBackupLongJobsThenMigrate(List<String> backupTaskLongJobUuids, | ||
| MigrateVmInnerMsg msg, | ||
| ReturnValueCompletion<APIEvent> completion) { | ||
| cancelBackupLongJobs(backupTaskLongJobUuids.iterator(), new Completion(completion) { | ||
| @Override | ||
| public void success() { | ||
| waitForVolumeChainTasksExit(msg.getVmInstanceUuid(), WAIT_CHAIN_TASK_EXIT_MAX_RETRIES, | ||
| new Completion(completion) { | ||
| @Override | ||
| public void success() { | ||
| doMigrate(msg, completion); | ||
| } | ||
|
|
||
| @Override | ||
| public void fail(ErrorCode errorCode) { | ||
| completion.fail(errorCode); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| @Override | ||
| public void fail(ErrorCode errorCode) { | ||
| logger.warn(String.format("failed to cancel backup longjobs for vm[uuid:%s], " + | ||
| "attempting migration anyway: %s", msg.getVmInstanceUuid(), errorCode)); | ||
| doMigrate(msg, completion); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| private void cancelBackupLongJobs(Iterator<String> it, Completion completion) { | ||
| if (!it.hasNext()) { | ||
| completion.success(); | ||
| return; | ||
| } | ||
|
|
||
| String longJobUuid = it.next(); | ||
| CancelLongJobMsg cmsg = new CancelLongJobMsg(); | ||
| cmsg.setUuid(longJobUuid); | ||
| bus.makeLocalServiceId(cmsg, LongJobConstants.SERVICE_ID); | ||
| bus.send(cmsg, new CloudBusCallBack(completion) { | ||
| @Override | ||
| public void run(MessageReply reply) { | ||
| if (!reply.isSuccess()) { | ||
| logger.warn(String.format("failed to cancel backup longjob[uuid:%s]: %s", | ||
| longJobUuid, reply.getError())); | ||
| } | ||
| cancelBackupLongJobs(it, completion); | ||
| } | ||
|
Comment on lines
+117
to
+135
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 单个备份 LongJob 的取消失败现在被吞掉了。 Line 130-135 只记日志后继续递归,所以只要迭代器走完,外层就一定会进入后续等待流程。这样一来,单个取消失败不会走到 Line 109-113 的“继续迁移”兜底,反而更可能在等待阶段超时并把迁移直接打失败。建议把任一取消失败汇总后显式回传给调用方,让外层按既有兜底路径处理。 🤖 Prompt for AI Agents |
||
| }); | ||
| } | ||
|
|
||
| private void waitForVolumeChainTasksExit(String vmUuid, int retriesLeft, Completion completion) { | ||
| List<String> volUuids = Q.New(VolumeVO.class) | ||
| .eq(VolumeVO_.vmInstanceUuid, vmUuid) | ||
| .select(VolumeVO_.uuid) | ||
| .listValues(); | ||
|
|
||
| if (volUuids.isEmpty()) { | ||
| completion.success(); | ||
| return; | ||
| } | ||
|
|
||
| GetVolumeTaskMsg gmsg = new GetVolumeTaskMsg(); | ||
| gmsg.setVolumeUuids(volUuids); | ||
| bus.makeLocalServiceId(gmsg, VolumeConstant.SERVICE_ID); | ||
| bus.send(gmsg, new CloudBusCallBack(completion) { | ||
| @Override | ||
| public void run(MessageReply reply) { | ||
| if (!reply.isSuccess()) { | ||
| completion.fail(reply.getError()); | ||
| return; | ||
| } | ||
|
|
||
| GetVolumeTaskReply gr = reply.castReply(); | ||
| boolean hasRunningTasks = gr.getResults().values().stream() | ||
| .anyMatch(info -> !info.getRunningTask().isEmpty()); | ||
|
|
||
| if (!hasRunningTasks) { | ||
| completion.success(); | ||
| return; | ||
| } | ||
|
|
||
| if (retriesLeft <= 0) { | ||
| completion.fail(operr( | ||
| "timeout waiting for volume backup chain tasks to exit for vm[uuid:%s]", vmUuid)); | ||
| return; | ||
| } | ||
|
|
||
| logger.debug(String.format( | ||
| "volumes of vm[uuid:%s] still have running tasks, retry in %ds (retries left: %d)", | ||
| vmUuid, WAIT_CHAIN_TASK_EXIT_INTERVAL_SECS, retriesLeft)); | ||
|
Comment on lines
+161
to
+178
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 这里等待的是“任意卷任务”,不是“backup chain task”。 Line 162-163 只判断 🤖 Prompt for AI Agents |
||
| thdf.submitTimeoutTask( | ||
| () -> waitForVolumeChainTasksExit(vmUuid, retriesLeft - 1, completion), | ||
| TimeUnit.SECONDS, WAIT_CHAIN_TASK_EXIT_INTERVAL_SECS); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| private void doMigrate(MigrateVmInnerMsg msg, ReturnValueCompletion<APIEvent> completion) { | ||
| bus.makeTargetServiceIdByResourceUuid(msg, VmInstanceConstant.SERVICE_ID, msg.getVmInstanceUuid()); | ||
| bus.send(msg, new CloudBusCallBack(completion) { | ||
| @Override | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
新增预处理阶段缺少“已取消”门禁,存在取消后仍迁移的风险。
现在 `start()` 会先执行“取消备份 + 轮询等待”，再调用 `doMigrate()`，但这条异步链路里没有任何 longjob 取消态检查；`cancel()` 仅发送 `CancelMigrateVmMsg`。如果用户在预处理阶段点击取消，后续仍可能进入 `doMigrate()`。建议在每次异步回调继续前检查 longjob 状态，已取消则立即终止。

Also applies to: 166-171, 197-212
🤖 Prompt for AI Agents
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment from ye.zou:
这是 LongJob 框架层面的通用缺陷——start() 和 cancel() 跑在不同线程/节点,框架没有提供取消信号传递机制。在单个 LongJob bugfix MR 里加 volatile 标志不是正确的修法,且多节点场景下内存标志也不够。
已开 ZSTAC-83470 跟踪框架改进:由框架提供 isCancelled(jobUuid) 查询(走 DB 保证多节点一致),所有 LongJob 实现在异步回调 checkpoint 统一调用。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zstack-robot-1，感谢 ye.zou 的说明。理解了——`start()` 与 `cancel()` 运行于不同线程/节点，框架层面缺乏统一的取消信号传递机制，在单个 bugfix PR 里添加内存标志（`volatile`）并不是正确的解法，多节点场景下也不具备一致性保证。在 ZSTAC-83470 中由框架统一提供基于 DB 的 `isCancelled(jobUuid)` 查询，并在所有 LongJob 实现的异步回调 checkpoint 处统一调用，是更合理的系统性修复方向。此 PR 范围内无需处理，感谢跟踪。

✏️ Learnings added