Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import jakarta.jms.MessageConsumer;

import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.virtual.CompositeTopic;
Expand Down Expand Up @@ -113,7 +115,7 @@ public void testAdvisoryPrefetchSize() throws Exception {
assertEquals(1, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());

//both advisory messages are not acked yet because of optimized acks
assertDeqInflight(0, 2);
assertDeqInflight(0, 2, topic1, new ActiveMQTopic("A.FOO2"));
}

/**
Expand Down Expand Up @@ -149,7 +151,7 @@ public void testAdvisoryPrefetchSize1() throws Exception {
assertEquals(1, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
assertEquals(10, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());

assertDeqInflight(2, 0);
assertDeqInflight(2, 0, topic1, new ActiveMQTopic("A.FOO2"));
}

/**
Expand Down Expand Up @@ -184,7 +186,7 @@ public void testAdvisoryPrefetchSizeNotSet() throws Exception {
assertEquals(10, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
assertEquals(10, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());

assertDeqInflight(0, 2);
assertDeqInflight(0, 2, topic1, new ActiveMQTopic("A.FOO2"));
}

/**
Expand Down Expand Up @@ -218,7 +220,7 @@ public void testPrefetchSize1() throws Exception {
assertEquals(1, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
assertEquals(1, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());

assertDeqInflight(2, 0);
assertDeqInflight(2, 0, topic1, new ActiveMQTopic("A.FOO2"));
}

/**
Expand All @@ -243,7 +245,7 @@ public void testAdvisoryPrefetchSizePercent() throws Exception {
createConsumer("A", new ActiveMQTopic("A.FOO"));
}

assertDeqInflight(7, 3);
assertDeqInflight(7, 3, new ActiveMQTopic("A.FOO"));
}

/**
Expand All @@ -270,18 +272,27 @@ public void testPrefetchSizePercent() throws Exception {
createConsumer("A", new ActiveMQTopic("A.FOO"));
}

assertDeqInflight(7, 3);
assertDeqInflight(7, 3, new ActiveMQTopic("A.FOO"));
}

private void assertDeqInflight(final int dequeue, final int inflight) throws Exception {
private void assertDeqInflight(final int dequeue, final int inflight,
final ActiveMQTopic... topics) throws Exception {
assertTrue("deq and inflight as expected", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
RegionBroker regionBroker = (RegionBroker) brokers.get("A").broker.getRegionBroker();
LOG.info("A Deq:" + regionBroker.getDestinationStatistics().getDequeues().getCount());
LOG.info("A Inflight:" + regionBroker.getDestinationStatistics().getInflight().getCount());
return regionBroker.getDestinationStatistics().getDequeues().getCount() == dequeue
&& regionBroker.getDestinationStatistics().getInflight().getCount() == inflight;
long actualDeq = 0;
long actualInflight = 0;
for (ActiveMQTopic topic : topics) {
ActiveMQTopic advisory = AdvisorySupport.getConsumerAdvisoryTopic(topic);
Destination destination = brokers.get("A").broker.getDestination(advisory);
Copy link

Copilot AI Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BrokerService#getDestination(...) lazily creates the destination (it calls addDestination), so this assertion helper can still mutate broker state while waiting. To avoid side effects in the assert, lookup the advisory destination via the region broker’s existing destination map (e.g., ((RegionBroker) broker.getRegionBroker()).getDestinationMap().get(advisory)) rather than calling broker.getDestination(advisory).

Suggested change
Destination destination = brokers.get("A").broker.getDestination(advisory);
RegionBroker regionBroker = (RegionBroker) brokers.get("A").broker.getRegionBroker();
Destination destination = regionBroker.getDestinationMap().get(advisory);

Copilot uses AI. Check for mistakes.
if (destination != null) {
actualDeq += destination.getDestinationStatistics().getDequeues().getCount();
actualInflight += destination.getDestinationStatistics().getInflight().getCount();
}
}
LOG.info("A Deq:" + actualDeq);
LOG.info("A Inflight:" + actualInflight);
return actualDeq == dequeue && actualInflight == inflight;
}
}));
}
Expand Down
Loading