From e3f0cf9f5856f1caf835351c1eadfad055d78466 Mon Sep 17 00:00:00 2001 From: Anmol Saxena Date: Wed, 25 Feb 2026 16:10:55 +0530 Subject: [PATCH 1/3] AMQP: added transport factory unit tests for mutex stripping and monitor wiring. --- ...AmqpTransportFactoryConfigurationTest.java | 146 ++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransportFactoryConfigurationTest.java diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransportFactoryConfigurationTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransportFactoryConfigurationTest.java new file mode 100644 index 0000000000..347e456192 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransportFactoryConfigurationTest.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +import javax.net.SocketFactory; +import javax.net.ssl.SSLSocket; +import javax.net.ssl.SSLSocketFactory; + +import org.apache.activemq.transport.MutexTransport; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.TransportFactory; +import org.apache.activemq.transport.TransportFilter; +import org.apache.activemq.transport.tcp.SslTransport; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(ParallelTest.class) +public class AmqpTransportFactoryConfigurationTest { + + @Test + public void testServerConfigureStripsMutexTransportForAllAmqpFactories() throws Exception { + assertServerConfigureStripsMutex(new AmqpTransportFactory()); + assertServerConfigureStripsMutex(new AmqpSslTransportFactory()); + assertServerConfigureStripsMutex(new AmqpNioTransportFactory()); + assertServerConfigureStripsMutex(new AmqpNioSslTransportFactory()); + } + + @Test + public void testCompositeConfigureAppliesAmqpAndWireFormatPropertiesForAllAmqpFactories() throws Exception { + assertCompositeConfigureAppliesProperties(new AmqpTransportFactory()); + assertCompositeConfigureAppliesProperties(new AmqpSslTransportFactory()); + assertCompositeConfigureAppliesProperties(new AmqpNioTransportFactory()); + assertCompositeConfigureAppliesProperties(new AmqpNioSslTransportFactory()); + } + + @Test + public void testAllAmqpFactoriesWireAmqpInactivityMonitor() throws Exception { + assertAmqpFactoriesWireAmqpInactivityMonitor(new AmqpTransportFactory()); + assertAmqpFactoriesWireAmqpInactivityMonitor(new AmqpSslTransportFactory()); + assertAmqpFactoriesWireAmqpInactivityMonitor(new AmqpNioTransportFactory()); + assertAmqpFactoriesWireAmqpInactivityMonitor(new AmqpNioSslTransportFactory()); + } + + private void assertServerConfigureStripsMutex(TransportFactory factory) throws Exception { + Transport configured = factory.serverConfigure( + createTransportForFactory(factory, new AmqpWireFormat()), + new AmqpWireFormat(), + new HashMap()); + + assertFalse("AMQP serverConfigure should strip the broker-side MutexTransport", + configured instanceof MutexTransport); + } + + private void assertCompositeConfigureAppliesProperties(TransportFactory factory) throws Exception { + Map options = new HashMap<>(); + options.put("transformer", "raw"); + options.put("producerCredit", "17"); + options.put("wireFormat.maxFrameSize", "4096"); + options.put("wireFormat.connectAttemptTimeout", "1234"); + + Transport configured = factory.compositeConfigure( + createTransportForFactory(factory, new AmqpWireFormat()), + new AmqpWireFormat(), + options); + + AmqpTransportFilter filter = findInChain(configured, AmqpTransportFilter.class); + assertNotNull("Expected AmqpTransportFilter in configured transport chain", filter); + assertEquals("raw", filter.getTransformer()); + assertEquals(17, filter.getProducerCredit()); + assertEquals(4096L, filter.getMaxFrameSize()); + assertEquals(1234, filter.getConnectAttemptTimeout()); + } + + private void assertAmqpFactoriesWireAmqpInactivityMonitor(TransportFactory factory) throws Exception { + Transport configured = factory.compositeConfigure( + createTransportForFactory(factory, new AmqpWireFormat()), + new AmqpWireFormat(), + new HashMap()); + + AmqpInactivityMonitor monitor = findInChain(configured, AmqpInactivityMonitor.class); + AmqpTransportFilter filter = findInChain(configured, AmqpTransportFilter.class); + + assertNotNull("Expected AmqpInactivityMonitor in configured transport chain", monitor); + assertNotNull("Expected AmqpTransportFilter in configured transport chain", filter); + assertTrue("Filter should report inactivity monitor as enabled", filter.isUseInactivityMonitor()); + assertSame("Factory should wire the same monitor instance into the AMQP filter", + monitor, filter.getInactivityMonitor()); + } + + private TcpTransport createTcpTransport(AmqpWireFormat wireFormat) throws Exception { + return new TcpTransport( + wireFormat, + SocketFactory.getDefault(), + new URI("tcp://localhost:61616"), + null); + } + + private TcpTransport createTransportForFactory(TransportFactory factory, AmqpWireFormat wireFormat) throws Exception { + if (factory instanceof AmqpSslTransportFactory || factory instanceof AmqpNioSslTransportFactory) { + SSLSocket socket = (SSLSocket) SSLSocketFactory.getDefault().createSocket(); + return new SslTransport(wireFormat, socket); + } + + return createTcpTransport(wireFormat); + } + + private T findInChain(Transport transport, Class type) { + Transport current = transport; + while (current != null) { + T found = current.narrow(type); + if (found != null) { + return found; + } + if (!(current instanceof TransportFilter)) { + return null; + } + current = ((TransportFilter) current).getNext(); + } + return null; + } +} From 4c5e8047dbeb49304f873d3eeae2acded000da66 Mon Sep 17 00:00:00 2001 From: Anmol Saxena Date: Wed, 25 Feb 2026 20:53:24 +0530 Subject: [PATCH 2/3] Added dynamic port and cleanup to avoid tests conflicts. --- ...AmqpTransportFactoryConfigurationTest.java | 51 ++++++++++++------- 1 file changed, 32 insertions(+), 19 deletions(-) diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransportFactoryConfigurationTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransportFactoryConfigurationTest.java index 347e456192..3baa85cc46 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransportFactoryConfigurationTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransportFactoryConfigurationTest.java @@ -1,19 +1,3 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package org.apache.activemq.transport.amqp; import static org.junit.Assert.assertEquals; @@ -22,8 +6,12 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; +import java.net.Socket; +import java.net.ServerSocket; import java.net.URI; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import javax.net.SocketFactory; @@ -36,12 +24,26 @@ import org.apache.activemq.transport.TransportFilter; import org.apache.activemq.transport.tcp.SslTransport; import org.apache.activemq.transport.tcp.TcpTransport; +import org.junit.After; import org.junit.Test; import org.junit.experimental.categories.Category; @Category(ParallelTest.class) public class AmqpTransportFactoryConfigurationTest { + private final List createdTransports = new ArrayList<>(); + + @After + public void cleanup() throws Exception { + for (TcpTransport transport : createdTransports) { + Socket socket = transport.narrow(Socket.class); + if (socket != null && !socket.isClosed()) { + socket.close(); + } + } + createdTransports.clear(); + } + @Test public void testServerConfigureStripsMutexTransportForAllAmqpFactories() throws Exception { assertServerConfigureStripsMutex(new AmqpTransportFactory()); @@ -113,20 +115,31 @@ private void assertAmqpFactoriesWireAmqpInactivityMonitor(TransportFactory facto } private TcpTransport createTcpTransport(AmqpWireFormat wireFormat) throws Exception { + int dynamicPort = findAvailablePort(); return new TcpTransport( wireFormat, SocketFactory.getDefault(), - new URI("tcp://localhost:61616"), + new URI("tcp://localhost:" + dynamicPort), null); } + private int findAvailablePort() throws Exception { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } + } + private TcpTransport createTransportForFactory(TransportFactory factory, AmqpWireFormat wireFormat) throws Exception { + TcpTransport transport; if (factory instanceof AmqpSslTransportFactory || factory instanceof AmqpNioSslTransportFactory) { SSLSocket socket = (SSLSocket) SSLSocketFactory.getDefault().createSocket(); - return new SslTransport(wireFormat, socket); + transport = new SslTransport(wireFormat, socket); + } else { + transport = createTcpTransport(wireFormat); } - return createTcpTransport(wireFormat); + createdTransports.add(transport); + return transport; } private T findInChain(Transport transport, Class type) { From 9c9315f11b5d2bd8b7ee1e84d29cc02a1d8aa943 Mon Sep 17 00:00:00 2001 From: Anmol Saxena Date: Thu, 26 Feb 2026 02:09:20 +0530 Subject: [PATCH 3/3] Adding license to the file. --- .../AmqpTransportFactoryConfigurationTest.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransportFactoryConfigurationTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransportFactoryConfigurationTest.java index 3baa85cc46..9e0e3e9776 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransportFactoryConfigurationTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTransportFactoryConfigurationTest.java @@ -1,3 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.activemq.transport.amqp; import static org.junit.Assert.assertEquals;