Skip to content

Commit 5ad7dca

Browse files
committed
<fix>[storage]: Serialize and modify multiple mds nodes
When API requests for modifying multiple MDS nodes are initiated, place the API requests into a queue for serialized modification. Resolves: ZSTAC-79225 Change-Id: I78716363796364777676626a686b787177696361
1 parent 88b85d2 commit 5ad7dca

2 files changed

Lines changed: 90 additions & 3 deletions

File tree

storage/src/main/java/org/zstack/storage/addon/primary/ExternalPrimaryStorage.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.zstack.core.workflow.FlowChainBuilder;
1919
import org.zstack.core.workflow.ShareFlow;
2020
import org.zstack.header.core.Completion;
21+
import org.zstack.header.core.NoErrorCompletion;
2122
import org.zstack.header.core.NopeCompletion;
2223
import org.zstack.header.core.ReturnValueCompletion;
2324
import org.zstack.header.core.WhileDoneCompletion;
@@ -172,7 +173,31 @@ protected void handleApiMessage(APIMessage msg) {
172173
}
173174
}
174175

175-
private void handle(APIUpdateExternalPrimaryStorageMsg msg) {
176+
private void handle(final APIUpdateExternalPrimaryStorageMsg msg) {
177+
thdf.chainSubmit(new ChainTask(msg) {
178+
@Override
179+
public String getSyncSignature() {
180+
return String.format("update-external-primary-storage-%s", msg.getUuid());
181+
}
182+
183+
@Override
184+
public void run(SyncTaskChain chain) {
185+
doUpdateExternalPrimaryStorageInQueue(msg, new NoErrorCompletion(chain) {
186+
@Override
187+
public void done() {
188+
chain.next();
189+
}
190+
});
191+
}
192+
193+
@Override
194+
public String getName() {
195+
return getSyncSignature();
196+
}
197+
});
198+
}
199+
200+
private void doUpdateExternalPrimaryStorageInQueue(APIUpdateExternalPrimaryStorageMsg msg, NoErrorCompletion completion) {
176201
APIUpdateExternalPrimaryStorageEvent evt = new APIUpdateExternalPrimaryStorageEvent(msg.getId());
177202
if (msg.getName() != null) {
178203
externalVO.setName(msg.getName());
@@ -188,7 +213,7 @@ private void handle(APIUpdateExternalPrimaryStorageMsg msg) {
188213
}
189214
boolean needReconnect = false;
190215
String oldConfig = externalVO.getConfig();
191-
if (msg.getConfig() != null) {
216+
if (msg.getConfig() != null && !msg.getConfig().equals(oldConfig)) {
192217
String config = controller.validateConfig(msg.getConfig());
193218
externalVO.setConfig(config);
194219
needReconnect = true;
@@ -216,13 +241,15 @@ public void run(MessageReply reply) {
216241
}
217242

218243
bus.publish(evt);
244+
completion.done();
219245
}
220246
});
221247
return;
222248
}
223249

224250
evt.setInventory(externalVO.toInventory());
225251
bus.publish(evt);
252+
completion.done();
226253
}
227254

228255
@Override

test/src/test/groovy/org/zstack/test/integration/storage/primary/addon/zbs/ZbsPrimaryStorageCase.groovy

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package org.zstack.test.integration.storage.primary.addon.zbs
22

