Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions conf/errorCodes/sys.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,10 @@
<id>1090</id>
<description>Multiple reasons</description>
</code>

<code>
<id>2111</id>
<description>CloudBus message sending related error</description>
</code>
</error>

12 changes: 9 additions & 3 deletions core/src/main/java/org/zstack/core/Platform.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
Expand Down Expand Up @@ -817,9 +818,14 @@ private static String getManagementServerIpInternal() {
for (NetworkInterface iface : Collections.list(nets)) {
String name = iface.getName();
if (defaultLine.contains(name)) {
InetAddress ia = iface.getInetAddresses().nextElement();
ip = ia.getHostAddress();
break;
for (InetAddress ia : Collections.list(iface.getInetAddresses())) {
ip = ia.getHostAddress();
if (ia instanceof Inet4Address) {
// we prefer IPv4 address
ip = ia.getHostAddress();
break;
}
}
Comment on lines +821 to +828
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

IPv4 不存在时的回退地址选择存在不稳定行为。

在 Line [822] 每次循环都会覆盖 ip。如果该网卡没有 IPv4,最终会返回“最后一个”地址,可能是不可用的链路本地 IPv6,导致管理地址选取不稳定。

🔧 建议修复
                 if (defaultLine.contains(name)) {
-                    for (InetAddress ia : Collections.list(iface.getInetAddresses())) {
-                        ip = ia.getHostAddress();
-                        if (ia instanceof Inet4Address) {
-                            // we prefer IPv4 address
-                            ip = ia.getHostAddress();
-                            break;
-                        }
-                    }
+                    String selectedIp = null;
+                    for (InetAddress ia : Collections.list(iface.getInetAddresses())) {
+                        String candidate = ia.getHostAddress();
+                        if (ia instanceof Inet4Address) {
+                            // we prefer IPv4 address
+                            selectedIp = candidate;
+                            break;
+                        }
+                        if (selectedIp == null && !ia.isLoopbackAddress() && !ia.isLinkLocalAddress()) {
+                            selectedIp = candidate;
+                        }
+                    }
+                    if (selectedIp != null) {
+                        ip = selectedIp;
+                        break;
+                    }
                 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@core/src/main/java/org/zstack/core/Platform.java` around lines 821 - 828, The
loop that sets ip for each InetAddress (using iface and ip variables with
InetAddress/Inet4Address) currently overwrites ip on every iteration and breaks
only when encountering an IPv4, which makes the fallback address unpredictable
when no IPv4 exists; change the logic so that you immediately break and use the
IPv4 address when found (ia instanceof Inet4Address), but otherwise only assign
a fallback ip once (e.g. first non-link-local address or first global-scope
address) and never overwrite that fallback on subsequent iterations—skip
link-local addresses when selecting the fallback so the chosen non-IPv4 address
is stable and usable if no IPv4 is present.

}
}
} catch (SocketException e) {
Expand Down
11 changes: 8 additions & 3 deletions core/src/main/java/org/zstack/core/cloudbus/CloudBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.springframework.http.HttpEntity;
import org.zstack.header.Component;
import org.zstack.header.Service;
import org.zstack.header.core.FutureCompletion;
import org.zstack.header.errorcode.ErrorCode;
import org.zstack.header.exception.CloudConfigureFailException;
import org.zstack.header.message.*;
Expand All @@ -12,14 +13,18 @@
import java.util.function.Consumer;

public interface CloudBus extends Component {
void send(Message msg);

/**
* Submits a message-sending task to the queue.
* @return a {@link FutureCompletion} that can be used to check whether the message was sent successfully.
*/
FutureCompletion send(Message msg);

<T extends Message> void send(List<T> msgs);

@Deprecated
void send(APIMessage msg, Consumer<APIEvent> consumer);

void send(NeedReplyMessage msg, CloudBusCallBack callback);
FutureCompletion send(NeedReplyMessage msg, CloudBusCallBack callback);

@Deprecated
void send(List<? extends NeedReplyMessage> msgs, CloudBusListCallBack callBack);
Expand Down
12 changes: 10 additions & 2 deletions core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl2.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.zstack.core.timeout.ApiTimeoutManager;
import org.zstack.header.Constants;
import org.zstack.header.Service;
import org.zstack.header.core.FutureCompletion;
import org.zstack.header.log.NoLogging;
import org.zstack.header.apimediator.APIIsReadyToGoMsg;
import org.zstack.header.apimediator.APIIsReadyToGoReply;
Expand Down Expand Up @@ -106,6 +107,11 @@ public class CloudBusImpl2 implements CloudBus, CloudBusIN, ManagementNodeChange
private final String AMQP_PROPERTY_HEADER__COMPRESSED = "compressed";

private String SERVICE_ID = makeLocalServiceId("cloudbus");
// Shared completion that is marked successful exactly once, in the static initializer below.
// The send(...) overloads of this class return it after delegating to the internal send.
// NOTE(review): a single instance is handed to every caller; confirm FutureCompletion is safe to share.
public static final FutureCompletion SEND_CONFIRMED = new FutureCompletion(null);

static {
SEND_CONFIRMED.success();
}

public void setDEFAULT_MESSAGE_TIMEOUT(long timeout) {
this.DEFAULT_MESSAGE_TIMEOUT = timeout;
Expand Down Expand Up @@ -1323,8 +1329,9 @@ private void send(Message msg, Boolean noNeedReply) {
}

@Override
public void send(Message msg) {
public FutureCompletion send(Message msg) {
send(msg, true);
return SEND_CONFIRMED;
}

@Override
Expand Down Expand Up @@ -1354,7 +1361,7 @@ private void evaluateMessageTimeout(NeedReplyMessage msg) {
}

@Override
public void send(final NeedReplyMessage msg, final CloudBusCallBack callback) {
public FutureCompletion send(final NeedReplyMessage msg, final CloudBusCallBack callback) {
evaluateMessageTimeout(msg);

Envelope e = new Envelope() {
Expand Down Expand Up @@ -1405,6 +1412,7 @@ List<Message> getRequests() {
envelopes.put(msg.getId(), e);

send(msg, false);
return SEND_CONFIRMED;
}

private MessageReply createTimeoutReply(NeedReplyMessage m) {
Expand Down
86 changes: 54 additions & 32 deletions core/src/main/java/org/zstack/core/cloudbus/CloudBusImpl3.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

import static org.zstack.core.Platform.*;
import static org.zstack.core.cloudbus.CloudBusGlobalProperty.SYNC_CALL_TIMEOUT;
import static org.zstack.header.errorcode.SysErrors.CLOUD_BUS_SEND_ERROR;
import static org.zstack.utils.BeanUtils.getProperty;
import static org.zstack.utils.BeanUtils.setProperty;

Expand Down Expand Up @@ -101,6 +102,7 @@ public class CloudBusImpl3 implements CloudBus, CloudBusIN {
private final static TimeoutRestTemplate http = RESTFacade.createRestTemplate(CoreGlobalProperty.REST_FACADE_READ_TIMEOUT, CoreGlobalProperty.REST_FACADE_CONNECT_TIMEOUT);

public static final String HTTP_BASE_URL = "/cloudbus";
public static final FutureCompletion SEND_CONFIRMED = new FutureCompletion(null);

{
if (CloudBusGlobalProperty.MESSAGE_LOG != null) {
Expand All @@ -114,6 +116,8 @@ public class CloudBusImpl3 implements CloudBus, CloudBusIN {
CloudBusGlobalProperty.HTTP_CONTEXT_PATH = "";
CloudBusGlobalProperty.HTTP_PORT = 8989;
}

SEND_CONFIRMED.success();
}

public static String getManagementNodeUUIDFromServiceID(String serviceID) {
Expand All @@ -125,6 +129,12 @@ public static String getManagementNodeUUIDFromServiceID(String serviceID) {
return ss[0];
}

/**
 * Builds a new {@link FutureCompletion} on which {@code fail} has already been called.
 *
 * @param errorCode the error code passed to {@code fail}
 * @return a freshly created completion, failed with {@code errorCode}
 */
private FutureCompletion sendFail(ErrorCode errorCode) {
    final FutureCompletion failedCompletion = new FutureCompletion(null);
    failedCompletion.fail(errorCode);
    return failedCompletion;
}

private abstract class Envelope {
long startTime;

Expand Down Expand Up @@ -250,8 +260,8 @@ public void deActiveService(String id) {
}

@Override
public void send(Message msg) {
send(msg, true);
public FutureCompletion send(Message msg) {
return send(msg, true);
}

@Override
Expand Down Expand Up @@ -291,11 +301,11 @@ private MessageReply createTimeoutReply(NeedReplyMessage m) {
}

@Override
public void send(NeedReplyMessage msg, CloudBusCallBack callback) {
public FutureCompletion send(NeedReplyMessage msg, CloudBusCallBack callback) {
evaluateMessageTimeout(msg);
if (msg.getTimeout() <= 1) {
callback.run(createTimeoutReply(msg));
return;
return SEND_CONFIRMED;
}

Envelope e = new Envelope() {
Expand Down Expand Up @@ -346,7 +356,7 @@ public void timeout() {

envelopes.put(msg.getId(), e);
msgExts.forEach(m -> m.afterAddEnvelopes(msg.getId()));
send(msg, false);
return send(msg, false);
}

@Override
Expand Down Expand Up @@ -454,7 +464,7 @@ public void reply(Message request, MessageReply reply) {
callReplyPreSendingExtensions(reply, (NeedReplyMessage) request);
} catch (Exception e) {
logger.error("failed to call pre-sending reply extension:", e);
reply.setError(operr(e.getMessage()));
reply.setError(err(CLOUD_BUS_SEND_ERROR, e.getMessage()));
}
}

Expand Down Expand Up @@ -526,29 +536,36 @@ public MessageSender(Message msg) {
localSend = !CloudBusGlobalProperty.HTTP_ALWAYS && managementNodeId.equals(Platform.getManagementServerId());
}

void send() {
FutureCompletion send() {
try {
doSend();
return doSend();
} catch (Throwable th) {
replyErrorIfNeeded(operr(th.getMessage()));
ErrorCode err = err(CLOUD_BUS_SEND_ERROR, th.getMessage());
replyErrorIfNeeded(err);

FutureCompletion c = new FutureCompletion(null);
c.fail(err);
return c;
}
}

private void doSend() {
private FutureCompletion doSend() {
if (msg instanceof Event) {
eventSend();
return;
return SEND_CONFIRMED;
}

if (localSend) {
localSend();
return SEND_CONFIRMED;
} else {
httpSend();
return httpSend();
}
}

private void httpSendInQueue(String ip) {
thdf.chainSubmit(new ChainTask(null) {
private FutureCompletion httpSendInQueue(String ip) {
FutureCompletion sendCompletion = new FutureCompletion(null);
thdf.chainSubmit(new ChainTask(sendCompletion) {
@Override
public String getSyncSignature() {
return "http-send-in-queue";
Expand All @@ -557,6 +574,7 @@ public String getSyncSignature() {
@Override
public void run(SyncTaskChain chain) {
httpSend(ip);
sendCompletion.success();
chain.next();
}

Expand All @@ -570,25 +588,29 @@ public String getName() {
return getSyncSignature();
}
});
return sendCompletion;
}

private void httpSend() {
private FutureCompletion httpSend() {
buildSchema(msg);
String ip;
try {
String ip = destMaker.getNodeInfo(managementNodeId).getNodeIP();
httpSendInQueue(ip);
ip = destMaker.getNodeInfo(managementNodeId).getNodeIP();
} catch (ManagementNodeNotFoundException e) {
if (msg instanceof MessageReply) {
if (!deadMessageManager.handleManagementNodeNotFoundError(managementNodeId, msg, () -> {
String ip = destMaker.getNodeInfo(managementNodeId).getNodeIP();
httpSendInQueue(ip);
})) {
throw e;
}
boolean errorHandled = msg instanceof MessageReply &&
deadMessageManager.handleManagementNodeNotFoundError(managementNodeId, msg, () -> {
String otherIp = destMaker.getNodeInfo(managementNodeId).getNodeIP();
logger.warn(String.format("resend the message[id:%s] to node[ip:%s]", msg.getId(), otherIp));
httpSendInQueue(otherIp);
});
if (errorHandled) {
return SEND_CONFIRMED;
} else {
throw e;
}
}

return httpSendInQueue(ip);
}

private void httpSend(String ip) {
Expand All @@ -612,12 +634,12 @@ protected ResponseEntity<String> call() {
}.run();

if (!rsp.getStatusCode().is2xxSuccessful()) {
replyErrorIfNeeded(operr("HTTP ERROR, status code: %s, body: %s", rsp.getStatusCode(), rsp.getBody()));
replyErrorIfNeeded(err(CLOUD_BUS_SEND_ERROR, "HTTP ERROR, status code: %s, body: %s", rsp.getStatusCode(), rsp.getBody()));
}
} catch (OperationFailureException e) {
replyErrorIfNeeded(e.getErrorCode());
} catch (Throwable e) {
replyErrorIfNeeded(operr(e.getMessage()));
replyErrorIfNeeded(err(CLOUD_BUS_SEND_ERROR, e.getMessage()));
}
}

Expand Down Expand Up @@ -1194,7 +1216,7 @@ private void evalThreadContextToMessage(Message msg) {
}
}

private void doSendAndCallExtensions(Message msg) {
private FutureCompletion doSendAndCallExtensions(Message msg) {
// for unit test finding invocation chain
MessageCommandRecorder.record(msg.getClass());

Expand All @@ -1209,20 +1231,20 @@ private void doSendAndCallExtensions(Message msg) {
interceptor.beforeSendMessage(msg);
}

doSend(msg);
return doSend(msg);
}

private void doSend(Message msg) {
private FutureCompletion doSend(Message msg) {
evalThreadContextToMessage(msg);

if (logger.isTraceEnabled() && islogMessage(msg)) {
logger.trace(String.format("[msg send]: %s", dumpMessage(msg)));
}

new MessageSender(msg).send();
return new MessageSender(msg).send();
}

private void send(Message msg, Boolean noNeedReply) {
private FutureCompletion send(Message msg, Boolean noNeedReply) {
if (msg.getServiceId() == null) {
throw new IllegalArgumentException(String.format("service id cannot be null: %s", msg.getClass().getName()));
}
Expand All @@ -1238,7 +1260,7 @@ private void send(Message msg, Boolean noNeedReply) {
msg.putHeaderEntry(NO_NEED_REPLY_MSG, noNeedReply.toString());
}

doSendAndCallExtensions(msg);
return doSendAndCallExtensions(msg);
}

private void restoreFromSchema(Message msg, Map raw) throws ClassNotFoundException {
Expand Down
Loading