From 19fa46e0bf81f14f8f33d7ef3932861086f891f9 Mon Sep 17 00:00:00 2001 From: Zhang Wenhao Date: Tue, 17 Mar 2026 17:14:11 +0800 Subject: [PATCH] [cloudbus]: support send confirm 1. send in CloudBus means submit a message send task. function return do not means sending completion. so there is no way to set a limition in sender. add a FutureCompletion to confirm message send. provide a way to limited send. 2. fix mn heartbeat thread interrupt handle 3. refactor duplicate code for getChainInfo, fix synchronized target. This patch is cherry-picked from ZSTAC-71213 (commit: af58834cb816224677b5bc2d7badd0a7a446befa) Resolves: ZSV-11510 Change-Id: I686361627566737a7775706263796b736967666c --- conf/errorCodes/sys.xml | 5 + .../main/java/org/zstack/core/Platform.java | 12 +- .../org/zstack/core/cloudbus/CloudBus.java | 11 +- .../zstack/core/cloudbus/CloudBusImpl2.java | 12 +- .../zstack/core/cloudbus/CloudBusImpl3.java | 86 ++++--- .../zstack/core/thread/DispatchQueueImpl.java | 93 +++---- .../zstack/core/thread/TaskInfoBuilder.java | 16 +- .../zstack/core/thread/ThreadFacadeImpl.java | 1 + .../header/core/progress/ChainInfo.java | 25 ++ .../zstack/header/errorcode/SysErrors.java | 3 + .../ManagementNodeManagerImpl.java | 24 +- .../core/cloudbus/CloudBus3Case.groovy | 226 +++++++++++++++++- .../ManagementNodeNotFoundHandlerCase.groovy | 36 +-- .../core/cloudbus/MessageSender.java | 42 ++++ .../portal/ManagementNodeHeartbeatCase.groovy | 9 +- 15 files changed, 464 insertions(+), 137 deletions(-) create mode 100644 test/src/test/groovy/org/zstack/test/integration/core/cloudbus/MessageSender.java diff --git a/conf/errorCodes/sys.xml b/conf/errorCodes/sys.xml index c5b763f86ba..b21f247c36e 100755 --- a/conf/errorCodes/sys.xml +++ b/conf/errorCodes/sys.xml @@ -105,5 +105,10 @@ 1090 Multiple reasons + + + 2111 + CloudBus message sending related error + diff --git a/core/src/main/java/org/zstack/core/Platform.java b/core/src/main/java/org/zstack/core/Platform.java index a046d3fe5b4..801579c219f 100755 --- a/core/src/main/java/org/zstack/core/Platform.java +++ b/core/src/main/java/org/zstack/core/Platform.java @@ -50,6 +50,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; +import java.net.Inet4Address; import java.net.InetAddress; import java.net.NetworkInterface; import java.net.SocketException; @@ -817,9 +818,14 @@ private static String getManagementServerIpInternal() { for (NetworkInterface iface : Collections.list(nets)) { String name = iface.getName(); if (defaultLine.contains(name)) { - InetAddress ia = iface.getInetAddresses().nextElement(); - ip = ia.getHostAddress(); - break; + for (InetAddress ia : Collections.list(iface.getInetAddresses())) { + ip = ia.getHostAddress(); + if (ia instanceof Inet4Address) { + // we prefer IPv4 address + ip = ia.getHostAddress(); + break; + } + } } } } catch (SocketException e) { diff --git a/core/src/main/java/org/zstack/core/cloudbus/CloudBus.java b/core/src/main/java/org/zstack/core/cloudbus/CloudBus.java index efe579df429..5252bc7ca24 100755 --- a/core/src/main/java/org/zstack/core/cloudbus/CloudBus.java +++ b/core/src/main/java/org/zstack/core/cloudbus/CloudBus.java @@ -3,6 +3,7 @@ import org.springframework.http.HttpEntity; import org.zstack.header.Component; import org.zstack.header.Service; +import org.zstack.header.core.FutureCompletion; import org.zstack.header.errorcode.ErrorCode; import org.zstack.header.exception.CloudConfigureFailException; import org.zstack.header.message.*; @@ -12,14 +13,18 @@ import java.util.function.Consumer; public interface CloudBus extends Component { - void send(Message msg); - + /** + * submit a message sending task into the queue. + * @return {@link FutureCompletion} which can be used to check whether the message was successfully sent or not. + */ + FutureCompletion send(Message msg); + void send(List msgs); @Deprecated void send(APIMessage msg, Consumer consumer); - void send(NeedReplyMessage msg, CloudBusCallBack callback); + FutureCompletion send(NeedReplyMessage msg, CloudBusCallBack callback); @Deprecated void send(List msgs, CloudBusListCallBack callBack); diff --git a/core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl2.java b/core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl2.java index 10e03926888..ca7938904ee 100755 --- a/core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl2.java +++ b/core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl2.java @@ -19,6 +19,7 @@ import org.zstack.core.timeout.ApiTimeoutManager; import org.zstack.header.Constants; import org.zstack.header.Service; +import org.zstack.header.core.FutureCompletion; import org.zstack.header.log.NoLogging; import org.zstack.header.apimediator.APIIsReadyToGoMsg; import org.zstack.header.apimediator.APIIsReadyToGoReply; @@ -106,6 +107,11 @@ public class CloudBusImpl2 implements CloudBus, CloudBusIN, ManagementNodeChange private final String AMQP_PROPERTY_HEADER__COMPRESSED = "compressed"; private String SERVICE_ID = makeLocalServiceId("cloudbus"); + public static final FutureCompletion SEND_CONFIRMED = new FutureCompletion(null); + + static { + SEND_CONFIRMED.success(); + } public void setDEFAULT_MESSAGE_TIMEOUT(long timeout) { this.DEFAULT_MESSAGE_TIMEOUT = timeout; @@ -1323,8 +1329,9 @@ private void send(Message msg, Boolean noNeedReply) { } @Override - public void send(Message msg) { + public FutureCompletion send(Message msg) { send(msg, true); + return SEND_CONFIRMED; } @Override @@ -1354,7 +1361,7 @@ private void evaluateMessageTimeout(NeedReplyMessage msg) { } @Override - public void send(final NeedReplyMessage msg, final CloudBusCallBack callback) { + public FutureCompletion send(final NeedReplyMessage msg, final CloudBusCallBack callback) { evaluateMessageTimeout(msg); Envelope e = new Envelope() { @@ -1405,6 +1412,7 @@ List getRequests() { envelopes.put(msg.getId(), e); send(msg, false); + return SEND_CONFIRMED; } private MessageReply createTimeoutReply(NeedReplyMessage m) { diff --git a/core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java b/core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java index 0d83683708a..889a61089a1 100755 --- a/core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java +++ b/core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java @@ -49,6 +49,7 @@ import static org.zstack.core.Platform.*; import static org.zstack.core.cloudbus.CloudBusGlobalProperty.SYNC_CALL_TIMEOUT; +import static org.zstack.header.errorcode.SysErrors.CLOUD_BUS_SEND_ERROR; import static org.zstack.utils.BeanUtils.getProperty; import static org.zstack.utils.BeanUtils.setProperty; @@ -101,6 +102,7 @@ public class CloudBusImpl3 implements CloudBus, CloudBusIN { private final static TimeoutRestTemplate http = RESTFacade.createRestTemplate(CoreGlobalProperty.REST_FACADE_READ_TIMEOUT, CoreGlobalProperty.REST_FACADE_CONNECT_TIMEOUT); public static final String HTTP_BASE_URL = "/cloudbus"; + public static final FutureCompletion SEND_CONFIRMED = new FutureCompletion(null); { if (CloudBusGlobalProperty.MESSAGE_LOG != null) { @@ -114,6 +116,8 @@ public class CloudBusImpl3 implements CloudBus, CloudBusIN { CloudBusGlobalProperty.HTTP_CONTEXT_PATH = ""; CloudBusGlobalProperty.HTTP_PORT = 8989; } + + SEND_CONFIRMED.success(); } public static String getManagementNodeUUIDFromServiceID(String serviceID) { @@ -125,6 +129,12 @@ public static String getManagementNodeUUIDFromServiceID(String serviceID) { return ss[0]; } + private FutureCompletion sendFail(ErrorCode errorCode) { + FutureCompletion c = new FutureCompletion(null); + c.fail(errorCode); + return c; + } + private abstract class Envelope { long startTime; @@ -250,8 +260,8 @@ public void deActiveService(String id) { } @Override - public void send(Message msg) { - send(msg, true); + public FutureCompletion send(Message msg) { + return send(msg, true); } @Override @@ -291,11 +301,11 @@ private MessageReply createTimeoutReply(NeedReplyMessage m) { } @Override - public void send(NeedReplyMessage msg, CloudBusCallBack callback) { + public FutureCompletion send(NeedReplyMessage msg, CloudBusCallBack callback) { evaluateMessageTimeout(msg); if (msg.getTimeout() <= 1) { callback.run(createTimeoutReply(msg)); - return; + return SEND_CONFIRMED; } Envelope e = new Envelope() { @@ -346,7 +356,7 @@ public void timeout() { envelopes.put(msg.getId(), e); msgExts.forEach(m -> m.afterAddEnvelopes(msg.getId())); - send(msg, false); + return send(msg, false); } @Override @@ -454,7 +464,7 @@ public void reply(Message request, MessageReply reply) { callReplyPreSendingExtensions(reply, (NeedReplyMessage) request); } catch (Exception e) { logger.error("failed to call pre-sending reply extension:", e); - reply.setError(operr(e.getMessage())); + reply.setError(err(CLOUD_BUS_SEND_ERROR, e.getMessage())); } } @@ -526,29 +536,36 @@ public MessageSender(Message msg) { localSend = !CloudBusGlobalProperty.HTTP_ALWAYS && managementNodeId.equals(Platform.getManagementServerId()); } - void send() { + FutureCompletion send() { try { - doSend(); + return doSend(); } catch (Throwable th) { - replyErrorIfNeeded(operr(th.getMessage())); + ErrorCode err = err(CLOUD_BUS_SEND_ERROR, th.getMessage()); + replyErrorIfNeeded(err); + + FutureCompletion c = new FutureCompletion(null); + c.fail(err); + return c; } } - private void doSend() { + private FutureCompletion doSend() { if (msg instanceof Event) { eventSend(); - return; + return SEND_CONFIRMED; } if (localSend) { localSend(); + return SEND_CONFIRMED; } else { - httpSend(); + return httpSend(); } } - private void httpSendInQueue(String ip) { - thdf.chainSubmit(new ChainTask(null) { + private FutureCompletion httpSendInQueue(String ip) { + FutureCompletion sendCompletion = new FutureCompletion(null); + thdf.chainSubmit(new ChainTask(sendCompletion) { @Override public String getSyncSignature() { return "http-send-in-queue"; @@ -557,6 +574,7 @@ public String getSyncSignature() { @Override public void run(SyncTaskChain chain) { httpSend(ip); + sendCompletion.success(); chain.next(); } @@ -570,25 +588,29 @@ public String getName() { return getSyncSignature(); } }); + return sendCompletion; } - private void httpSend() { + private FutureCompletion httpSend() { buildSchema(msg); + String ip; try { - String ip = destMaker.getNodeInfo(managementNodeId).getNodeIP(); - httpSendInQueue(ip); + ip = destMaker.getNodeInfo(managementNodeId).getNodeIP(); } catch (ManagementNodeNotFoundException e) { - if (msg instanceof MessageReply) { - if (!deadMessageManager.handleManagementNodeNotFoundError(managementNodeId, msg, () -> { - String ip = destMaker.getNodeInfo(managementNodeId).getNodeIP(); - httpSendInQueue(ip); - })) { - throw e; - } + boolean errorHandled = msg instanceof MessageReply && + deadMessageManager.handleManagementNodeNotFoundError(managementNodeId, msg, () -> { + String otherIp = destMaker.getNodeInfo(managementNodeId).getNodeIP(); + logger.warn(String.format("resend the message[id:%s] to node[ip:%s]", msg.getId(), otherIp)); + httpSendInQueue(otherIp); + }); + if (errorHandled) { + return SEND_CONFIRMED; } else { throw e; } } + + return httpSendInQueue(ip); } private void httpSend(String ip) { @@ -612,12 +634,12 @@ protected ResponseEntity call() { }.run(); if (!rsp.getStatusCode().is2xxSuccessful()) { - replyErrorIfNeeded(operr("HTTP ERROR, status code: %s, body: %s", rsp.getStatusCode(), rsp.getBody())); + replyErrorIfNeeded(err(CLOUD_BUS_SEND_ERROR, "HTTP ERROR, status code: %s, body: %s", rsp.getStatusCode(), rsp.getBody())); } } catch (OperationFailureException e) { replyErrorIfNeeded(e.getErrorCode()); } catch (Throwable e) { - replyErrorIfNeeded(operr(e.getMessage())); + replyErrorIfNeeded(err(CLOUD_BUS_SEND_ERROR, e.getMessage())); } } @@ -1194,7 +1216,7 @@ private void evalThreadContextToMessage(Message msg) { } } - private void doSendAndCallExtensions(Message msg) { + private FutureCompletion doSendAndCallExtensions(Message msg) { // for unit test finding invocation chain MessageCommandRecorder.record(msg.getClass()); @@ -1209,20 +1231,20 @@ private void doSendAndCallExtensions(Message msg) { interceptor.beforeSendMessage(msg); } - doSend(msg); + return doSend(msg); } - private void doSend(Message msg) { + private FutureCompletion doSend(Message msg) { evalThreadContextToMessage(msg); if (logger.isTraceEnabled() && islogMessage(msg)) { logger.trace(String.format("[msg send]: %s", dumpMessage(msg))); } - new MessageSender(msg).send(); + return new MessageSender(msg).send(); } - private void send(Message msg, Boolean noNeedReply) { + private FutureCompletion send(Message msg, Boolean noNeedReply) { if (msg.getServiceId() == null) { throw new IllegalArgumentException(String.format("service id cannot be null: %s", msg.getClass().getName())); } @@ -1238,7 +1260,7 @@ private void send(Message msg, Boolean noNeedReply) { msg.putHeaderEntry(NO_NEED_REPLY_MSG, noNeedReply.toString()); } - doSendAndCallExtensions(msg); + return doSendAndCallExtensions(msg); } private void restoreFromSchema(Message msg, Map raw) throws ClassNotFoundException { diff --git a/core/src/main/java/org/zstack/core/thread/DispatchQueueImpl.java b/core/src/main/java/org/zstack/core/thread/DispatchQueueImpl.java index 4c82196e5c2..c074e4ae158 100755 --- a/core/src/main/java/org/zstack/core/thread/DispatchQueueImpl.java +++ b/core/src/main/java/org/zstack/core/thread/DispatchQueueImpl.java @@ -8,7 +8,6 @@ import org.zstack.core.debug.DebugManager; import org.zstack.core.debug.DebugSignalHandler; import org.zstack.header.Constants; -import org.zstack.header.core.Completion; import org.zstack.header.core.ExceptionSafe; import org.zstack.header.core.ReturnValueCompletion; import org.zstack.header.core.progress.ChainInfo; @@ -107,47 +106,33 @@ public void handleDebugSignal() { public void beforeCleanQueuedumpThread(String signatureName) { String title = "\n================= Before Clean Task Queue Dump ================"; - dumpsignatureNameThread(signatureName,title); + dumpSignatureNameThread(signatureName,title); } public void afterCleanQueuedumpThread(String signatureName) { String title = "\n================= After Clean Task Queue Dump ================"; - dumpsignatureNameThread(signatureName,title); + dumpSignatureNameThread(signatureName,title); } - public void dumpsignatureNameThread(String signatureName,String title) { + public void dumpSignatureNameThread(String signatureName, String title) { StringBuilder sb = new StringBuilder(); sb.append(title); sb.append("\nASYNC TASK QUEUE DUMP:"); sb.append(String.format("\nTASK QUEUE NUMBER: %s\n", chainTasks.size())); List asyncTasks = new ArrayList<>(); - long now = System.currentTimeMillis(); - synchronized (chainTasks) { - ChainTaskQueueWrapper w = chainTasks.get(signatureName); - if (w == null) { - sb.append(String.format("\n===== NO QUEUE SYNC SIGNATURE: %s =====", signatureName)); - sb.append(StringUtils.join(asyncTasks, "\n")); - sb.append("\n================= END TASK QUEUE DUMP ==================\n"); - _threadFacade.printThreadsAndTasks(); - logger.debug(sb.toString()); - return; - } - StringBuilder tb = new StringBuilder(String.format("\nQUEUE SYNC SIGNATURE: %s", signatureName)); - tb.append(String.format("\nRUNNING TASK NUMBER: %s", w.runningQueue.size())); - tb.append(String.format("\nPENDING TASK NUMBER: %s", w.pendingQueue.size())); - tb.append(String.format("\nASYNC LEVEL: %s", w.maxThreadNum)); - int index = 0; - for (Object obj : w.runningQueue) { - ChainFuture cf = (ChainFuture) obj; - tb.append(TaskInfoBuilder.buildRunningTaskInfo(cf, now, index++)); - } - for (Object obj : w.pendingQueue) { - ChainFuture cf = (ChainFuture) obj; - tb.append(TaskInfoBuilder.buildPendingTaskInfo(cf, now, index++)); - } - asyncTasks.add(tb.toString()); + ChainTaskQueueWrapper w = chainTasks.get(signatureName); + if (w == null) { + sb.append(String.format("\n===== NO QUEUE SYNC SIGNATURE: %s =====", signatureName)); + sb.append(StringUtils.join(asyncTasks, "\n")); + sb.append("\n================= END TASK QUEUE DUMP ==================\n"); + _threadFacade.printThreadsAndTasks(); + logger.debug(sb.toString()); + return; } + StringBuilder tb = new StringBuilder(String.format("\nQUEUE SYNC SIGNATURE: %s", signatureName)); + tb.append(w.getTaskQueueInfo()); + asyncTasks.add(tb.toString()); sb.append(StringUtils.join(asyncTasks, "\n")); sb.append("\n================= END TASK QUEUE DUMP ==================\n"); @@ -157,26 +142,11 @@ public void dumpsignatureNameThread(String signatureName,String title) { @Override public ChainInfo getChainTaskInfo(String signature) { - long now = System.currentTimeMillis(); - synchronized (chainTasks) { - ChainInfo info = new ChainInfo(); - ChainTaskQueueWrapper w = chainTasks.get(signature); - if (w == null) { - return info; - } - - int index = 0; - for (Object obj : w.runningQueue) { - ChainFuture cf = (ChainFuture) obj; - info.addRunningTask(TaskInfoBuilder.buildRunningTaskInfo(cf, now, index++)); - } - - for (Object obj : w.pendingQueue) { - ChainFuture cf = (ChainFuture) obj; - info.addPendingTask(TaskInfoBuilder.buildPendingTaskInfo(cf, now, index++)); - } - return info; + ChainTaskQueueWrapper w = chainTasks.get(signature); + if (w == null) { + return new ChainInfo(); } + return w.getChainInfo(); } @Override @@ -407,10 +377,9 @@ void addTask(SyncTaskFuture task) { void startThreadIfNeeded() { if (counter.get() >= maxThreadNum) { - int pendingTaskSize = counter.get() - queue.size(); logger.debug(String.format("Sync task syncSignature: %s reached maxThreadNum: %s, current: %d, pending queue size: %d", - syncSignature, maxThreadNum, counter.get(), pendingTaskSize)); - dumpTaskQueueIfNeeded(pendingTaskSize); + syncSignature, maxThreadNum, counter.get(), queue.size())); + dumpTaskQueueIfNeeded(queue.size()); return; } @@ -783,7 +752,7 @@ protected String getTaskQueueInfo() { } private class ChainTaskQueueWrapper extends AbstractTaskQueueWrapper { - LinkedList pendingQueue = new LinkedList(); + final LinkedList pendingQueue = new LinkedList(); final Map subPendingMap = new ConcurrentHashMap<>(); final LinkedList runningQueue = new LinkedList(); AtomicInteger counter = new AtomicInteger(0); @@ -959,28 +928,30 @@ public Void call() { @Override protected String getTaskQueueInfo() { + return getChainInfo().toString(); + } + + protected ChainInfo getChainInfo() { long now = zTimer.getCurrentTimeMillis(); - StringBuilder tb = new StringBuilder(); - tb.append(String.format("\nRUNNING TASK NUMBER: %s", runningQueue.size())); - tb.append(String.format("\nPENDING TASK NUMBER: %s", pendingQueue.size())); - tb.append(String.format("\nASYNC LEVEL: %s", maxThreadNum)); + ChainInfo info = new ChainInfo(); + info.setMaxThreadNum(maxThreadNum); int index = 0; synchronized (runningQueue) { for (Object obj : runningQueue) { ChainFuture cf = (ChainFuture) obj; - tb.append(TaskInfoBuilder.buildRunningTaskInfo(cf, now, index++)); + info.addRunningTask(TaskInfoBuilder.buildRunningTaskInfo(cf, now, index++)); } } - synchronized (pendingQueue) { + // pendingQueue is synchronized with chainTasks, do not synchronize itself + synchronized (chainTasks) { for (Object obj : pendingQueue) { ChainFuture cf = (ChainFuture) obj; - tb.append(TaskInfoBuilder.buildPendingTaskInfo(cf, now, index++)); + info.addPendingTask(TaskInfoBuilder.buildPendingTaskInfo(cf, now, index++)); } } - - return tb.toString(); + return info; } } diff --git a/core/src/main/java/org/zstack/core/thread/TaskInfoBuilder.java b/core/src/main/java/org/zstack/core/thread/TaskInfoBuilder.java index aa4a1717a68..b454459d6f5 100644 --- a/core/src/main/java/org/zstack/core/thread/TaskInfoBuilder.java +++ b/core/src/main/java/org/zstack/core/thread/TaskInfoBuilder.java @@ -7,9 +7,7 @@ import org.zstack.header.core.progress.RunningTaskInfo; import org.zstack.header.core.progress.TaskInfo; import org.zstack.header.message.Message; -import org.zstack.utils.Utils; import org.zstack.utils.gson.JSONObjectUtil; -import org.zstack.utils.logging.CLogger; import java.util.ArrayList; import java.util.Map; @@ -21,7 +19,7 @@ public class TaskInfoBuilder { static RunningTaskInfo buildRunningTaskInfo(AbstractTimeStatisticFuture abstractTimeStatisticFuture, long now, int index) { RunningTaskInfo info = new RunningTaskInfo(); - loadTaskInfo(info, abstractTimeStatisticFuture, index); + loadTaskInfo(info, abstractTimeStatisticFuture.getTask(), index); info.setExecutionTime(TimeUnit.MILLISECONDS.toSeconds(now - abstractTimeStatisticFuture.getStartExecutionTimeInMills())); info.setPendingTime(TimeUnit.MILLISECONDS.toSeconds(now - abstractTimeStatisticFuture.getStartPendingTimeInMills()) - info.getExecutionTime()); return info; @@ -29,23 +27,23 @@ static RunningTaskInfo buildRunningTaskInfo(AbstractTimeStatisticFuture abstract static PendingTaskInfo buildPendingTaskInfo(AbstractTimeStatisticFuture abstractTimeStatisticFuture, long now, int index) { PendingTaskInfo info = new PendingTaskInfo(); - loadTaskInfo(info, abstractTimeStatisticFuture, index); + loadTaskInfo(info, abstractTimeStatisticFuture.getTask(), index); info.setPendingTime(TimeUnit.MILLISECONDS.toSeconds(now - abstractTimeStatisticFuture.getStartPendingTimeInMills())); return info; } - static private void loadTaskInfo(TaskInfo info, AbstractTimeStatisticFuture cf, int index) { - info.setName(cf.getTask().getName()); - info.setClassName(cf.getTask().getClass().getSimpleName()); + static private void loadTaskInfo(TaskInfo info, AbstractChainTask task, int index) { + info.setName(task.getName()); + info.setClassName(task.getClass().getSimpleName()); info.setIndex(index); - Map tc = cf.getTask().getThreadContext(); + Map tc = task.getThreadContext(); if (tc != null) { info.setApiName(tc.get(Constants.THREAD_CONTEXT_TASK_NAME)); info.setApiId(tc.get(Constants.THREAD_CONTEXT_API)); } info.setContextList(new ArrayList<>()); - for (AsyncBackup backup : cf.getTask().getBackups()) { + for (AsyncBackup backup : task.getBackups()) { if (backup instanceof Message) { info.getContextList().add(JSONObjectUtil.toJsonString(backup)); } diff --git a/core/src/main/java/org/zstack/core/thread/ThreadFacadeImpl.java b/core/src/main/java/org/zstack/core/thread/ThreadFacadeImpl.java index e8e54abbeb0..04a81cc84c8 100755 --- a/core/src/main/java/org/zstack/core/thread/ThreadFacadeImpl.java +++ b/core/src/main/java/org/zstack/core/thread/ThreadFacadeImpl.java @@ -163,6 +163,7 @@ public void destroy() { @Override public Future submit(Task task) { + _logger.trace(String.format("submit task: %s", task.getName())); return _pool.submit(new Worker(task)); } diff --git a/header/src/main/java/org/zstack/header/core/progress/ChainInfo.java b/header/src/main/java/org/zstack/header/core/progress/ChainInfo.java index bb07ad06050..9ea2be16ef4 100644 --- a/header/src/main/java/org/zstack/header/core/progress/ChainInfo.java +++ b/header/src/main/java/org/zstack/header/core/progress/ChainInfo.java @@ -1,5 +1,7 @@ package org.zstack.header.core.progress; +import org.zstack.header.rest.APINoSee; + import java.util.ArrayList; import java.util.List; @@ -9,6 +11,8 @@ public class ChainInfo { private List runningTask = new ArrayList<>(); private List pendingTask = new ArrayList<>(); + @APINoSee + private long maxThreadNum; public void setPendingTask(List pendingTask) { this.pendingTask = pendingTask; @@ -33,4 +37,25 @@ public void addRunningTask(RunningTaskInfo task) { public void addPendingTask(PendingTaskInfo pendingTask) { this.pendingTask.add(pendingTask); } + + public void setMaxThreadNum(long maxThreadNum) { + this.maxThreadNum = maxThreadNum; + } + + public long getMaxThreadNum() { + return maxThreadNum; + } + + @Override + public String toString() { + StringBuilder tb = new StringBuilder(); + tb.append(String.format("\nRUNNING TASK NUMBER: %s", runningTask.size())); + tb.append(String.format("\nPENDING TASK NUMBER: %s", pendingTask.size())); + tb.append(String.format("\nASYNC LEVEL: %s", maxThreadNum)); + + runningTask.forEach(tb::append); + pendingTask.forEach(tb::append); + + return tb.toString(); + } } diff --git a/header/src/main/java/org/zstack/header/errorcode/SysErrors.java b/header/src/main/java/org/zstack/header/errorcode/SysErrors.java index c59fba9c5b5..32bad1939e6 100755 --- a/header/src/main/java/org/zstack/header/errorcode/SysErrors.java +++ b/header/src/main/java/org/zstack/header/errorcode/SysErrors.java @@ -26,6 +26,9 @@ public enum SysErrors { // ZSphere only MULTIPLE_REASONS(1090), + + // CloudBus related Error - ZSphere only - 211X + CLOUD_BUS_SEND_ERROR(2111), ; private String code; diff --git a/portal/src/main/java/org/zstack/portal/managementnode/ManagementNodeManagerImpl.java b/portal/src/main/java/org/zstack/portal/managementnode/ManagementNodeManagerImpl.java index fb59c563f50..97e90e49de1 100755 --- a/portal/src/main/java/org/zstack/portal/managementnode/ManagementNodeManagerImpl.java +++ b/portal/src/main/java/org/zstack/portal/managementnode/ManagementNodeManagerImpl.java @@ -869,7 +869,17 @@ public void run() { @Override public Void call() { + logger.debug(String.format("heartbeat thread[%s] for management node[uuid:%s] started", + Thread.currentThread().getName(), node().getUuid())); while (true) { + if (Thread.currentThread().isInterrupted()) { + // the heartbeat task may be cancelled by the heartbeat interval change, + // just return, don't break, otherwise it stops the management node + logger.debug(String.format("heartbeat thread[%s] for management node[uuid:%s] is interrupted, exit heartbeat loop", + Thread.currentThread().getName(), node().getUuid())); + return null; + } + try { if (!amIalive()) { logger.warn(String.format("cannot find my[uuid:%s] heartbeat in database, quit process", node().getUuid())); @@ -892,23 +902,19 @@ public Void call() { } sleepAHeartbeatInterval(); - - if (heartBeatTask.isCancelled()) { - // the heartbeat task may be cancelled by the heartbeat interval change, - // just return, don't break, otherwise it stops the management node - return null; - } } stop(); return null; } - private void sleepAHeartbeatInterval() { + private boolean sleepAHeartbeatInterval() { try { TimeUnit.SECONDS.sleep(ManagementNodeGlobalConfig.NODE_HEARTBEAT_INTERVAL.value(Long.class)); + return true; } catch (InterruptedException ignored) { Thread.currentThread().interrupt(); + return false; } } @@ -927,6 +933,10 @@ private boolean handleHeartbeatFailure(Throwable t) { int heartbeatFailureTimes = 0; while (true) { + if (Thread.currentThread().isInterrupted()) { + return false; + } + if (heartbeatFailureTimes > PortalGlobalProperty.MAX_HEARTBEAT_FAILURE) { logger.warn(String.format("the heartbeat has failed %s times that is greater than the max allowed value[%s]," + " quit process", heartbeatFailureTimes, PortalGlobalProperty.MAX_HEARTBEAT_FAILURE), t); diff --git a/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBus3Case.groovy b/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBus3Case.groovy index 675c6b5a4e6..7bc81d6cb03 100755 --- a/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBus3Case.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBus3Case.groovy @@ -3,7 +3,10 @@ package org.zstack.test.integration.core.cloudbus import org.zstack.core.Platform import org.zstack.core.cloudbus.* import org.zstack.core.db.DatabaseFacade +import org.zstack.core.thread.ThreadFacade import org.zstack.header.AbstractService +import org.zstack.header.core.FutureCompletion +import org.zstack.header.core.progress.ChainInfo import org.zstack.header.errorcode.OperationFailureException import org.zstack.header.errorcode.SysErrors import org.zstack.header.managementnode.ManagementNodeInventory @@ -22,6 +25,9 @@ import java.util.concurrent.TimeUnit import java.util.function.Consumer class CloudBus3Case extends SubCase { + AbstractService service + CloudBus bus + @Override void clean() { } @@ -222,16 +228,13 @@ class CloudBus3Case extends SubCase { def qmsg = new APIQueryVmInstanceMsg() qmsg.setTimeout(1L) - bus.makeLocalServiceId(qmsg, SERVICE_ID) bus.makeServiceIdByManagementNodeId(qmsg, SERVICE_ID, "some-fake-uuid") MessageReply r = bus.call(qmsg) assert r.error.isError(SysErrors.TIMEOUT) qmsg = new APIQueryVmInstanceMsg() - qmsg.setTimeout(2L) - bus.makeLocalServiceId(qmsg, SERVICE_ID) - bus.makeServiceIdByManagementNodeId(qmsg, SERVICE_ID, "some-fake-uuid") + bus.makeServiceIdByManagementNodeId(qmsg, SERVICE_ID, "some-fake-uuid2") expect(OperationFailureException.class) { bus.call(qmsg) @@ -240,10 +243,225 @@ class CloudBus3Case extends SubCase { bus.unregisterService(service) } + void testFutureCompletionSend() { + CloudBus bus = bean(CloudBus.class) + String SERVICE_ID = "testFutureCompletionSend" + + if (service == null) { + service = new AbstractService() { + @Override + void handleMessage(Message msg) { + bus.reply(msg, new MessageReply()) + } + + @Override + String getId() { + return bus.makeLocalServiceId(SERVICE_ID) + } + + @Override + boolean start() { + return true + } + + @Override + boolean stop() { + return true + } + } + + bus.registerService(service) + } + + StartVmInstanceMsg msg = new StartVmInstanceMsg(vmInstanceUuid: Platform.uuid) + bus.makeLocalServiceId(msg, SERVICE_ID) + FutureCompletion completion = bus.send(msg) + completion.await(5000L) + assert completion.isSuccess() + assert !completion.isTimeout() + + MessageReply[] replyHolder = new MessageReply[1] + NeedReplyMessage nrMsg = new StartVmInstanceMsg(vmInstanceUuid: Platform.uuid) + bus.makeLocalServiceId(nrMsg, SERVICE_ID) + FutureCompletion completion2 = bus.send(nrMsg, new CloudBusCallBack(null) { + @Override + void run(MessageReply reply) { + replyHolder[0] = reply + } + }) + completion2.await(5000L) + assert completion2.isSuccess() + assert !completion2.isTimeout() + retryInSecs { + assert replyHolder[0] != null + assert replyHolder[0].isSuccess() + } + } + + void testFutureCompletionSendToNonExistService() { + CloudBus bus = bean(CloudBus.class) + String NON_EXIST_SERVICE_ID = "NonExistService" + Platform.uuid + + StartVmInstanceMsg msg = new StartVmInstanceMsg(vmInstanceUuid: Platform.uuid) + bus.makeLocalServiceId(msg, NON_EXIST_SERVICE_ID) + FutureCompletion completion = bus.send(msg) + completion.await(5000L) + assert completion.isSuccess() + + + MessageReply[] replyHolder = new MessageReply[1] + NeedReplyMessage nrMsg = new StartVmInstanceMsg(vmInstanceUuid: Platform.uuid) + bus.makeLocalServiceId(nrMsg, NON_EXIST_SERVICE_ID) + FutureCompletion completion2 = bus.send(nrMsg, new CloudBusCallBack(null) { + @Override + void run(MessageReply reply) { + replyHolder[0] = reply + } + }) + completion2.await(5000L) + assert completion2.isSuccess() + retryInSecs { + assert replyHolder[0] != null + assert !replyHolder[0].isSuccess() + } + } + + void testFutureCompletionSendTimeout() { + CloudBus bus = bean(CloudBus.class) + String SERVICE_ID = "testFutureCompletionSendTimeout" + + def service = new AbstractService() { + @Override + void handleMessage(Message msg) { + // Do not reply to simulate timeout + try { + Thread.sleep(3000) + } catch (InterruptedException e) { + e.printStackTrace() + } + } + + @Override + String getId() { + return bus.makeLocalServiceId(SERVICE_ID) + } + + @Override + boolean start() { + return true + } + + @Override + boolean stop() { + return true + } + } + + bus.registerService(service) + + // test timeout (Message msg) + StartVmInstanceMsg msg = new StartVmInstanceMsg(vmInstanceUuid: Platform.uuid) + msg.setTimeout(1) + bus.makeLocalServiceId(msg, SERVICE_ID) + FutureCompletion completion = bus.send(msg) + completion.await(5000L) + assert completion.isSuccess() + + MessageReply[] replyHolder = new MessageReply[1] + NeedReplyMessage nrMsg = new StartVmInstanceMsg(vmInstanceUuid: Platform.uuid) + nrMsg.setTimeout(1) + bus.makeLocalServiceId(nrMsg, SERVICE_ID) + FutureCompletion completion2 = bus.send(nrMsg, new CloudBusCallBack(null) { + @Override + void run(MessageReply reply) { + replyHolder[0] = reply + } + }) + completion2.await(5000L) + assert completion2.isSuccess() + retryInSecs { + assert replyHolder[0] != null + assert !replyHolder[0].isSuccess() + assert replyHolder[0].error.isError(SysErrors.TIMEOUT) + } + + bus.unregisterService(service) + } + + void testFutureCompletionSendToMissingNode() { + CloudBus bus = bean(CloudBus.class) + String SERVICE_ID = "testFutureCompletionSendToMissingNode" + + StartVmInstanceMsg msg = new StartVmInstanceMsg(vmInstanceUuid: Platform.uuid) + msg.setTimeout(2) + bus.makeServiceIdByManagementNodeId(msg, SERVICE_ID, "fake-management-node-uuid") + FutureCompletion completion = bus.send(msg) + completion.await(5000L) + assert !completion.isSuccess() + assert completion.getErrorCode() != null + + MessageReply[] replyHolder = new MessageReply[1] + NeedReplyMessage nrMsg = new StartVmInstanceMsg(vmInstanceUuid: Platform.uuid) + nrMsg.setTimeout(2) + bus.makeServiceIdByManagementNodeId(nrMsg, SERVICE_ID, "fake-management-node-uuid") + FutureCompletion completion2 = bus.send(nrMsg, new CloudBusCallBack(null) { + @Override + void run(MessageReply reply) { + replyHolder[0] = reply + } + }) + completion2.await(5000L) + assert !completion2.isSuccess() + assert completion2.getErrorCode() != null + + retryInSecs { + assert replyHolder[0] != null + assert !replyHolder[0].isSuccess() + // TODO + // assert replyHolder[0].error.isError(SysErrors.MANAGEMENT_NODE_UNAVAILABLE_ERROR) + } + } + + void testSendQueue() { + def threads = [] + 1.upto(50) { i -> + Thread t = Thread.start { + new MessageSender().sendMsg() + } + threads << t + } + + ThreadFacade thdf = bean(ThreadFacade.class) + + ChainInfo info = thdf.getChainTaskInfo("http-send-in-queue") + assert info.runningTask.size() <= 1 + sleep(100) + info = thdf.getChainTaskInfo("http-send-in-queue") + assert info.runningTask.size() <= 1 + + threads.each { it.join() } + } + @Override void test() { + bus = bean(CloudBus.class) + testStepSend() testManagementNodeGone() testSendToMissingNode() + testFutureCompletionSend() + testFutureCompletionSendToNonExistService() + testFutureCompletionSendTimeout() + testFutureCompletionSendToMissingNode() + CloudBusGlobalProperty.HTTP_ALWAYS = true + + testFutureCompletionSend() + testFutureCompletionSendToNonExistService() + testFutureCompletionSendTimeout() + testFutureCompletionSendToMissingNode() + + testSendQueue() + bus.unregisterService(service) + CloudBusGlobalProperty.HTTP_ALWAYS = false } } diff --git a/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/ManagementNodeNotFoundHandlerCase.groovy b/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/ManagementNodeNotFoundHandlerCase.groovy index 2b7a0a7c2f3..b965b9b2924 100755 --- a/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/ManagementNodeNotFoundHandlerCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/ManagementNodeNotFoundHandlerCase.groovy @@ -4,10 +4,9 @@ import org.zstack.core.Platform import org.zstack.core.cloudbus.CloudBus import org.zstack.core.cloudbus.CloudBusGlobalConfig import org.zstack.core.cloudbus.DeadMessageManagerImpl -import org.zstack.core.cloudbus.EventFacade import org.zstack.core.db.DatabaseFacade import org.zstack.header.AbstractService -import org.zstack.header.managementnode.ManagementNodeCanonicalEvent +import org.zstack.header.core.FutureCompletion import org.zstack.header.managementnode.ManagementNodeInventory import org.zstack.header.managementnode.ManagementNodeState import org.zstack.header.managementnode.ManagementNodeVO @@ -22,6 +21,8 @@ import org.zstack.testlib.SubCase import java.util.concurrent.TimeUnit class ManagementNodeNotFoundHandlerCase extends SubCase { + DeadMessageManagerImpl mgr + @Override void clean() { } @@ -33,7 +34,7 @@ class ManagementNodeNotFoundHandlerCase extends SubCase { @Override void environment() { - DeadMessageManagerImpl mgr = bean(DeadMessageManagerImpl.class) + mgr = bean(DeadMessageManagerImpl.class) mgr.managementNodeNotFoundHandlers.invalidateAll() mgr.managementNodeNotFoundHandlers.cleanUp() @@ -46,7 +47,7 @@ class ManagementNodeNotFoundHandlerCase extends SubCase { ManagementNodeVO vo = new ManagementNodeVO( hostName: "127.0.0.10", // mock a future heartbeat - heartBeat: new Date(System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)).toTimestamp(), + heartBeat: new Date(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(5)).toTimestamp(), uuid: mgmtUuid, port: 8989, state: ManagementNodeState.RUNNING @@ -74,6 +75,7 @@ class ManagementNodeNotFoundHandlerCase extends SubCase { def secondNodeService = new AbstractService() { @Override void handleMessage(Message msg) { + logger.debug("received message[${msg.id}] on service[${getId()}]") message = msg } @@ -95,17 +97,18 @@ class ManagementNodeNotFoundHandlerCase extends SubCase { StartVmInstanceReply reply = new StartVmInstanceReply() bus.makeServiceIdByManagementNodeId(reply, secondServiceName, secondNodeUuid) - bus.send(reply) + FutureCompletion c = bus.send(reply) assert message == null bus.registerService(secondNodeService) + c.await() + assert !mgr.managementNodeNotFoundHandlers.asMap().values().message.contains(reply) Closure cleanup = mockAManagementNode(secondNodeUuid) - retryInSecs { - assert message == null - } + sleep(1000) + assert message == null cleanup() bus.unregisterService(secondNodeService) @@ -120,6 +123,7 @@ class ManagementNodeNotFoundHandlerCase extends SubCase { def secondNodeService = new AbstractService() { @Override void handleMessage(Message msg) { + logger.debug("received message[${msg.id}] on service[${getId()}]") message = msg } @@ -141,17 +145,17 @@ class ManagementNodeNotFoundHandlerCase extends SubCase { StartVmInstanceMsg msg = new StartVmInstanceMsg() bus.makeServiceIdByManagementNodeId(msg, secondServiceName, secondNodeUuid) - bus.send(msg) + FutureCompletion c = bus.send(msg) assert message == null bus.registerService(secondNodeService) - + c.await() + assert !mgr.managementNodeNotFoundHandlers.asMap().values().message.contains(msg) Closure cleanup = mockAManagementNode(secondNodeUuid) - retryInSecs { - assert message == null - } + sleep(1000) + assert message == null cleanup() bus.unregisterService(secondNodeService) @@ -166,6 +170,7 @@ class ManagementNodeNotFoundHandlerCase extends SubCase { def secondNodeService = new AbstractService() { @Override void handleMessage(Message msg) { + logger.debug("received message[${msg.id}] on service[${getId()}]") message = msg } @@ -190,12 +195,15 @@ class ManagementNodeNotFoundHandlerCase extends SubCase { StartVmInstanceReply reply = new StartVmInstanceReply() bus.makeServiceIdByManagementNodeId(reply, secondServiceName, secondNodeUuid) - bus.send(reply) + FutureCompletion c = bus.send(reply) assert message == null bus.registerService(secondNodeService) + // send is a async operation, so we need to wait a bit + c.await() + assert mgr.managementNodeNotFoundHandlers.asMap().values().message.contains(reply) Closure cleanup = mockAManagementNode(secondNodeUuid) retryInSecs { diff --git a/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/MessageSender.java b/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/MessageSender.java new file mode 100644 index 00000000000..3877ecf0bce --- /dev/null +++ b/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/MessageSender.java @@ -0,0 +1,42 @@ +package org.zstack.test.integration.core.cloudbus; + +import org.zstack.core.Platform; +import org.zstack.core.cloudbus.CloudBus; +import org.zstack.core.thread.SyncTask; +import org.zstack.core.thread.ThreadFacade; +import org.zstack.header.core.FutureCompletion; +import org.zstack.header.vm.StartVmInstanceMsg; + +public class MessageSender { + + void sendMsg() { + CloudBus bus = Platform.getComponentLoader().getComponent(CloudBus.class); + ThreadFacade thread = Platform.getComponentLoader().getComponent(ThreadFacade.class); + + thread.syncSubmit(new SyncTask() { + @Override + public Void call() throws Exception { + StartVmInstanceMsg msg = new StartVmInstanceMsg(); + bus.makeLocalServiceId(msg, "testFutureCompletionSend"); + FutureCompletion completion = bus.send(msg); + completion.await(5000L); + return null; + } + + @Override + public String getName() { + return "MessageSender.sendMsg"; + } + + @Override + public int getSyncLevel() { + return 1; + } + + @Override + public String getSyncSignature() { + return getName(); + } + }); + } +} \ No newline at end of file diff --git a/test/src/test/groovy/org/zstack/test/integration/portal/ManagementNodeHeartbeatCase.groovy b/test/src/test/groovy/org/zstack/test/integration/portal/ManagementNodeHeartbeatCase.groovy index 6684b602860..a6940fab90e 100644 --- a/test/src/test/groovy/org/zstack/test/integration/portal/ManagementNodeHeartbeatCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/portal/ManagementNodeHeartbeatCase.groovy @@ -58,10 +58,10 @@ class ManagementNodeHeartbeatCase extends SubCase { } void testUnexpectedManagementNodeRecord() { + prepareInvalidRecords() ManagementNodeGlobalConfig.NODE_HEARTBEAT_INTERVAL.updateValue(1) PortalGlobalProperty.MAX_HEARTBEAT_FAILURE = 2 - prepareInvalidRecords() int heartbeatFailureTimeout = ManagementNodeGlobalConfig.NODE_HEARTBEAT_INTERVAL.value(Integer.class) * PortalGlobalProperty.MAX_HEARTBEAT_FAILURE int heartbeatUpdateDelay = 1 * ManagementNodeGlobalConfig.NODE_HEARTBEAT_INTERVAL.value(Integer.class) @@ -80,7 +80,12 @@ class ManagementNodeHeartbeatCase extends SubCase { assert count == 2 // wait one more interval to wait 127.0.0.111 cleaned - sleep(TimeUnit.SECONDS.toMillis(failureInterval * 3)) + // exceed 3s will be added in suspects + // next heartbeat will be clean + // but hb timestamp is second precision, so it will not be added until 4s. + // and cleaned after 5s + // so we need to wait more than 5s + sleep(TimeUnit.SECONDS.toMillis(failureInterval * 3) + 500) count = dbf.count(ManagementNodeVO.class) assert count == 1