-
Notifications
You must be signed in to change notification settings - Fork 0
<fix>[compute]: <description #3262
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: zsv_4.10.28
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| package org.zstack.compute.vm; | ||
|
|
||
| import org.springframework.beans.factory.annotation.Autowired; | ||
| import org.zstack.core.cloudbus.CloudBusCallBack; | ||
| import org.zstack.core.gc.GC; | ||
| import org.zstack.core.gc.GCCompletion; | ||
| import org.zstack.core.gc.TimeBasedGarbageCollector; | ||
| import org.zstack.core.thread.ChainTask; | ||
| import org.zstack.core.thread.SyncTaskChain; | ||
| import org.zstack.core.thread.ThreadFacade; | ||
| import org.zstack.header.core.progress.ChainInfo; | ||
| import org.zstack.header.message.MessageReply; | ||
| import org.zstack.header.vm.UpdateVmInstanceMetadataMsg; | ||
| import org.zstack.header.vm.VmInstanceConstant; | ||
| import org.zstack.header.vm.VmInstanceVO; | ||
|
|
||
| public class UpdateVmInstanceMetadataGC extends TimeBasedGarbageCollector { | ||
| @GC | ||
| public String vmInstanceUuid; | ||
|
|
||
| @Autowired | ||
| protected ThreadFacade thdf; | ||
|
|
||
| static public String getUpdateVmInstanceMetadataSyncSignature(String vmInstanceUuid) { | ||
| return String.format("update-vm-%s-metadata", vmInstanceUuid); | ||
| } | ||
|
|
||
| @Override | ||
| protected void triggerNow(GCCompletion completion) { | ||
| if (!dbf.isExist(vmInstanceUuid, VmInstanceVO.class)) { | ||
| completion.cancel(); | ||
| return; | ||
| } | ||
|
|
||
| String queueName = getUpdateVmInstanceMetadataSyncSignature(vmInstanceUuid); | ||
| ChainInfo chainInfo = thdf.getChainTaskInfo(queueName); | ||
| if (!chainInfo.getPendingTask().isEmpty()) { | ||
| completion.cancel(); | ||
| return; | ||
| } | ||
|
|
||
| thdf.chainSubmit(new ChainTask(completion) { | ||
| @Override | ||
| public String getSyncSignature() { | ||
| return queueName; | ||
| } | ||
|
|
||
| @Override | ||
| public void run(final SyncTaskChain chain) { | ||
| UpdateVmInstanceMetadataMsg msg = new UpdateVmInstanceMetadataMsg(); | ||
| msg.setUuid(vmInstanceUuid); | ||
| bus.makeTargetServiceIdByResourceUuid(msg, VmInstanceConstant.SERVICE_ID, vmInstanceUuid); | ||
| bus.send(msg, new CloudBusCallBack(completion) { | ||
| @Override | ||
| public void run(MessageReply reply) { | ||
| if (!reply.isSuccess()) { | ||
| completion.fail(reply.getError()); | ||
| } else { | ||
| completion.success(); | ||
| } | ||
| chain.next(); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| @Override | ||
| public String getName() { | ||
| return queueName; | ||
| } | ||
| }); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |
| import org.zstack.header.message.APIMessage; | ||
| import org.zstack.header.network.l2.*; | ||
| import org.zstack.header.network.l3.*; | ||
| import org.zstack.header.storage.primary.APIRegisterVmInstanceMsg; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Check if APIRegisterVmInstanceMsg is used in VmInstanceApiInterceptor.java
rg -n "APIRegisterVmInstanceMsg" compute/src/main/java/org/zstack/compute/vm/VmInstanceApiInterceptor.java
# Also check the intercept method to see all handled message types
rg -n "instanceof.*Msg" compute/src/main/java/org/zstack/compute/vm/VmInstanceApiInterceptor.java | head -30Repository: MatheMatrix/zstack Length of output: 2170 🏁 Script executed: #!/bin/bash
# Check if there's a handler method for APIRegisterVmInstanceMsg anywhere in the intercept method
cat -n compute/src/main/java/org/zstack/compute/vm/VmInstanceApiInterceptor.java | sed -n '85,180p'
# Also check if the method signature and pattern continues beyond line 146
tail -n +140 compute/src/main/java/org/zstack/compute/vm/VmInstanceApiInterceptor.java | head -50Repository: MatheMatrix/zstack Length of output: 8697 🏁 Script executed: #!/bin/bash
# Verify if APIRegisterVmInstanceMsg is used in other parts of the codebase
rg -r "APIRegisterVmInstanceMsg" compute/src/main/java/org/zstack/compute/vm/ --type java
# Check if there's any mention of RegisterVmInstance in other interceptor or handler files
rg -r "RegisterVmInstance" compute/src/main/java/org/zstack/compute/ --type java | head -20Repository: MatheMatrix/zstack Length of output: 44 🏁 Script executed: #!/bin/bash
# Double-check: search entire repository for APIRegisterVmInstanceMsg to be thorough
rg "APIRegisterVmInstanceMsg" --type javaRepository: MatheMatrix/zstack Length of output: 1264 移除未使用的 该导入在 🤖 Prompt for AI Agents |
||
| import org.zstack.header.storage.primary.PrimaryStorageClusterRefVO; | ||
| import org.zstack.header.storage.primary.PrimaryStorageClusterRefVO_; | ||
| import org.zstack.header.storage.snapshot.VolumeSnapshotVO; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,10 +13,14 @@ | |
| import org.zstack.core.cascade.CascadeFacade; | ||
| import org.zstack.core.cloudbus.*; | ||
| import org.zstack.core.componentloader.PluginRegistry; | ||
| import org.zstack.core.config.GlobalConfig; | ||
| import org.zstack.core.config.GlobalConfigDefinition; | ||
| import org.zstack.core.db.*; | ||
| import org.zstack.core.db.SimpleQuery.Op; | ||
| import org.zstack.core.defer.Defer; | ||
| import org.zstack.core.defer.Deferred; | ||
| import org.zstack.core.gc.GCConstants; | ||
| import org.zstack.core.gc.SubmitTimeBasedGarbageCollectorMsg; | ||
| import org.zstack.core.jsonlabel.JsonLabel; | ||
| import org.zstack.core.thread.ChainTask; | ||
| import org.zstack.core.thread.RunInQueue; | ||
|
|
@@ -45,6 +49,14 @@ | |
| import org.zstack.header.message.*; | ||
| import org.zstack.header.network.l3.*; | ||
| import org.zstack.header.storage.primary.*; | ||
| import org.zstack.header.storage.snapshot.*; | ||
| import org.zstack.header.storage.snapshot.group.VolumeSnapshotGroupRefVO; | ||
| import org.zstack.header.storage.snapshot.group.VolumeSnapshotGroupRefVO_; | ||
| import org.zstack.header.storage.snapshot.group.VolumeSnapshotGroupVO; | ||
| import org.zstack.header.storage.snapshot.group.VolumeSnapshotGroupVO_; | ||
| import org.zstack.header.tag.SystemTagVO; | ||
| import org.zstack.header.tag.SystemTagVO_; | ||
| import org.zstack.header.tag.TagDefinition; | ||
| import org.zstack.header.vm.*; | ||
| import org.zstack.header.vm.ChangeVmMetaDataMsg.AtomicHostUuid; | ||
| import org.zstack.header.vm.ChangeVmMetaDataMsg.AtomicVmState; | ||
|
|
@@ -66,30 +78,29 @@ | |
| import org.zstack.network.l3.L3NetworkManager; | ||
| import org.zstack.network.service.DnsUtils; | ||
| import org.zstack.network.service.NetworkServiceManager; | ||
| import org.zstack.resourceconfig.ResourceConfig; | ||
| import org.zstack.resourceconfig.ResourceConfigFacade; | ||
| import org.zstack.resourceconfig.*; | ||
| import org.zstack.tag.SystemTag; | ||
| import org.zstack.tag.SystemTagCreator; | ||
| import org.zstack.tag.SystemTagUtils; | ||
| import org.zstack.tag.TagManager; | ||
| import org.zstack.utils.CollectionUtils; | ||
| import org.zstack.utils.ExceptionDSL; | ||
| import org.zstack.utils.ObjectUtils; | ||
| import org.zstack.utils.Utils; | ||
| import org.zstack.utils.*; | ||
| import org.zstack.utils.function.ForEachFunction; | ||
| import org.zstack.utils.function.Function; | ||
| import org.zstack.utils.gson.JSONObjectUtil; | ||
| import org.zstack.utils.logging.CLogger; | ||
| import org.zstack.utils.network.NicIpAddressInfo; | ||
| import org.zstack.utils.network.IPv6Constants; | ||
| import org.zstack.utils.network.IPv6NetworkUtils; | ||
| import org.zstack.utils.network.NetworkUtils; | ||
| import org.zstack.utils.network.NicIpAddressInfo; | ||
|
|
||
| import javax.persistence.PersistenceException; | ||
| import javax.persistence.Tuple; | ||
| import javax.persistence.TypedQuery; | ||
| import java.lang.reflect.Field; | ||
| import java.sql.Timestamp; | ||
| import java.time.LocalDateTime; | ||
| import java.util.*; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import static java.util.Arrays.asList; | ||
|
|
@@ -140,6 +151,8 @@ public class VmInstanceBase extends AbstractVmInstance { | |
| private VmInstanceResourceMetadataManager vidm; | ||
| @Autowired | ||
| private NetworkServiceManager nwServiceMgr; | ||
| @Autowired | ||
| private ResourceDestinationMaker destMaker; | ||
|
|
||
| protected VmInstanceVO self; | ||
| protected VmInstanceVO originalCopy; | ||
|
|
@@ -533,6 +546,8 @@ protected void handleLocalMessage(Message msg) { | |
| handle((CancelFlattenVmInstanceMsg) msg); | ||
| } else if (msg instanceof KvmReportVmShutdownEventMsg) { | ||
| handle((KvmReportVmShutdownEventMsg) msg); | ||
| } else if (msg instanceof UpdateVmInstanceMetadataMsg) { | ||
| handle((UpdateVmInstanceMetadataMsg) msg); | ||
| } else { | ||
| VmInstanceBaseExtensionFactory ext = vmMgr.getVmInstanceBaseExtensionFactory(msg); | ||
| if (ext != null) { | ||
|
|
@@ -9369,5 +9384,121 @@ public void run(MessageReply reply) { | |
| } | ||
| }); | ||
| } | ||
|
|
||
| private void handle(UpdateVmInstanceMetadataMsg msg) { | ||
| Tuple tuple = Q.New(VolumeVO.class).select(VolumeVO_.primaryStorageUuid, VolumeVO_.uuid) | ||
| .eq(VolumeVO_.vmInstanceUuid, msg.getUuid()).eq(VolumeVO_.type, VolumeType.Root).findTuple(); | ||
| String primaryStorageUuid = tuple.get(0, String.class); | ||
| String rootVolumeUuid = tuple.get(1, String.class); | ||
|
Comment on lines
+9389
to
+9392
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 补充根卷查询为空的兜底处理。 Line 9389-9392 若根卷不存在, 🛠️ 建议修复- Tuple tuple = Q.New(VolumeVO.class).select(VolumeVO_.primaryStorageUuid, VolumeVO_.uuid)
- .eq(VolumeVO_.vmInstanceUuid, msg.getUuid()).eq(VolumeVO_.type, VolumeType.Root).findTuple();
- String primaryStorageUuid = tuple.get(0, String.class);
- String rootVolumeUuid = tuple.get(1, String.class);
+ Tuple tuple = Q.New(VolumeVO.class).select(VolumeVO_.primaryStorageUuid, VolumeVO_.uuid)
+ .eq(VolumeVO_.vmInstanceUuid, msg.getUuid()).eq(VolumeVO_.type, VolumeType.Root).findTuple();
+ if (tuple == null) {
+ UpdateVmInstanceMetadataOnPrimaryStorageReply reply = new UpdateVmInstanceMetadataOnPrimaryStorageReply();
+ reply.setError(operr("cannot find root volume for vm[uuid:%s]", msg.getUuid()));
+ bus.reply(msg, reply);
+ return;
+ }
+ String primaryStorageUuid = tuple.get(0, String.class);
+ String rootVolumeUuid = tuple.get(1, String.class);🤖 Prompt for AI Agents |
||
|
|
||
| UpdateVmInstanceMetadataOnPrimaryStorageMsg umsg = new UpdateVmInstanceMetadataOnPrimaryStorageMsg(); | ||
| umsg.setMetadata(buildVmInstanceMetadata(msg.getUuid())); | ||
| umsg.setPrimaryStorageUuid(primaryStorageUuid); | ||
| umsg.setRootVolumeUuid(rootVolumeUuid); | ||
| bus.makeTargetServiceIdByResourceUuid(umsg, PrimaryStorageConstant.SERVICE_ID, umsg.getPrimaryStorageUuid()); | ||
| bus.send(umsg, new CloudBusCallBack(msg) { | ||
| @Override | ||
| public void run(MessageReply r) { | ||
| UpdateVmInstanceMetadataOnPrimaryStorageReply reply = new UpdateVmInstanceMetadataOnPrimaryStorageReply(); | ||
|
|
||
| if (!r.isSuccess()) { | ||
| reply.setError(Platform.operr("failed to update vm[uuid=%s] on hypervisor.", self.getUuid()) | ||
| .withCause(r.getError())); | ||
|
|
||
| submitUpdateVmInstanceMetadataGC(); | ||
| return; | ||
| } | ||
| bus.reply(msg, reply); | ||
|
Comment on lines
+9399
to
+9411
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 失败路径缺少回复会导致调用方悬挂。 Line 9404-9410 仅设置 error 并提交 GC,但没有 🛠️ 建议修复 if (!r.isSuccess()) {
reply.setError(Platform.operr("failed to update vm[uuid=%s] on hypervisor.", self.getUuid())
.withCause(r.getError()));
+ bus.reply(msg, reply);
submitUpdateVmInstanceMetadataGC();
return;
}
bus.reply(msg, reply);🤖 Prompt for AI Agents |
||
| } | ||
|
|
||
| private void submitUpdateVmInstanceMetadataGC() { | ||
| SubmitTimeBasedGarbageCollectorMsg gcmsg = new SubmitTimeBasedGarbageCollectorMsg(); | ||
| gcmsg.setGcInterval(VmGlobalConfig.GC_INTERVAL.value(Long.class)); | ||
| gcmsg.setUnit(TimeUnit.SECONDS); | ||
|
|
||
| UpdateVmInstanceMetadataGC gc = new UpdateVmInstanceMetadataGC(); | ||
| gc.vmInstanceUuid = self.getUuid(); | ||
| gc.NAME = String.format("gc-update-vm-%s-metadata", msg.getVmInstanceUuid()); | ||
| gcmsg.setGc(gc); | ||
|
|
||
| bus.makeTargetServiceIdByResourceUuid(gcmsg, GCConstants.SERVICE_ID, msg.getVmInstanceUuid()); | ||
| bus.send(gcmsg); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| private String buildVmInstanceMetadata(String vmInstanceUuid) { | ||
| VmMetadata vmMetadata = new VmMetadata(); | ||
|
|
||
| // 找出vm | ||
| // 找出volume和快照 | ||
| // 找出网卡 | ||
| VmInstanceVO vm = Q.New(VmInstanceVO.class).eq(VmInstanceVO_.uuid, vmInstanceUuid).find(); | ||
| vmMetadata.vmSystemTags = getResourceSystemTagFromDb(vm.getUuid()); | ||
| vmMetadata.vmResourceConfigs = getResourceConfigFromDb(vm.getUuid()); | ||
|
|
||
| // volume | ||
| // 挂载的 | ||
| List<VolumeVO> volumes1 = Q.New(VolumeVO.class).eq(VolumeVO_.vmInstanceUuid, vmInstanceUuid).list(); | ||
| // 被卸载的 | ||
| List<VolumeVO> volumes2 = Q.New(VolumeVO.class).eq(VolumeVO_.vmInstanceUuid, null).eq(VolumeVO_.lastVmInstanceUuid, vmInstanceUuid).list(); | ||
|
|
||
| List<VolumeVO> volumes = new ArrayList<>(); | ||
| volumes.addAll(volumes1); | ||
| volumes.addAll(volumes2); | ||
| volumes.forEach(volume -> { | ||
| vmMetadata.volumeSystemTags.put(volume.getUuid(), getResourceSystemTagFromDb(volume.getUuid())); | ||
| vmMetadata.volumeResourceConfigs.put(volume.getUuid(), getResourceConfigFromDb(volume.getUuid())); | ||
| }); | ||
|
|
||
| // snapshot | ||
| List<String> volumeUuids = volumes.stream().map(VolumeVO::getUuid).collect(Collectors.toList()); | ||
| List<VolumeSnapshotVO> snapshot = Q.New(VolumeSnapshotVO.class).in(VolumeSnapshotVO_.volumeUuid, volumeUuids).list(); | ||
|
|
||
| List<VolumeSnapshotGroupVO> group = Q.New(VolumeSnapshotGroupVO.class).eq(VolumeSnapshotGroupVO_.vmInstanceUuid, vmInstanceUuid).list(); | ||
| List<String> groupUuids = group.stream().map(VolumeSnapshotGroupVO::getUuid).collect(Collectors.toList()); | ||
| List<VolumeSnapshotGroupRefVO> groupRef = Q.New(VolumeSnapshotGroupRefVO.class).in(VolumeSnapshotGroupRefVO_.volumeSnapshotGroupUuid, groupUuids).list(); | ||
|
|
||
| // vm nic | ||
| List<VmNicVO> vmNics = Q.New(VmNicVO.class).eq(VmNicVO_.vmInstanceUuid, vmInstanceUuid).list(); | ||
| vmNics.forEach(nic -> { | ||
| vmMetadata.vmNicSystemTags.put(nic.getUuid(), getResourceSystemTagFromDb(nic.getUuid())); | ||
| vmMetadata.vmNicResourceConfigs.put(nic.getUuid(), getResourceConfigFromDb(nic.getUuid())); | ||
| }); | ||
|
|
||
| // build metadata | ||
| vmMetadata.vmInstanceVO = JSONObjectUtil.toJsonString(vm); | ||
| volumes.forEach(volumeVO -> vmMetadata.volumeVOs.add(JSONObjectUtil.toJsonString(volumeVO))); | ||
| vmNics.forEach(nic -> vmMetadata.vmNicVOs.add(JSONObjectUtil.toJsonString(nic))); | ||
|
|
||
| Map<String, List<String>> volumeSnapshots = new HashMap<>(); | ||
| snapshot.forEach(s -> { | ||
| if (volumeSnapshots.containsKey(s.getVolumeUuid())) { | ||
| volumeSnapshots.get(s.getVolumeUuid()).add(JSONObjectUtil.toJsonString(VolumeSnapshotInventory.valueOf(s))); | ||
| } else { | ||
| volumeSnapshots.put(s.getVolumeUuid(), new ArrayList<>()); | ||
| volumeSnapshots.get(s.getVolumeUuid()).add(JSONObjectUtil.toJsonString(VolumeSnapshotInventory.valueOf(s))); | ||
| } | ||
| }); | ||
| vmMetadata.volumeSnapshots = volumeSnapshots; | ||
| vmMetadata.volumeSnapshotGroupVO = group.stream().map(JSONObjectUtil::toJsonString).collect(Collectors.toList()); | ||
| vmMetadata.volumeSnapshotGroupRefVO = groupRef.stream().map(JSONObjectUtil::toJsonString).collect(Collectors.toList()); | ||
|
|
||
| return JSONObjectUtil.toJsonString(vmMetadata); | ||
| } | ||
|
|
||
| private List<String> getResourceSystemTagFromDb(String resourceUuid) { | ||
| List<String> systemTags = new ArrayList<>(); | ||
| List<SystemTagVO> vos = Q.New(SystemTagVO.class).eq(SystemTagVO_.resourceUuid, resourceUuid).list(); | ||
| vos.forEach(vo -> systemTags.add(JSONObjectUtil.toJsonString(vo))); | ||
| return systemTags; | ||
| } | ||
|
|
||
| private List<String> getResourceConfigFromDb(String resourceUuid) { | ||
| List<String> resourceConfigs = new ArrayList<>(); | ||
| List<ResourceConfigVO> vos = Q.New(ResourceConfigVO.class).eq(ResourceConfigVO_.resourceUuid, resourceUuid).list(); | ||
| vos.forEach(vo -> resourceConfigs.add(JSONObjectUtil.toJsonString(vo))); | ||
| return resourceConfigs; | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -255,6 +255,8 @@ public void handleMessage(Message msg) { | |
| private void handleLocalMessage(Message msg) { | ||
| if (msg instanceof TriggerGcJobMsg) { | ||
| handle((TriggerGcJobMsg) msg); | ||
| } else if (msg instanceof SubmitTimeBasedGarbageCollectorMsg) { | ||
| handle((SubmitTimeBasedGarbageCollectorMsg) msg); | ||
| } else { | ||
| bus.dealWithUnknownMessage(msg); | ||
| } | ||
|
|
@@ -282,6 +284,12 @@ public String getName() { | |
| }); | ||
| } | ||
|
|
||
| private void handle(final SubmitTimeBasedGarbageCollectorMsg msg) { | ||
| MessageReply reply = new MessageReply(); | ||
| msg.getGc().submit(msg.getGcInterval(), msg.getUnit()); | ||
| bus.reply(msg, reply); | ||
| } | ||
|
Comment on lines
+287
to
+291
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 补充参数非空与范围校验,避免 NPE 与无效调度。 🛡️ 建议修复 private void handle(final SubmitTimeBasedGarbageCollectorMsg msg) {
MessageReply reply = new MessageReply();
- msg.getGc().submit(msg.getGcInterval(), msg.getUnit());
- bus.reply(msg, reply);
+ if (msg.getGc() == null || msg.getGcInterval() == null || msg.getUnit() == null) {
+ reply.setError(operr("gc/gcInterval/unit cannot be null"));
+ bus.reply(msg, reply);
+ return;
+ }
+ if (msg.getGcInterval() <= 0) {
+ reply.setError(operr("gcInterval must be > 0"));
+ bus.reply(msg, reply);
+ return;
+ }
+ msg.getGc().submit(msg.getGcInterval(), msg.getUnit());
+ bus.reply(msg, reply);
}🤖 Prompt for AI Agents |
||
|
|
||
| private void handleApiMessage(APIMessage msg) { | ||
| if (msg instanceof APITriggerGCJobMsg) { | ||
| handle((APITriggerGCJobMsg) msg); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
潜在的 TOCTOU 竞态条件
在检查
chainInfo.getPendingTask().isEmpty()和提交chainSubmit之间存在时间窗口,其他线程可能在此期间提交任务。不过考虑到这是 GC 场景,偶尔的重复提交不会造成数据一致性问题(因为后续会通过syncSignature串行化执行),影响较小。建议确认这种行为是否符合预期设计。
🤖 Prompt for AI Agents