diff --git a/conf/errorCodes/sys.xml b/conf/errorCodes/sys.xml
index c5b763f86ba..b21f247c36e 100755
--- a/conf/errorCodes/sys.xml
+++ b/conf/errorCodes/sys.xml
@@ -105,5 +105,10 @@
1090
Multiple reasons
+
+
+ 2111
+ CloudBus message sending related error
+
diff --git a/core/src/main/java/org/zstack/core/Platform.java b/core/src/main/java/org/zstack/core/Platform.java
index a046d3fe5b4..801579c219f 100755
--- a/core/src/main/java/org/zstack/core/Platform.java
+++ b/core/src/main/java/org/zstack/core/Platform.java
@@ -50,6 +50,7 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
+import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
@@ -817,9 +818,14 @@ private static String getManagementServerIpInternal() {
for (NetworkInterface iface : Collections.list(nets)) {
String name = iface.getName();
if (defaultLine.contains(name)) {
- InetAddress ia = iface.getInetAddresses().nextElement();
- ip = ia.getHostAddress();
- break;
+ for (InetAddress ia : Collections.list(iface.getInetAddresses())) {
+ ip = ia.getHostAddress(); // keep the latest address as a fallback when the interface has no IPv4 address
+ if (ia instanceof Inet4Address) {
+ // we prefer an IPv4 address, so stop scanning this interface at the first one
+ break;
+ }
+ }
+ break; // like the replaced code, stop at the first interface named in the default route line so later matches cannot overwrite ip
}
}
} catch (SocketException e) {
diff --git a/core/src/main/java/org/zstack/core/cloudbus/CloudBus.java b/core/src/main/java/org/zstack/core/cloudbus/CloudBus.java
index efe579df429..5252bc7ca24 100755
--- a/core/src/main/java/org/zstack/core/cloudbus/CloudBus.java
+++ b/core/src/main/java/org/zstack/core/cloudbus/CloudBus.java
@@ -3,6 +3,7 @@
import org.springframework.http.HttpEntity;
import org.zstack.header.Component;
import org.zstack.header.Service;
+import org.zstack.header.core.FutureCompletion;
import org.zstack.header.errorcode.ErrorCode;
import org.zstack.header.exception.CloudConfigureFailException;
import org.zstack.header.message.*;
@@ -12,14 +13,18 @@
import java.util.function.Consumer;
public interface CloudBus extends Component {
- void send(Message msg);
-
+ /**
+ * Submits a message-sending task to the queue.
+ * @return a {@link FutureCompletion} that can be used to check whether the message was sent successfully.
+ */
+ FutureCompletion send(Message msg);
+
void send(List msgs);
@Deprecated
void send(APIMessage msg, Consumer consumer);
- void send(NeedReplyMessage msg, CloudBusCallBack callback);
+ FutureCompletion send(NeedReplyMessage msg, CloudBusCallBack callback);
@Deprecated
void send(List extends NeedReplyMessage> msgs, CloudBusListCallBack callBack);
diff --git a/core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl2.java b/core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl2.java
index 10e03926888..ca7938904ee 100755
--- a/core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl2.java
+++ b/core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl2.java
@@ -19,6 +19,7 @@
import org.zstack.core.timeout.ApiTimeoutManager;
import org.zstack.header.Constants;
import org.zstack.header.Service;
+import org.zstack.header.core.FutureCompletion;
import org.zstack.header.log.NoLogging;
import org.zstack.header.apimediator.APIIsReadyToGoMsg;
import org.zstack.header.apimediator.APIIsReadyToGoReply;
@@ -106,6 +107,11 @@ public class CloudBusImpl2 implements CloudBus, CloudBusIN, ManagementNodeChange
private final String AMQP_PROPERTY_HEADER__COMPRESSED = "compressed";
private String SERVICE_ID = makeLocalServiceId("cloudbus");
+ public static final FutureCompletion SEND_CONFIRMED = new FutureCompletion(null);
+
+ static {
+ SEND_CONFIRMED.success();
+ }
public void setDEFAULT_MESSAGE_TIMEOUT(long timeout) {
this.DEFAULT_MESSAGE_TIMEOUT = timeout;
@@ -1323,8 +1329,9 @@ private void send(Message msg, Boolean noNeedReply) {
}
@Override
- public void send(Message msg) {
+ public FutureCompletion send(Message msg) {
send(msg, true);
+ return SEND_CONFIRMED;
}
@Override
@@ -1354,7 +1361,7 @@ private void evaluateMessageTimeout(NeedReplyMessage msg) {
}
@Override
- public void send(final NeedReplyMessage msg, final CloudBusCallBack callback) {
+ public FutureCompletion send(final NeedReplyMessage msg, final CloudBusCallBack callback) {
evaluateMessageTimeout(msg);
Envelope e = new Envelope() {
@@ -1405,6 +1412,7 @@ List getRequests() {
envelopes.put(msg.getId(), e);
send(msg, false);
+ return SEND_CONFIRMED;
}
private MessageReply createTimeoutReply(NeedReplyMessage m) {
diff --git a/core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java b/core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java
index 0d83683708a..889a61089a1 100755
--- a/core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java
+++ b/core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java
@@ -49,6 +49,7 @@
import static org.zstack.core.Platform.*;
import static org.zstack.core.cloudbus.CloudBusGlobalProperty.SYNC_CALL_TIMEOUT;
+import static org.zstack.header.errorcode.SysErrors.CLOUD_BUS_SEND_ERROR;
import static org.zstack.utils.BeanUtils.getProperty;
import static org.zstack.utils.BeanUtils.setProperty;
@@ -101,6 +102,7 @@ public class CloudBusImpl3 implements CloudBus, CloudBusIN {
private final static TimeoutRestTemplate http = RESTFacade.createRestTemplate(CoreGlobalProperty.REST_FACADE_READ_TIMEOUT, CoreGlobalProperty.REST_FACADE_CONNECT_TIMEOUT);
public static final String HTTP_BASE_URL = "/cloudbus";
+ public static final FutureCompletion SEND_CONFIRMED = new FutureCompletion(null);
{
if (CloudBusGlobalProperty.MESSAGE_LOG != null) {
@@ -114,6 +116,8 @@ public class CloudBusImpl3 implements CloudBus, CloudBusIN {
CloudBusGlobalProperty.HTTP_CONTEXT_PATH = "";
CloudBusGlobalProperty.HTTP_PORT = 8989;
}
+
+ SEND_CONFIRMED.success();
}
public static String getManagementNodeUUIDFromServiceID(String serviceID) {
@@ -125,6 +129,12 @@ public static String getManagementNodeUUIDFromServiceID(String serviceID) {
return ss[0];
}
+ private FutureCompletion sendFail(ErrorCode errorCode) {
+ FutureCompletion c = new FutureCompletion(null);
+ c.fail(errorCode);
+ return c;
+ }
+
private abstract class Envelope {
long startTime;
@@ -250,8 +260,8 @@ public void deActiveService(String id) {
}
@Override
- public void send(Message msg) {
- send(msg, true);
+ public FutureCompletion send(Message msg) {
+ return send(msg, true);
}
@Override
@@ -291,11 +301,11 @@ private MessageReply createTimeoutReply(NeedReplyMessage m) {
}
@Override
- public void send(NeedReplyMessage msg, CloudBusCallBack callback) {
+ public FutureCompletion send(NeedReplyMessage msg, CloudBusCallBack callback) {
evaluateMessageTimeout(msg);
if (msg.getTimeout() <= 1) {
callback.run(createTimeoutReply(msg));
- return;
+ return SEND_CONFIRMED;
}
Envelope e = new Envelope() {
@@ -346,7 +356,7 @@ public void timeout() {
envelopes.put(msg.getId(), e);
msgExts.forEach(m -> m.afterAddEnvelopes(msg.getId()));
- send(msg, false);
+ return send(msg, false);
}
@Override
@@ -454,7 +464,7 @@ public void reply(Message request, MessageReply reply) {
callReplyPreSendingExtensions(reply, (NeedReplyMessage) request);
} catch (Exception e) {
logger.error("failed to call pre-sending reply extension:", e);
- reply.setError(operr(e.getMessage()));
+ reply.setError(err(CLOUD_BUS_SEND_ERROR, e.getMessage()));
}
}
@@ -526,29 +536,36 @@ public MessageSender(Message msg) {
localSend = !CloudBusGlobalProperty.HTTP_ALWAYS && managementNodeId.equals(Platform.getManagementServerId());
}
- void send() {
+ FutureCompletion send() {
try {
- doSend();
+ return doSend();
} catch (Throwable th) {
- replyErrorIfNeeded(operr(th.getMessage()));
+ ErrorCode err = err(CLOUD_BUS_SEND_ERROR, th.getMessage());
+ replyErrorIfNeeded(err);
+
+ // reuse sendFail() so the already-failed future is built in one place;
+ // it carries the same error that was passed to replyErrorIfNeeded()
+ return sendFail(err);
}
}
- private void doSend() {
+ private FutureCompletion doSend() {
if (msg instanceof Event) {
eventSend();
- return;
+ return SEND_CONFIRMED;
}
if (localSend) {
localSend();
+ return SEND_CONFIRMED;
} else {
- httpSend();
+ return httpSend();
}
}
- private void httpSendInQueue(String ip) {
- thdf.chainSubmit(new ChainTask(null) {
+ private FutureCompletion httpSendInQueue(String ip) {
+ FutureCompletion sendCompletion = new FutureCompletion(null);
+ thdf.chainSubmit(new ChainTask(sendCompletion) {
@Override
public String getSyncSignature() {
return "http-send-in-queue";
@@ -557,6 +574,7 @@ public String getSyncSignature() {
@Override
public void run(SyncTaskChain chain) {
httpSend(ip);
+ sendCompletion.success();
chain.next();
}
@@ -570,25 +588,29 @@ public String getName() {
return getSyncSignature();
}
});
+ return sendCompletion;
}
- private void httpSend() {
+ private FutureCompletion httpSend() {
buildSchema(msg);
+ String ip;
try {
- String ip = destMaker.getNodeInfo(managementNodeId).getNodeIP();
- httpSendInQueue(ip);
+ ip = destMaker.getNodeInfo(managementNodeId).getNodeIP();
} catch (ManagementNodeNotFoundException e) {
- if (msg instanceof MessageReply) {
- if (!deadMessageManager.handleManagementNodeNotFoundError(managementNodeId, msg, () -> {
- String ip = destMaker.getNodeInfo(managementNodeId).getNodeIP();
- httpSendInQueue(ip);
- })) {
- throw e;
- }
+ boolean errorHandled = msg instanceof MessageReply &&
+ deadMessageManager.handleManagementNodeNotFoundError(managementNodeId, msg, () -> {
+ String otherIp = destMaker.getNodeInfo(managementNodeId).getNodeIP();
+ logger.warn(String.format("resend the message[id:%s] to node[ip:%s]", msg.getId(), otherIp));
+ httpSendInQueue(otherIp);
+ });
+ if (errorHandled) {
+ return SEND_CONFIRMED;
} else {
throw e;
}
}
+
+ return httpSendInQueue(ip);
}
private void httpSend(String ip) {
@@ -612,12 +634,12 @@ protected ResponseEntity call() {
}.run();
if (!rsp.getStatusCode().is2xxSuccessful()) {
- replyErrorIfNeeded(operr("HTTP ERROR, status code: %s, body: %s", rsp.getStatusCode(), rsp.getBody()));
+ replyErrorIfNeeded(err(CLOUD_BUS_SEND_ERROR, "HTTP ERROR, status code: %s, body: %s", rsp.getStatusCode(), rsp.getBody()));
}
} catch (OperationFailureException e) {
replyErrorIfNeeded(e.getErrorCode());
} catch (Throwable e) {
- replyErrorIfNeeded(operr(e.getMessage()));
+ replyErrorIfNeeded(err(CLOUD_BUS_SEND_ERROR, e.getMessage()));
}
}
@@ -1194,7 +1216,7 @@ private void evalThreadContextToMessage(Message msg) {
}
}
- private void doSendAndCallExtensions(Message msg) {
+ private FutureCompletion doSendAndCallExtensions(Message msg) {
// for unit test finding invocation chain
MessageCommandRecorder.record(msg.getClass());
@@ -1209,20 +1231,20 @@ private void doSendAndCallExtensions(Message msg) {
interceptor.beforeSendMessage(msg);
}
- doSend(msg);
+ return doSend(msg);
}
- private void doSend(Message msg) {
+ private FutureCompletion doSend(Message msg) {
evalThreadContextToMessage(msg);
if (logger.isTraceEnabled() && islogMessage(msg)) {
logger.trace(String.format("[msg send]: %s", dumpMessage(msg)));
}
- new MessageSender(msg).send();
+ return new MessageSender(msg).send();
}
- private void send(Message msg, Boolean noNeedReply) {
+ private FutureCompletion send(Message msg, Boolean noNeedReply) {
if (msg.getServiceId() == null) {
throw new IllegalArgumentException(String.format("service id cannot be null: %s", msg.getClass().getName()));
}
@@ -1238,7 +1260,7 @@ private void send(Message msg, Boolean noNeedReply) {
msg.putHeaderEntry(NO_NEED_REPLY_MSG, noNeedReply.toString());
}
- doSendAndCallExtensions(msg);
+ return doSendAndCallExtensions(msg);
}
private void restoreFromSchema(Message msg, Map raw) throws ClassNotFoundException {
diff --git a/core/src/main/java/org/zstack/core/thread/DispatchQueueImpl.java b/core/src/main/java/org/zstack/core/thread/DispatchQueueImpl.java
index 4c82196e5c2..c074e4ae158 100755
--- a/core/src/main/java/org/zstack/core/thread/DispatchQueueImpl.java
+++ b/core/src/main/java/org/zstack/core/thread/DispatchQueueImpl.java
@@ -8,7 +8,6 @@
import org.zstack.core.debug.DebugManager;
import org.zstack.core.debug.DebugSignalHandler;
import org.zstack.header.Constants;
-import org.zstack.header.core.Completion;
import org.zstack.header.core.ExceptionSafe;
import org.zstack.header.core.ReturnValueCompletion;
import org.zstack.header.core.progress.ChainInfo;
@@ -107,47 +106,33 @@ public void handleDebugSignal() {
public void beforeCleanQueuedumpThread(String signatureName) {
String title = "\n================= Before Clean Task Queue Dump ================";
- dumpsignatureNameThread(signatureName,title);
+ dumpSignatureNameThread(signatureName,title);
}
public void afterCleanQueuedumpThread(String signatureName) {
String title = "\n================= After Clean Task Queue Dump ================";
- dumpsignatureNameThread(signatureName,title);
+ dumpSignatureNameThread(signatureName,title);
}
- public void dumpsignatureNameThread(String signatureName,String title) {
+ public void dumpSignatureNameThread(String signatureName, String title) {
StringBuilder sb = new StringBuilder();
sb.append(title);
sb.append("\nASYNC TASK QUEUE DUMP:");
sb.append(String.format("\nTASK QUEUE NUMBER: %s\n", chainTasks.size()));
List asyncTasks = new ArrayList<>();
- long now = System.currentTimeMillis();
- synchronized (chainTasks) {
- ChainTaskQueueWrapper w = chainTasks.get(signatureName);
- if (w == null) {
- sb.append(String.format("\n===== NO QUEUE SYNC SIGNATURE: %s =====", signatureName));
- sb.append(StringUtils.join(asyncTasks, "\n"));
- sb.append("\n================= END TASK QUEUE DUMP ==================\n");
- _threadFacade.printThreadsAndTasks();
- logger.debug(sb.toString());
- return;
- }
- StringBuilder tb = new StringBuilder(String.format("\nQUEUE SYNC SIGNATURE: %s", signatureName));
- tb.append(String.format("\nRUNNING TASK NUMBER: %s", w.runningQueue.size()));
- tb.append(String.format("\nPENDING TASK NUMBER: %s", w.pendingQueue.size()));
- tb.append(String.format("\nASYNC LEVEL: %s", w.maxThreadNum));
- int index = 0;
- for (Object obj : w.runningQueue) {
- ChainFuture cf = (ChainFuture) obj;
- tb.append(TaskInfoBuilder.buildRunningTaskInfo(cf, now, index++));
- }
- for (Object obj : w.pendingQueue) {
- ChainFuture cf = (ChainFuture) obj;
- tb.append(TaskInfoBuilder.buildPendingTaskInfo(cf, now, index++));
- }
- asyncTasks.add(tb.toString());
+ ChainTaskQueueWrapper w = chainTasks.get(signatureName);
+ if (w == null) {
+ sb.append(String.format("\n===== NO QUEUE SYNC SIGNATURE: %s =====", signatureName));
+ sb.append(StringUtils.join(asyncTasks, "\n"));
+ sb.append("\n================= END TASK QUEUE DUMP ==================\n");
+ _threadFacade.printThreadsAndTasks();
+ logger.debug(sb.toString());
+ return;
}
+ StringBuilder tb = new StringBuilder(String.format("\nQUEUE SYNC SIGNATURE: %s", signatureName));
+ tb.append(w.getTaskQueueInfo());
+ asyncTasks.add(tb.toString());
sb.append(StringUtils.join(asyncTasks, "\n"));
sb.append("\n================= END TASK QUEUE DUMP ==================\n");
@@ -157,26 +142,11 @@ public void dumpsignatureNameThread(String signatureName,String title) {
@Override
public ChainInfo getChainTaskInfo(String signature) {
- long now = System.currentTimeMillis();
- synchronized (chainTasks) {
- ChainInfo info = new ChainInfo();
- ChainTaskQueueWrapper w = chainTasks.get(signature);
- if (w == null) {
- return info;
- }
-
- int index = 0;
- for (Object obj : w.runningQueue) {
- ChainFuture cf = (ChainFuture) obj;
- info.addRunningTask(TaskInfoBuilder.buildRunningTaskInfo(cf, now, index++));
- }
-
- for (Object obj : w.pendingQueue) {
- ChainFuture cf = (ChainFuture) obj;
- info.addPendingTask(TaskInfoBuilder.buildPendingTaskInfo(cf, now, index++));
- }
- return info;
+ ChainTaskQueueWrapper w = chainTasks.get(signature);
+ if (w == null) {
+ return new ChainInfo();
}
+ return w.getChainInfo();
}
@Override
@@ -407,10 +377,9 @@ void addTask(SyncTaskFuture task) {
void startThreadIfNeeded() {
if (counter.get() >= maxThreadNum) {
- int pendingTaskSize = counter.get() - queue.size();
logger.debug(String.format("Sync task syncSignature: %s reached maxThreadNum: %s, current: %d, pending queue size: %d",
- syncSignature, maxThreadNum, counter.get(), pendingTaskSize));
- dumpTaskQueueIfNeeded(pendingTaskSize);
+ syncSignature, maxThreadNum, counter.get(), queue.size()));
+ dumpTaskQueueIfNeeded(queue.size());
return;
}
@@ -783,7 +752,7 @@ protected String getTaskQueueInfo() {
}
private class ChainTaskQueueWrapper extends AbstractTaskQueueWrapper {
- LinkedList pendingQueue = new LinkedList();
+ final LinkedList pendingQueue = new LinkedList();
final Map subPendingMap = new ConcurrentHashMap<>();
final LinkedList runningQueue = new LinkedList();
AtomicInteger counter = new AtomicInteger(0);
@@ -959,28 +928,30 @@ public Void call() {
@Override
protected String getTaskQueueInfo() {
+ return getChainInfo().toString();
+ }
+
+ protected ChainInfo getChainInfo() {
long now = zTimer.getCurrentTimeMillis();
- StringBuilder tb = new StringBuilder();
- tb.append(String.format("\nRUNNING TASK NUMBER: %s", runningQueue.size()));
- tb.append(String.format("\nPENDING TASK NUMBER: %s", pendingQueue.size()));
- tb.append(String.format("\nASYNC LEVEL: %s", maxThreadNum));
+ ChainInfo info = new ChainInfo();
+ info.setMaxThreadNum(maxThreadNum);
int index = 0;
synchronized (runningQueue) {
for (Object obj : runningQueue) {
ChainFuture cf = (ChainFuture) obj;
- tb.append(TaskInfoBuilder.buildRunningTaskInfo(cf, now, index++));
+ info.addRunningTask(TaskInfoBuilder.buildRunningTaskInfo(cf, now, index++));
}
}
- synchronized (pendingQueue) {
+ // pendingQueue is guarded by the chainTasks lock, so synchronize on chainTasks rather than on pendingQueue itself
+ synchronized (chainTasks) {
for (Object obj : pendingQueue) {
ChainFuture cf = (ChainFuture) obj;
- tb.append(TaskInfoBuilder.buildPendingTaskInfo(cf, now, index++));
+ info.addPendingTask(TaskInfoBuilder.buildPendingTaskInfo(cf, now, index++));
}
}
-
- return tb.toString();
+ return info;
}
}
diff --git a/core/src/main/java/org/zstack/core/thread/TaskInfoBuilder.java b/core/src/main/java/org/zstack/core/thread/TaskInfoBuilder.java
index aa4a1717a68..b454459d6f5 100644
--- a/core/src/main/java/org/zstack/core/thread/TaskInfoBuilder.java
+++ b/core/src/main/java/org/zstack/core/thread/TaskInfoBuilder.java
@@ -7,9 +7,7 @@
import org.zstack.header.core.progress.RunningTaskInfo;
import org.zstack.header.core.progress.TaskInfo;
import org.zstack.header.message.Message;
-import org.zstack.utils.Utils;
import org.zstack.utils.gson.JSONObjectUtil;
-import org.zstack.utils.logging.CLogger;
import java.util.ArrayList;
import java.util.Map;
@@ -21,7 +19,7 @@
public class TaskInfoBuilder {
static RunningTaskInfo buildRunningTaskInfo(AbstractTimeStatisticFuture abstractTimeStatisticFuture, long now, int index) {
RunningTaskInfo info = new RunningTaskInfo();
- loadTaskInfo(info, abstractTimeStatisticFuture, index);
+ loadTaskInfo(info, abstractTimeStatisticFuture.getTask(), index);
info.setExecutionTime(TimeUnit.MILLISECONDS.toSeconds(now - abstractTimeStatisticFuture.getStartExecutionTimeInMills()));
info.setPendingTime(TimeUnit.MILLISECONDS.toSeconds(now - abstractTimeStatisticFuture.getStartPendingTimeInMills()) - info.getExecutionTime());
return info;
@@ -29,23 +27,23 @@ static RunningTaskInfo buildRunningTaskInfo(AbstractTimeStatisticFuture abstract
static PendingTaskInfo buildPendingTaskInfo(AbstractTimeStatisticFuture abstractTimeStatisticFuture, long now, int index) {
PendingTaskInfo info = new PendingTaskInfo();
- loadTaskInfo(info, abstractTimeStatisticFuture, index);
+ loadTaskInfo(info, abstractTimeStatisticFuture.getTask(), index);
info.setPendingTime(TimeUnit.MILLISECONDS.toSeconds(now - abstractTimeStatisticFuture.getStartPendingTimeInMills()));
return info;
}
- static private void loadTaskInfo(TaskInfo info, AbstractTimeStatisticFuture cf, int index) {
- info.setName(cf.getTask().getName());
- info.setClassName(cf.getTask().getClass().getSimpleName());
+ static private void loadTaskInfo(TaskInfo info, AbstractChainTask task, int index) {
+ info.setName(task.getName());
+ info.setClassName(task.getClass().getSimpleName());
info.setIndex(index);
- Map tc = cf.getTask().getThreadContext();
+ Map tc = task.getThreadContext();
if (tc != null) {
info.setApiName(tc.get(Constants.THREAD_CONTEXT_TASK_NAME));
info.setApiId(tc.get(Constants.THREAD_CONTEXT_API));
}
info.setContextList(new ArrayList<>());
- for (AsyncBackup backup : cf.getTask().getBackups()) {
+ for (AsyncBackup backup : task.getBackups()) {
if (backup instanceof Message) {
info.getContextList().add(JSONObjectUtil.toJsonString(backup));
}
diff --git a/core/src/main/java/org/zstack/core/thread/ThreadFacadeImpl.java b/core/src/main/java/org/zstack/core/thread/ThreadFacadeImpl.java
index e8e54abbeb0..04a81cc84c8 100755
--- a/core/src/main/java/org/zstack/core/thread/ThreadFacadeImpl.java
+++ b/core/src/main/java/org/zstack/core/thread/ThreadFacadeImpl.java
@@ -163,6 +163,7 @@ public void destroy() {
@Override
public Future submit(Task task) {
+ _logger.trace(String.format("submit task: %s", task.getName()));
return _pool.submit(new Worker(task));
}
diff --git a/header/src/main/java/org/zstack/header/core/progress/ChainInfo.java b/header/src/main/java/org/zstack/header/core/progress/ChainInfo.java
index bb07ad06050..9ea2be16ef4 100644
--- a/header/src/main/java/org/zstack/header/core/progress/ChainInfo.java
+++ b/header/src/main/java/org/zstack/header/core/progress/ChainInfo.java
@@ -1,5 +1,7 @@
package org.zstack.header.core.progress;
+import org.zstack.header.rest.APINoSee;
+
import java.util.ArrayList;
import java.util.List;
@@ -9,6 +11,8 @@
public class ChainInfo {
private List runningTask = new ArrayList<>();
private List pendingTask = new ArrayList<>();
+ @APINoSee
+ private long maxThreadNum;
public void setPendingTask(List pendingTask) {
this.pendingTask = pendingTask;
@@ -33,4 +37,25 @@ public void addRunningTask(RunningTaskInfo task) {
public void addPendingTask(PendingTaskInfo pendingTask) {
this.pendingTask.add(pendingTask);
}
+
+ public void setMaxThreadNum(long maxThreadNum) {
+ this.maxThreadNum = maxThreadNum;
+ }
+
+ public long getMaxThreadNum() {
+ return maxThreadNum;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder tb = new StringBuilder();
+ tb.append(String.format("\nRUNNING TASK NUMBER: %s", runningTask.size()));
+ tb.append(String.format("\nPENDING TASK NUMBER: %s", pendingTask.size()));
+ tb.append(String.format("\nASYNC LEVEL: %s", maxThreadNum));
+
+ runningTask.forEach(tb::append);
+ pendingTask.forEach(tb::append);
+
+ return tb.toString();
+ }
}
diff --git a/header/src/main/java/org/zstack/header/errorcode/SysErrors.java b/header/src/main/java/org/zstack/header/errorcode/SysErrors.java
index c59fba9c5b5..32bad1939e6 100755
--- a/header/src/main/java/org/zstack/header/errorcode/SysErrors.java
+++ b/header/src/main/java/org/zstack/header/errorcode/SysErrors.java
@@ -26,6 +26,9 @@ public enum SysErrors {
// ZSphere only
MULTIPLE_REASONS(1090),
+
+ // CloudBus-related errors (ZSphere only); codes in the 211X range
+ CLOUD_BUS_SEND_ERROR(2111),
;
private String code;
diff --git a/portal/src/main/java/org/zstack/portal/managementnode/ManagementNodeManagerImpl.java b/portal/src/main/java/org/zstack/portal/managementnode/ManagementNodeManagerImpl.java
index fb59c563f50..97e90e49de1 100755
--- a/portal/src/main/java/org/zstack/portal/managementnode/ManagementNodeManagerImpl.java
+++ b/portal/src/main/java/org/zstack/portal/managementnode/ManagementNodeManagerImpl.java
@@ -869,7 +869,17 @@ public void run() {
@Override
public Void call() {
+ logger.debug(String.format("heartbeat thread[%s] for management node[uuid:%s] started",
+ Thread.currentThread().getName(), node().getUuid()));
while (true) {
+ if (Thread.currentThread().isInterrupted()) {
+ // the heartbeat task may be cancelled when the heartbeat interval changes;
+ // return rather than break: breaking out of the loop falls through to stop(), which stops the management node
+ logger.debug(String.format("heartbeat thread[%s] for management node[uuid:%s] is interrupted, exit heartbeat loop",
+ Thread.currentThread().getName(), node().getUuid()));
+ return null;
+ }
+
try {
if (!amIalive()) {
logger.warn(String.format("cannot find my[uuid:%s] heartbeat in database, quit process", node().getUuid()));
@@ -892,23 +902,19 @@ public Void call() {
}
sleepAHeartbeatInterval();
-
- if (heartBeatTask.isCancelled()) {
- // the heartbeat task may be cancelled by the heartbeat interval change,
- // just return, don't break, otherwise it stops the management node
- return null;
- }
}
stop();
return null;
}
- private void sleepAHeartbeatInterval() {
+ private boolean sleepAHeartbeatInterval() {
try {
TimeUnit.SECONDS.sleep(ManagementNodeGlobalConfig.NODE_HEARTBEAT_INTERVAL.value(Long.class));
+ return true;
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
+ return false;
}
}
@@ -927,6 +933,10 @@ private boolean handleHeartbeatFailure(Throwable t) {
int heartbeatFailureTimes = 0;
while (true) {
+ if (Thread.currentThread().isInterrupted()) {
+ return false;
+ }
+
if (heartbeatFailureTimes > PortalGlobalProperty.MAX_HEARTBEAT_FAILURE) {
logger.warn(String.format("the heartbeat has failed %s times that is greater than the max allowed value[%s]," +
" quit process", heartbeatFailureTimes, PortalGlobalProperty.MAX_HEARTBEAT_FAILURE), t);
diff --git a/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBus3Case.groovy b/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBus3Case.groovy
index 675c6b5a4e6..7bc81d6cb03 100755
--- a/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBus3Case.groovy
+++ b/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/CloudBus3Case.groovy
@@ -3,7 +3,10 @@ package org.zstack.test.integration.core.cloudbus
import org.zstack.core.Platform
import org.zstack.core.cloudbus.*
import org.zstack.core.db.DatabaseFacade
+import org.zstack.core.thread.ThreadFacade
import org.zstack.header.AbstractService
+import org.zstack.header.core.FutureCompletion
+import org.zstack.header.core.progress.ChainInfo
import org.zstack.header.errorcode.OperationFailureException
import org.zstack.header.errorcode.SysErrors
import org.zstack.header.managementnode.ManagementNodeInventory
@@ -22,6 +25,9 @@ import java.util.concurrent.TimeUnit
import java.util.function.Consumer
class CloudBus3Case extends SubCase {
+ AbstractService service
+ CloudBus bus
+
@Override
void clean() {
}
@@ -222,16 +228,13 @@ class CloudBus3Case extends SubCase {
def qmsg = new APIQueryVmInstanceMsg()
qmsg.setTimeout(1L)
- bus.makeLocalServiceId(qmsg, SERVICE_ID)
bus.makeServiceIdByManagementNodeId(qmsg, SERVICE_ID, "some-fake-uuid")
MessageReply r = bus.call(qmsg)
assert r.error.isError(SysErrors.TIMEOUT)
qmsg = new APIQueryVmInstanceMsg()
- qmsg.setTimeout(2L)
- bus.makeLocalServiceId(qmsg, SERVICE_ID)
- bus.makeServiceIdByManagementNodeId(qmsg, SERVICE_ID, "some-fake-uuid")
+ bus.makeServiceIdByManagementNodeId(qmsg, SERVICE_ID, "some-fake-uuid2")
expect(OperationFailureException.class) {
bus.call(qmsg)
@@ -240,10 +243,225 @@ class CloudBus3Case extends SubCase {
bus.unregisterService(service)
}
+ void testFutureCompletionSend() {
+ CloudBus bus = bean(CloudBus.class)
+ String SERVICE_ID = "testFutureCompletionSend"
+
+ if (service == null) {
+ service = new AbstractService() {
+ @Override
+ void handleMessage(Message msg) {
+ bus.reply(msg, new MessageReply())
+ }
+
+ @Override
+ String getId() {
+ return bus.makeLocalServiceId(SERVICE_ID)
+ }
+
+ @Override
+ boolean start() {
+ return true
+ }
+
+ @Override
+ boolean stop() {
+ return true
+ }
+ }
+
+ bus.registerService(service)
+ }
+
+ StartVmInstanceMsg msg = new StartVmInstanceMsg(vmInstanceUuid: Platform.uuid)
+ bus.makeLocalServiceId(msg, SERVICE_ID)
+ FutureCompletion completion = bus.send(msg)
+ completion.await(5000L)
+ assert completion.isSuccess()
+ assert !completion.isTimeout()
+
+ MessageReply[] replyHolder = new MessageReply[1]
+ NeedReplyMessage nrMsg = new StartVmInstanceMsg(vmInstanceUuid: Platform.uuid)
+ bus.makeLocalServiceId(nrMsg, SERVICE_ID)
+ FutureCompletion completion2 = bus.send(nrMsg, new CloudBusCallBack(null) {
+ @Override
+ void run(MessageReply reply) {
+ replyHolder[0] = reply
+ }
+ })
+ completion2.await(5000L)
+ assert completion2.isSuccess()
+ assert !completion2.isTimeout()
+ retryInSecs {
+ assert replyHolder[0] != null
+ assert replyHolder[0].isSuccess()
+ }
+ }
+
+ void testFutureCompletionSendToNonExistService() {
+ CloudBus bus = bean(CloudBus.class)
+ String NON_EXIST_SERVICE_ID = "NonExistService" + Platform.uuid
+
+ StartVmInstanceMsg msg = new StartVmInstanceMsg(vmInstanceUuid: Platform.uuid)
+ bus.makeLocalServiceId(msg, NON_EXIST_SERVICE_ID)
+ FutureCompletion completion = bus.send(msg)
+ completion.await(5000L)
+ assert completion.isSuccess()
+
+
+ MessageReply[] replyHolder = new MessageReply[1]
+ NeedReplyMessage nrMsg = new StartVmInstanceMsg(vmInstanceUuid: Platform.uuid)
+ bus.makeLocalServiceId(nrMsg, NON_EXIST_SERVICE_ID)
+ FutureCompletion completion2 = bus.send(nrMsg, new CloudBusCallBack(null) {
+ @Override
+ void run(MessageReply reply) {
+ replyHolder[0] = reply
+ }
+ })
+ completion2.await(5000L)
+ assert completion2.isSuccess()
+ retryInSecs {
+ assert replyHolder[0] != null
+ assert !replyHolder[0].isSuccess()
+ }
+ }
+
+ void testFutureCompletionSendTimeout() {
+ CloudBus bus = bean(CloudBus.class)
+ String SERVICE_ID = "testFutureCompletionSendTimeout"
+
+ def service = new AbstractService() {
+ @Override
+ void handleMessage(Message msg) {
+ // Do not reply to simulate timeout
+ try {
+ Thread.sleep(3000)
+ } catch (InterruptedException e) {
+ e.printStackTrace()
+ }
+ }
+
+ @Override
+ String getId() {
+ return bus.makeLocalServiceId(SERVICE_ID)
+ }
+
+ @Override
+ boolean start() {
+ return true
+ }
+
+ @Override
+ boolean stop() {
+ return true
+ }
+ }
+
+ bus.registerService(service)
+
+ // test timeout (Message msg)
+ StartVmInstanceMsg msg = new StartVmInstanceMsg(vmInstanceUuid: Platform.uuid)
+ msg.setTimeout(1)
+ bus.makeLocalServiceId(msg, SERVICE_ID)
+ FutureCompletion completion = bus.send(msg)
+ completion.await(5000L)
+ assert completion.isSuccess()
+
+ MessageReply[] replyHolder = new MessageReply[1]
+ NeedReplyMessage nrMsg = new StartVmInstanceMsg(vmInstanceUuid: Platform.uuid)
+ nrMsg.setTimeout(1)
+ bus.makeLocalServiceId(nrMsg, SERVICE_ID)
+ FutureCompletion completion2 = bus.send(nrMsg, new CloudBusCallBack(null) {
+ @Override
+ void run(MessageReply reply) {
+ replyHolder[0] = reply
+ }
+ })
+ completion2.await(5000L)
+ assert completion2.isSuccess()
+ retryInSecs {
+ assert replyHolder[0] != null
+ assert !replyHolder[0].isSuccess()
+ assert replyHolder[0].error.isError(SysErrors.TIMEOUT)
+ }
+
+ bus.unregisterService(service)
+ }
+
+ void testFutureCompletionSendToMissingNode() {
+ CloudBus bus = bean(CloudBus.class)
+ String SERVICE_ID = "testFutureCompletionSendToMissingNode"
+
+ StartVmInstanceMsg msg = new StartVmInstanceMsg(vmInstanceUuid: Platform.uuid)
+ msg.setTimeout(2)
+ bus.makeServiceIdByManagementNodeId(msg, SERVICE_ID, "fake-management-node-uuid")
+ FutureCompletion completion = bus.send(msg)
+ completion.await(5000L)
+ assert !completion.isSuccess()
+ assert completion.getErrorCode() != null
+
+ MessageReply[] replyHolder = new MessageReply[1]
+ NeedReplyMessage nrMsg = new StartVmInstanceMsg(vmInstanceUuid: Platform.uuid)
+ nrMsg.setTimeout(2)
+ bus.makeServiceIdByManagementNodeId(nrMsg, SERVICE_ID, "fake-management-node-uuid")
+ FutureCompletion completion2 = bus.send(nrMsg, new CloudBusCallBack(null) {
+ @Override
+ void run(MessageReply reply) {
+ replyHolder[0] = reply
+ }
+ })
+ completion2.await(5000L)
+ assert !completion2.isSuccess()
+ assert completion2.getErrorCode() != null
+
+ retryInSecs {
+ assert replyHolder[0] != null
+ assert !replyHolder[0].isSuccess()
+ // TODO
+ // assert replyHolder[0].error.isError(SysErrors.MANAGEMENT_NODE_UNAVAILABLE_ERROR)
+ }
+ }
+
+ void testSendQueue() {
+ def threads = []
+ 1.upto(50) { i ->
+ Thread t = Thread.start {
+ new MessageSender().sendMsg()
+ }
+ threads << t
+ }
+
+ ThreadFacade thdf = bean(ThreadFacade.class)
+
+ ChainInfo info = thdf.getChainTaskInfo("http-send-in-queue")
+ assert info.runningTask.size() <= 1
+ sleep(100)
+ info = thdf.getChainTaskInfo("http-send-in-queue")
+ assert info.runningTask.size() <= 1
+
+ threads.each { it.join() }
+ }
+
@Override
void test() {
+        bus = bean(CloudBus.class) // field, used by the unregisterService call at the end
testStepSend()
testManagementNodeGone()
testSendToMissingNode()
+        testFutureCompletionSend()
+        testFutureCompletionSendToNonExistService()
+        testFutureCompletionSendTimeout()
+        testFutureCompletionSendToMissingNode()
+        CloudBusGlobalProperty.HTTP_ALWAYS = true // rerun the FutureCompletion cases with HTTP_ALWAYS switched on
+        try {
+            testFutureCompletionSend()
+            testFutureCompletionSendToNonExistService()
+            testFutureCompletionSendTimeout()
+            testFutureCompletionSendToMissingNode()
+            testSendQueue()
+            bus.unregisterService(service)
+        } finally { // reset the static flag even when an assertion above fails
+            CloudBusGlobalProperty.HTTP_ALWAYS = false
+        }
}
}
diff --git a/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/ManagementNodeNotFoundHandlerCase.groovy b/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/ManagementNodeNotFoundHandlerCase.groovy
index 2b7a0a7c2f3..b965b9b2924 100755
--- a/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/ManagementNodeNotFoundHandlerCase.groovy
+++ b/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/ManagementNodeNotFoundHandlerCase.groovy
@@ -4,10 +4,9 @@ import org.zstack.core.Platform
import org.zstack.core.cloudbus.CloudBus
import org.zstack.core.cloudbus.CloudBusGlobalConfig
import org.zstack.core.cloudbus.DeadMessageManagerImpl
-import org.zstack.core.cloudbus.EventFacade
import org.zstack.core.db.DatabaseFacade
import org.zstack.header.AbstractService
-import org.zstack.header.managementnode.ManagementNodeCanonicalEvent
+import org.zstack.header.core.FutureCompletion
import org.zstack.header.managementnode.ManagementNodeInventory
import org.zstack.header.managementnode.ManagementNodeState
import org.zstack.header.managementnode.ManagementNodeVO
@@ -22,6 +21,8 @@ import org.zstack.testlib.SubCase
import java.util.concurrent.TimeUnit
class ManagementNodeNotFoundHandlerCase extends SubCase {
+ DeadMessageManagerImpl mgr
+
@Override
void clean() {
}
@@ -33,7 +34,7 @@ class ManagementNodeNotFoundHandlerCase extends SubCase {
@Override
void environment() {
- DeadMessageManagerImpl mgr = bean(DeadMessageManagerImpl.class)
+ mgr = bean(DeadMessageManagerImpl.class)
mgr.managementNodeNotFoundHandlers.invalidateAll()
mgr.managementNodeNotFoundHandlers.cleanUp()
@@ -46,7 +47,7 @@ class ManagementNodeNotFoundHandlerCase extends SubCase {
ManagementNodeVO vo = new ManagementNodeVO(
hostName: "127.0.0.10",
// mock a future heartbeat
- heartBeat: new Date(System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1)).toTimestamp(),
+ heartBeat: new Date(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(5)).toTimestamp(),
uuid: mgmtUuid,
port: 8989,
state: ManagementNodeState.RUNNING
@@ -74,6 +75,7 @@ class ManagementNodeNotFoundHandlerCase extends SubCase {
def secondNodeService = new AbstractService() {
@Override
void handleMessage(Message msg) {
+ logger.debug("received message[${msg.id}] on service[${getId()}]")
message = msg
}
@@ -95,17 +97,18 @@ class ManagementNodeNotFoundHandlerCase extends SubCase {
StartVmInstanceReply reply = new StartVmInstanceReply()
bus.makeServiceIdByManagementNodeId(reply, secondServiceName, secondNodeUuid)
- bus.send(reply)
+ FutureCompletion c = bus.send(reply)
assert message == null
bus.registerService(secondNodeService)
+ c.await()
+ assert !mgr.managementNodeNotFoundHandlers.asMap().values().message.contains(reply)
Closure cleanup = mockAManagementNode(secondNodeUuid)
- retryInSecs {
- assert message == null
- }
+ sleep(1000)
+ assert message == null
cleanup()
bus.unregisterService(secondNodeService)
@@ -120,6 +123,7 @@ class ManagementNodeNotFoundHandlerCase extends SubCase {
def secondNodeService = new AbstractService() {
@Override
void handleMessage(Message msg) {
+ logger.debug("received message[${msg.id}] on service[${getId()}]")
message = msg
}
@@ -141,17 +145,17 @@ class ManagementNodeNotFoundHandlerCase extends SubCase {
StartVmInstanceMsg msg = new StartVmInstanceMsg()
bus.makeServiceIdByManagementNodeId(msg, secondServiceName, secondNodeUuid)
- bus.send(msg)
+ FutureCompletion c = bus.send(msg)
assert message == null
bus.registerService(secondNodeService)
-
+ c.await()
+ assert !mgr.managementNodeNotFoundHandlers.asMap().values().message.contains(msg)
Closure cleanup = mockAManagementNode(secondNodeUuid)
- retryInSecs {
- assert message == null
- }
+ sleep(1000)
+ assert message == null
cleanup()
bus.unregisterService(secondNodeService)
@@ -166,6 +170,7 @@ class ManagementNodeNotFoundHandlerCase extends SubCase {
def secondNodeService = new AbstractService() {
@Override
void handleMessage(Message msg) {
+ logger.debug("received message[${msg.id}] on service[${getId()}]")
message = msg
}
@@ -190,12 +195,15 @@ class ManagementNodeNotFoundHandlerCase extends SubCase {
StartVmInstanceReply reply = new StartVmInstanceReply()
bus.makeServiceIdByManagementNodeId(reply, secondServiceName, secondNodeUuid)
- bus.send(reply)
+ FutureCompletion c = bus.send(reply)
assert message == null
bus.registerService(secondNodeService)
+ // send is an async operation, so we need to wait for it to complete
+ c.await()
+ assert mgr.managementNodeNotFoundHandlers.asMap().values().message.contains(reply)
Closure cleanup = mockAManagementNode(secondNodeUuid)
retryInSecs {
diff --git a/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/MessageSender.java b/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/MessageSender.java
new file mode 100644
index 00000000000..3877ecf0bce
--- /dev/null
+++ b/test/src/test/groovy/org/zstack/test/integration/core/cloudbus/MessageSender.java
@@ -0,0 +1,42 @@
+package org.zstack.test.integration.core.cloudbus;
+
+import org.zstack.core.Platform;
+import org.zstack.core.cloudbus.CloudBus;
+import org.zstack.core.thread.SyncTask;
+import org.zstack.core.thread.ThreadFacade;
+import org.zstack.header.core.FutureCompletion;
+import org.zstack.header.vm.StartVmInstanceMsg;
+
+public class MessageSender {
+    /** Submits one sync task that sends a StartVmInstanceMsg and awaits the returned FutureCompletion. */
+    void sendMsg() {
+        final CloudBus cloudBus = Platform.getComponentLoader().getComponent(CloudBus.class);
+        final ThreadFacade threadFacade = Platform.getComponentLoader().getComponent(ThreadFacade.class);
+
+        threadFacade.syncSubmit(new SyncTask() {
+            @Override
+            public String getName() {
+                return "MessageSender.sendMsg";
+            }
+
+            @Override
+            public String getSyncSignature() {
+                return getName();
+            }
+
+            @Override
+            public int getSyncLevel() {
+                return 1;
+            }
+
+            @Override
+            public Void call() throws Exception {
+                StartVmInstanceMsg startMsg = new StartVmInstanceMsg();
+                cloudBus.makeLocalServiceId(startMsg, "testFutureCompletionSend");
+                FutureCompletion sent = cloudBus.send(startMsg);
+                sent.await(5000L);
+                return null;
+            }
+        });
+    }
+}
\ No newline at end of file
diff --git a/test/src/test/groovy/org/zstack/test/integration/portal/ManagementNodeHeartbeatCase.groovy b/test/src/test/groovy/org/zstack/test/integration/portal/ManagementNodeHeartbeatCase.groovy
index 6684b602860..a6940fab90e 100644
--- a/test/src/test/groovy/org/zstack/test/integration/portal/ManagementNodeHeartbeatCase.groovy
+++ b/test/src/test/groovy/org/zstack/test/integration/portal/ManagementNodeHeartbeatCase.groovy
@@ -58,10 +58,10 @@ class ManagementNodeHeartbeatCase extends SubCase {
}
void testUnexpectedManagementNodeRecord() {
+ prepareInvalidRecords()
ManagementNodeGlobalConfig.NODE_HEARTBEAT_INTERVAL.updateValue(1)
PortalGlobalProperty.MAX_HEARTBEAT_FAILURE = 2
- prepareInvalidRecords()
int heartbeatFailureTimeout = ManagementNodeGlobalConfig.NODE_HEARTBEAT_INTERVAL.value(Integer.class) * PortalGlobalProperty.MAX_HEARTBEAT_FAILURE
int heartbeatUpdateDelay = 1 * ManagementNodeGlobalConfig.NODE_HEARTBEAT_INTERVAL.value(Integer.class)
@@ -80,7 +80,12 @@ class ManagementNodeHeartbeatCase extends SubCase {
assert count == 2
// wait one more interval to wait 127.0.0.111 cleaned
- sleep(TimeUnit.SECONDS.toMillis(failureInterval * 3))
+ // a record whose heartbeat is more than 3s old is added to the suspects
+ // and is cleaned on the next heartbeat,
+ // but the hb timestamp has second precision, so it will not be added until 4s
+ // and will be cleaned after 5s,
+ // so we need to wait more than 5s
+ sleep(TimeUnit.SECONDS.toMillis(failureInterval * 3) + 500)
count = dbf.count(ManagementNodeVO.class)
assert count == 1