diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index f038f34771d..9e771017445 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -1392,7 +1392,14 @@ private NetworkBridgeConfiguration getNetworkConfiguration(final BrokerInfo info } @Override - public Response processBrokerInfo(BrokerInfo info) { + public Response processBrokerInfo(BrokerInfo info) throws IOException { + // We only expect to get at most one broker info command per connection + // Log and throw an IOException to close the connection if we receive more + // one because this is a protocol violation + if (this.brokerInfo != null) { + LOG.warn("Unexpected extra broker info command received: {}", info); + throw new IOException("Unexpected extra broker info command received from: " + info.getBrokerId()); + } if (info.isSlaveBroker()) { LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: {}", info.getBrokerName()); } else if (info.isNetworkConnection() && !info.isDuplexConnection()) { @@ -1464,10 +1471,6 @@ public Response processBrokerInfo(BrokerInfo info) { return null; } } - // We only expect to get one broker info command per connection - if (this.brokerInfo != null) { - LOG.warn("Unexpected extra broker info command received: {}", info); - } this.brokerInfo = info; networkConnection = true; List connectionStates = listConnectionStates(); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java index 646dd4f184d..19c95da29b4 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java @@ -30,6 +30,7 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerSubscriptionInfo; import org.apache.activemq.command.DiscoveryEvent; import org.apache.activemq.security.AuthenticationUser; @@ -152,6 +153,23 @@ public void testRestartSync() throws Exception { assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); } + @Test + public void testDuplicateBrokerInfo() throws Exception { + // Wait for connection and auth setup + doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(), + TimeUnit.SECONDS.toMillis(15)); + assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10)); + + // find the established bridge + DemandForwardingBridge bridge = (DemandForwardingBridge) localBroker.getNetworkConnectors().get(0).activeBridges().stream() + .findFirst().orElseThrow(); + + // send to one of the brokers (networked brokers will have already received a BrokerInfo) + // the duplicate will trigger the bridge connection to close + bridge.localBroker.oneway(new BrokerInfo()); + assertTrue(Wait.waitFor(bridge.localBroker::isDisposed,5000,10)); + } + protected void doSetUp(boolean deleteAllMessages, boolean startNetworkConnector, File localDataDir, File remoteDataDir, long waitForStart) throws Exception { doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0);