diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java index cc0a814b2f..091957a075 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AdvisoryViaNetworkTest.java @@ -22,7 +22,9 @@ import jakarta.jms.MessageConsumer; import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationInterceptor; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.virtual.CompositeTopic; @@ -113,7 +115,7 @@ public void testAdvisoryPrefetchSize() throws Exception { assertEquals(1, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize()); //both advisory messages are not acked yet because of optimized acks - assertDeqInflight(0, 2); + assertDeqInflight(0, 2, topic1, new ActiveMQTopic("A.FOO2")); } /** @@ -149,7 +151,7 @@ public void testAdvisoryPrefetchSize1() throws Exception { assertEquals(1, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize()); assertEquals(10, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize()); - assertDeqInflight(2, 0); + assertDeqInflight(2, 0, topic1, new ActiveMQTopic("A.FOO2")); } /** @@ -184,7 +186,7 @@ public void testAdvisoryPrefetchSizeNotSet() throws Exception { assertEquals(10, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize()); assertEquals(10, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize()); - assertDeqInflight(0, 2); + assertDeqInflight(0, 2, topic1, new ActiveMQTopic("A.FOO2")); } /** @@ -218,7 +220,7 @@ public void testPrefetchSize1() throws Exception { assertEquals(1, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize()); assertEquals(1, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize()); - assertDeqInflight(2, 0); + assertDeqInflight(2, 0, topic1, new ActiveMQTopic("A.FOO2")); } /** @@ -243,7 +245,7 @@ public void testAdvisoryPrefetchSizePercent() throws Exception { createConsumer("A", new ActiveMQTopic("A.FOO")); } - assertDeqInflight(7, 3); + assertDeqInflight(7, 3, new ActiveMQTopic("A.FOO")); } /** @@ -270,18 +272,27 @@ public void testPrefetchSizePercent() throws Exception { createConsumer("A", new ActiveMQTopic("A.FOO")); } - assertDeqInflight(7, 3); + assertDeqInflight(7, 3, new ActiveMQTopic("A.FOO")); } - private void assertDeqInflight(final int dequeue, final int inflight) throws Exception { + private void assertDeqInflight(final int dequeue, final int inflight, + final ActiveMQTopic... topics) throws Exception { assertTrue("deq and inflight as expected", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { - RegionBroker regionBroker = (RegionBroker) brokers.get("A").broker.getRegionBroker(); - LOG.info("A Deq:" + regionBroker.getDestinationStatistics().getDequeues().getCount()); - LOG.info("A Inflight:" + regionBroker.getDestinationStatistics().getInflight().getCount()); - return regionBroker.getDestinationStatistics().getDequeues().getCount() == dequeue - && regionBroker.getDestinationStatistics().getInflight().getCount() == inflight; + long actualDeq = 0; + long actualInflight = 0; + for (ActiveMQTopic topic : topics) { + ActiveMQTopic advisory = AdvisorySupport.getConsumerAdvisoryTopic(topic); + Destination destination = brokers.get("A").broker.getDestination(advisory); + if (destination != null) { + actualDeq += destination.getDestinationStatistics().getDequeues().getCount(); + actualInflight += destination.getDestinationStatistics().getInflight().getCount(); + } + } + LOG.info("A Deq:" + actualDeq); + LOG.info("A Inflight:" + actualInflight); + return actualDeq == dequeue && actualInflight == inflight; } })); }