Skip to content

Commit ea79e83

Browse files
committed
<feature>[thread]: support coalesce queue for batch dhcp
Resolves: ZSTAC-83039 Change-Id: I737574686f6e6b69726e79736c6279616e66706b
1 parent 3a6ad0f commit ea79e83

9 files changed

Lines changed: 994 additions & 4 deletions

File tree

compute/src/main/java/org/zstack/compute/zone/AbstractZone.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import org.zstack.header.zone.ZoneStateEvent;
99

1010
abstract class AbstractZone implements Zone {
11-
private static DatabaseFacade dbf = Platform.getComponentLoader().getComponent(DatabaseFacade.class);
1211
private final static StateMachine<ZoneState, ZoneStateEvent> stateMachine;
1312

1413
static {
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
package org.zstack.core.thread;
2+
3+
import org.springframework.beans.factory.annotation.Autowire;
4+
import org.springframework.beans.factory.annotation.Autowired;
5+
import org.springframework.beans.factory.annotation.Configurable;
6+
import org.zstack.header.core.AbstractCompletion;
7+
import org.zstack.header.core.Completion;
8+
import org.zstack.header.core.ReturnValueCompletion;
9+
import org.zstack.header.errorcode.ErrorCode;
10+
import org.zstack.utils.Utils;
11+
import org.zstack.utils.logging.CLogger;
12+
13+
import java.util.ArrayList;
14+
import java.util.Collections;
15+
import java.util.List;
16+
import java.util.concurrent.ConcurrentHashMap;
17+
import java.util.stream.Collectors;
18+
19+
/**
20+
* Base implementation for coalesce queues.
21+
*
22+
* @param <T> Request Item Type
23+
* @param <R> Batch Execution Result Type
24+
* @param <V> Single Request Result Type
25+
*/
26+
@Configurable(preConstruction = true, autowire = Autowire.BY_TYPE)
27+
public abstract class AbstractCoalesceQueue<T, R, V> {
28+
private static final CLogger logger = Utils.getLogger(AbstractCoalesceQueue.class);
29+
30+
@Autowired
31+
private ThreadFacade thdf;
32+
33+
private final ConcurrentHashMap<String, SignatureQueue> signatureQueues = new ConcurrentHashMap<>();
34+
35+
protected class PendingRequest {
36+
final T item;
37+
final AbstractCompletion completion;
38+
39+
PendingRequest(T item, AbstractCompletion completion) {
40+
this.item = item;
41+
this.completion = completion;
42+
}
43+
44+
@SuppressWarnings("unchecked")
45+
void notifySuccess(V result) {
46+
if (completion == null) {
47+
return;
48+
}
49+
50+
if (completion instanceof ReturnValueCompletion) {
51+
((ReturnValueCompletion<V>) completion).success(result);
52+
} else if (completion instanceof Completion) {
53+
((Completion) completion).success();
54+
}
55+
}
56+
57+
void notifyFailure(ErrorCode errorCode) {
58+
if (completion == null) {
59+
return;
60+
}
61+
62+
if (completion instanceof ReturnValueCompletion) {
63+
((ReturnValueCompletion<V>) completion).fail(errorCode);
64+
} else if (completion instanceof Completion) {
65+
((Completion) completion).fail(errorCode);
66+
}
67+
}
68+
}
69+
70+
private class SignatureQueue {
71+
final String syncSignature;
72+
List<PendingRequest> pendingList = Collections.synchronizedList(new ArrayList<>());
73+
74+
SignatureQueue(String syncSignature) {
75+
this.syncSignature = syncSignature;
76+
}
77+
78+
synchronized List<PendingRequest> takeAll() {
79+
List<PendingRequest> toProcess = pendingList;
80+
pendingList = Collections.synchronizedList(new ArrayList<>());
81+
return toProcess;
82+
}
83+
84+
synchronized void add(PendingRequest request) {
85+
pendingList.add(request);
86+
}
87+
88+
synchronized boolean isEmpty() {
89+
return pendingList.isEmpty();
90+
}
91+
}
92+
93+
protected abstract String getName();
94+
95+
protected abstract void executeBatch(List<T> items, AbstractCompletion completion);
96+
97+
protected abstract AbstractCompletion createBatchCompletion(String syncSignature, List<PendingRequest> requests, SyncTaskChain chain);
98+
99+
protected abstract V calculateResult(T item, R batchResult);
100+
101+
protected final void handleSuccess(String syncSignature, List<PendingRequest> requests, R batchResult, SyncTaskChain chain) {
102+
for (PendingRequest req : requests) {
103+
try {
104+
V singleResult = calculateResult(req.item, batchResult);
105+
req.notifySuccess(singleResult);
106+
} catch (Throwable t) {
107+
logger.warn(String.format("[%s] failed to calculate result for item %s", getName(), req.item), t);
108+
req.notifyFailure(org.zstack.core.Platform.operr("failed to calculate result: %s", t.getMessage()));
109+
}
110+
}
111+
cleanup(syncSignature);
112+
chain.next();
113+
}
114+
115+
protected final void handleFailure(String syncSignature, List<PendingRequest> requests, ErrorCode errorCode, SyncTaskChain chain) {
116+
for (PendingRequest req : requests) {
117+
req.notifyFailure(errorCode);
118+
}
119+
cleanup(syncSignature);
120+
chain.next();
121+
}
122+
123+
void setThreadFacade(ThreadFacade thdf) {
124+
this.thdf = thdf;
125+
}
126+
127+
protected final void submitRequest(String syncSignature, T item, AbstractCompletion completion) {
128+
doSubmit(syncSignature, new PendingRequest(item, completion));
129+
}
130+
131+
private void doSubmit(String syncSignature, PendingRequest request) {
132+
SignatureQueue queue = signatureQueues.computeIfAbsent(syncSignature, SignatureQueue::new);
133+
queue.add(request);
134+
135+
thdf.chainSubmit(new ChainTask(null) {
136+
@Override
137+
public String getSyncSignature() {
138+
return String.format("coalesce-queue-%s-%s", AbstractCoalesceQueue.this.getName(), syncSignature);
139+
}
140+
141+
@Override
142+
public void run(SyncTaskChain chain) {
143+
List<PendingRequest> requests = queue.takeAll();
144+
145+
if (requests.isEmpty()) {
146+
chain.next();
147+
return;
148+
}
149+
150+
String name = getName();
151+
logger.debug(String.format("[%s] coalescing %d requests for signature[%s]",
152+
name, requests.size(), syncSignature));
153+
154+
AbstractCompletion batchCompletion = createBatchCompletion(syncSignature, requests, chain);
155+
List<T> items = requests.stream().map(req -> req.item).collect(Collectors.toList());
156+
executeBatch(items, batchCompletion);
157+
}
158+
159+
@Override
160+
public String getName() {
161+
return String.format("%s-coalesced-batch-%s", AbstractCoalesceQueue.this.getName(), syncSignature);
162+
}
163+
164+
@Override
165+
protected int getSyncLevel() {
166+
return 1;
167+
}
168+
});
169+
}
170+
171+
private void cleanup(String syncSignature) {
172+
signatureQueues.computeIfPresent(syncSignature, (k, queue) -> {
173+
if (queue.isEmpty()) {
174+
return null;
175+
}
176+
return queue;
177+
});
178+
}
179+
180+
int getActiveQueueCount() {
181+
return signatureQueues.size();
182+
}
183+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package org.zstack.core.thread;
2+
3+
import org.zstack.header.core.AbstractCompletion;
4+
import org.zstack.header.core.Completion;
5+
import org.zstack.header.errorcode.ErrorCode;
6+
7+
import java.util.List;
8+
9+
/**
10+
* A coalesce queue for requests that do NOT expect a return value.
11+
*
12+
* @param <T> Request Item Type
13+
*/
14+
public abstract class CoalesceQueue<T> extends AbstractCoalesceQueue<T, Void, Void> {
15+
16+
public void submit(String syncSignature, T item, Completion completion) {
17+
submitRequest(syncSignature, item, completion);
18+
}
19+
20+
protected abstract void executeBatch(List<T> items, Completion completion);
21+
22+
@Override
23+
protected final void executeBatch(List<T> items, AbstractCompletion batchCompletion) {
24+
executeBatch(items, (Completion) batchCompletion);
25+
}
26+
27+
@Override
28+
protected final AbstractCompletion createBatchCompletion(String syncSignature, List<PendingRequest> requests, SyncTaskChain chain) {
29+
return new Completion(chain) {
30+
@Override
31+
public void success() {
32+
handleSuccess(syncSignature, requests, null, chain);
33+
}
34+
35+
@Override
36+
public void fail(ErrorCode errorCode) {
37+
handleFailure(syncSignature, requests, errorCode, chain);
38+
}
39+
};
40+
}
41+
42+
@Override
43+
protected final Void calculateResult(T item, Void batchResult) {
44+
return null;
45+
}
46+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package org.zstack.core.thread;
2+
3+
import org.zstack.header.core.AbstractCompletion;
4+
import org.zstack.header.core.ReturnValueCompletion;
5+
import org.zstack.header.errorcode.ErrorCode;
6+
7+
import java.util.List;
8+
9+
/**
10+
* A coalesce queue for requests that expect a return value.
11+
*
12+
* @param <T> Request Item Type
13+
* @param <R> Batch Execution Result Type
14+
* @param <V> Single Request Result Type
15+
*/
16+
public abstract class ReturnValueCoalesceQueue<T, R, V> extends AbstractCoalesceQueue<T, R, V> {
17+
18+
public void submit(String syncSignature, T item, ReturnValueCompletion<V> completion) {
19+
submitRequest(syncSignature, item, completion);
20+
}
21+
22+
protected abstract void executeBatch(List<T> items, ReturnValueCompletion<R> completion);
23+
24+
@Override
25+
protected final void executeBatch(List<T> items, AbstractCompletion batchCompletion) {
26+
executeBatch(items, (ReturnValueCompletion<R>) batchCompletion);
27+
}
28+
29+
@Override
30+
protected final AbstractCompletion createBatchCompletion(String syncSignature, List<PendingRequest> requests, SyncTaskChain chain) {
31+
return new ReturnValueCompletion<R>(null) {
32+
@Override
33+
public void success(R batchResult) {
34+
handleSuccess(syncSignature, requests, batchResult, chain);
35+
}
36+
37+
@Override
38+
public void fail(ErrorCode errorCode) {
39+
handleFailure(syncSignature, requests, errorCode, chain);
40+
}
41+
};
42+
}
43+
}

plugin/flatNetworkProvider/src/main/java/org/zstack/network/service/flat/FlatDhcpBackend.java

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818
import org.zstack.core.db.SQL;
1919
import org.zstack.core.defer.Defer;
2020
import org.zstack.core.defer.Deferred;
21-
import org.zstack.core.thread.SyncTask;
22-
import org.zstack.core.thread.ThreadFacade;
21+
import org.zstack.core.thread.*;
2322
import org.zstack.core.upgrade.GrayVersion;
2423
import org.zstack.core.workflow.SimpleFlowChain;
2524
import org.zstack.header.AbstractService;
@@ -121,6 +120,48 @@ public class FlatDhcpBackend extends AbstractService implements NetworkServiceDh
121120

122121
private Map<String, L3NetworkGetIpStatisticExtensionPoint> getIpStatisticExts = new HashMap<>();
123122

123+
private static class DhcpApplyRequest {
124+
final String hostUuid;
125+
final List<DhcpInfo> dhcpInfos;
126+
final boolean rebuild;
127+
128+
DhcpApplyRequest(String hostUuid, List<DhcpInfo> dhcpInfos, boolean rebuild) {
129+
this.hostUuid = hostUuid;
130+
this.dhcpInfos = dhcpInfos;
131+
this.rebuild = rebuild;
132+
}
133+
}
134+
135+
private class DhcpApplyQueue extends CoalesceQueue<DhcpApplyRequest> {
136+
@Override
137+
protected String getName() {
138+
return "flat-dhcp-apply";
139+
}
140+
141+
@Override
142+
protected void executeBatch(List<DhcpApplyRequest> requests, Completion completion) {
143+
if (requests.isEmpty()) {
144+
completion.success();
145+
return;
146+
}
147+
148+
String hostUuid = requests.get(0).hostUuid;
149+
150+
boolean anyRebuild = false;
151+
List<DhcpInfo> mergedInfos = new ArrayList<>();
152+
for (DhcpApplyRequest req : requests) {
153+
anyRebuild = anyRebuild || req.rebuild;
154+
mergedInfos.addAll(req.dhcpInfos);
155+
}
156+
157+
logger.debug(String.format("Coalesced %d DHCP apply requests for host[uuid:%s]", requests.size(), hostUuid));
158+
159+
applyDhcpToHosts(mergedInfos, hostUuid, anyRebuild, completion);
160+
}
161+
}
162+
163+
private final DhcpApplyQueue dhcpApplyCoalesceQueue = new DhcpApplyQueue();
164+
124165
public static final String APPLY_DHCP_PATH = "/flatnetworkprovider/dhcp/apply";
125166
public static final String BATCH_APPLY_DHCP_PATH = "/flatnetworkprovider/dhcp/batchApply";
126167
public static final String PREPARE_DHCP_PATH = "/flatnetworkprovider/dhcp/prepare";
@@ -2074,7 +2115,9 @@ public void applyDhcpService(List<DhcpStruct> dhcpStructList, VmInstanceSpec spe
20742115
return;
20752116
}
20762117

2077-
applyDhcpToHosts(toDhcpInfo(dhcpStructList), spec.getDestHost().getUuid(), false, completion);
2118+
String hostUuid = spec.getDestHost().getUuid();
2119+
DhcpApplyRequest request = new DhcpApplyRequest(hostUuid, toDhcpInfo(dhcpStructList), false);
2120+
dhcpApplyCoalesceQueue.submit(hostUuid, request, completion);
20782121
}
20792122

20802123
private void releaseDhcpService(List<DhcpInfo> info, final String vmUuid, final String hostUuid, final NoErrorCompletion completion) {

0 commit comments

Comments
 (0)