33
import org.springframework.http.HttpEntity
4+
import org.zstack.core.cloudbus.CloudBus
45
import org.zstack.core.cloudbus.EventCallback
56
import org.zstack.core.cloudbus.EventFacade
67
import org.zstack.core.db.Q
8+
import org.zstack.core.thread.ThreadFacadeImpl
79
import org.zstack.header.storage.addon.primary.ExternalPrimaryStorageVO
810
import org.zstack.header.storage.addon.primary.ExternalPrimaryStorageSpaceVO
911
import org.zstack.header.storage.addon.primary.ExternalPrimaryStorageVO_
@@ -13,6 +15,8 @@ import org.zstack.header.storage.primary.PrimaryStorageCapacityVO_
1315
import org.zstack.header.storage.primary.PrimaryStorageHostRefVO
1416
import org.zstack.header.storage.primary.PrimaryStorageHostRefVO_
1517
import org.zstack.header.storage.primary.PrimaryStorageStatus
18+
import org.zstack.header.storage.primary.ReconnectPrimaryStorageMsg
19+
import org.zstack.header.storage.primary.ReconnectPrimaryStorageReply
1620
import org.zstack.storage.zbs.MdsUri
1721
import org.zstack.sdk.*
1822
import org.zstack.storage.addon.primary.ExternalPrimaryStorageSystemTags
@@ -47,6 +51,8 @@ class ZbsPrimaryStorageCase extends SubCase {
4751
VolumeInventory vol, vol2
4852
KVMHostInventory kvm
4953
EventFacade evtf
54+
ThreadFacadeImpl thdf
55+
AtomicInteger reconnectMsgCount = new AtomicInteger(0)
5056

5157
@Override
5258
void clean() {
@@ -165,6 +171,7 @@ class ZbsPrimaryStorageCase extends SubCase {
165171
diskOffering = env.inventoryByName("diskOffering") as DiskOfferingInventory
166172
kvm = env.inventoryByName("kvm-1") as KVMHostInventory
167173
evtf = bean(EventFacade.class)
174+
thdf = bean(ThreadFacadeImpl.class)
168175

169176
testSyncPrimaryStorageCapacityConcurrently()
170177
testDefaultConfig()
@@ -294,14 +301,67 @@ class ZbsPrimaryStorageCase extends SubCase {
294301
assert Q.New(ExternalPrimaryStorageSpaceVO.class)
295302
.eq(ExternalPrimaryStorageSpaceVO_.primaryStorageUuid, ps.uuid)
296303
.count() == 1
304+
305+
def signature = String.format("update-external-primary-storage-%s", ps.uuid)
306+
def run = thdf.getChainTaskInfo(signature).getRunningTask().size()
307+
retryInSecs {
308+
assert thdf.getChainTaskInfo(signature).getRunningTask().size() == 0
309+
assert thdf.getChainTaskInfo(signature).getPendingTask().size() == 0
310+
}
311+
env.message(ReconnectPrimaryStorageMsg.class) { ReconnectPrimaryStorageMsg msg, CloudBus bus ->
312+
if (ps != null && msg.getPrimaryStorageUuid() == ps.uuid) {
313+
reconnectMsgCount.incrementAndGet()
314+
run = thdf.getChainTaskInfo(signature).getRunningTask().size()
315+
assert run == 1
316+
}
317+
def reply = new ReconnectPrimaryStorageReply()
318+
bus.reply(msg, reply)
319+
}
320+
Thread.start {
321+
updateExternalPrimaryStorage {
322+
uuid = ps.uuid
323+
config ="{\"mdsUrls\":[\"root:password@127.0.1.4\",\"root:password@127.0.1.2\",\"root:password@127.0.1.3\"],\"logicalPoolName\":\"lpool1\"}"
324+
}
325+
}
326+
Thread.start {
327+
updateExternalPrimaryStorage {
328+
uuid = ps.uuid
329+
config ="{\"mdsUrls\":[\"root:password@127.0.1.5\",\"root:password@127.0.1.2\",\"root:password@127.0.1.3\"],\"logicalPoolName\":\"lpool1\"}"
330+
}
331+
}
332+
retryInSecs {
333+
assert reconnectMsgCount.get() >=1
334+
assert thdf.getChainTaskInfo(signature).getRunningTask().size() == 0
335+
assert thdf.getChainTaskInfo(signature).getPendingTask().size() == 0
336+
}
337+
def oldReconnectMsgCount=reconnectMsgCount.get()
338+
String oldConfig = Q.New(ExternalPrimaryStorageVO.class)
339+
.select(ExternalPrimaryStorageVO_.config)
340+
.eq(ExternalPrimaryStorageVO_.uuid, ps.uuid)
341+
.findValue()
342+
Thread.start {
343+
updateExternalPrimaryStorage {
344+
uuid = ps.uuid
345+
config = oldConfig
346+
}
347+
}
348+
retryInSecs {
349+
assert thdf.getChainTaskInfo(signature).getRunningTask().size() == 0
350+
assert thdf.getChainTaskInfo(signature).getPendingTask().size() == 0
351+
}
352+
assert reconnectMsgCount.get() == oldReconnectMsgCount
353+
env.revokeMessage(ReconnectPrimaryStorageMsg.class, null)
297354
// update multi pools
298355
// Config.Pool
299356
updateExternalPrimaryStorage {
300357
uuid = ps.uuid
301358
config = "{\"mdsUrls\":[\"root:password@127.0.1.1\",\"root:password@127.0.1.2\",\"root:password@127.0.1.3\"]," +
302359
"\"pools\":[{\"logicalName\":\"lpool1\"}, {\"logicalName\":\"lpool2\"}]}"
303360
}
304-
361+
retryInSecs {
362+
assert thdf.getChainTaskInfo(signature).getRunningTask().size() == 0
363+
assert thdf.getChainTaskInfo(signature).getPendingTask().size() == 0
364+
}
305365
assert Q.New(ExternalPrimaryStorageSpaceVO.class)
306366
.eq(ExternalPrimaryStorageSpaceVO_.primaryStorageUuid, ps.uuid)
307367
.count() == 2

0 commit comments

Comments
 (0)