-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Use destination statistics instead of global statistics to avoid side… #1668
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -22,7 +22,9 @@ | |||||||
| import jakarta.jms.MessageConsumer; | ||||||||
|
|
||||||||
| import org.apache.activemq.JmsMultipleBrokersTestSupport; | ||||||||
| import org.apache.activemq.advisory.AdvisorySupport; | ||||||||
| import org.apache.activemq.broker.BrokerService; | ||||||||
| import org.apache.activemq.broker.region.Destination; | ||||||||
| import org.apache.activemq.broker.region.DestinationInterceptor; | ||||||||
| import org.apache.activemq.broker.region.RegionBroker; | ||||||||
| import org.apache.activemq.broker.region.virtual.CompositeTopic; | ||||||||
|
|
@@ -113,7 +115,7 @@ public void testAdvisoryPrefetchSize() throws Exception { | |||||||
| assertEquals(1, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize()); | ||||||||
|
|
||||||||
| //both advisory messages are not acked yet because of optimized acks | ||||||||
| assertDeqInflight(0, 2); | ||||||||
| assertDeqInflight(0, 2, topic1, new ActiveMQTopic("A.FOO2")); | ||||||||
| } | ||||||||
|
|
||||||||
| /** | ||||||||
|
|
@@ -149,7 +151,7 @@ public void testAdvisoryPrefetchSize1() throws Exception { | |||||||
| assertEquals(1, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize()); | ||||||||
| assertEquals(10, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize()); | ||||||||
|
|
||||||||
| assertDeqInflight(2, 0); | ||||||||
| assertDeqInflight(2, 0, topic1, new ActiveMQTopic("A.FOO2")); | ||||||||
| } | ||||||||
|
|
||||||||
| /** | ||||||||
|
|
@@ -184,7 +186,7 @@ public void testAdvisoryPrefetchSizeNotSet() throws Exception { | |||||||
| assertEquals(10, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize()); | ||||||||
| assertEquals(10, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize()); | ||||||||
|
|
||||||||
| assertDeqInflight(0, 2); | ||||||||
| assertDeqInflight(0, 2, topic1, new ActiveMQTopic("A.FOO2")); | ||||||||
| } | ||||||||
|
|
||||||||
| /** | ||||||||
|
|
@@ -218,7 +220,7 @@ public void testPrefetchSize1() throws Exception { | |||||||
| assertEquals(1, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize()); | ||||||||
| assertEquals(1, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize()); | ||||||||
|
|
||||||||
| assertDeqInflight(2, 0); | ||||||||
| assertDeqInflight(2, 0, topic1, new ActiveMQTopic("A.FOO2")); | ||||||||
| } | ||||||||
|
|
||||||||
| /** | ||||||||
|
|
@@ -243,7 +245,7 @@ public void testAdvisoryPrefetchSizePercent() throws Exception { | |||||||
| createConsumer("A", new ActiveMQTopic("A.FOO")); | ||||||||
| } | ||||||||
|
|
||||||||
| assertDeqInflight(7, 3); | ||||||||
| assertDeqInflight(7, 3, new ActiveMQTopic("A.FOO")); | ||||||||
| } | ||||||||
|
|
||||||||
| /** | ||||||||
|
|
@@ -270,18 +272,27 @@ public void testPrefetchSizePercent() throws Exception { | |||||||
| createConsumer("A", new ActiveMQTopic("A.FOO")); | ||||||||
| } | ||||||||
|
|
||||||||
| assertDeqInflight(7, 3); | ||||||||
| assertDeqInflight(7, 3, new ActiveMQTopic("A.FOO")); | ||||||||
| } | ||||||||
|
|
||||||||
| private void assertDeqInflight(final int dequeue, final int inflight) throws Exception { | ||||||||
| private void assertDeqInflight(final int dequeue, final int inflight, | ||||||||
| final ActiveMQTopic... topics) throws Exception { | ||||||||
| assertTrue("deq and inflight as expected", Wait.waitFor(new Wait.Condition() { | ||||||||
| @Override | ||||||||
| public boolean isSatisified() throws Exception { | ||||||||
| RegionBroker regionBroker = (RegionBroker) brokers.get("A").broker.getRegionBroker(); | ||||||||
| LOG.info("A Deq:" + regionBroker.getDestinationStatistics().getDequeues().getCount()); | ||||||||
| LOG.info("A Inflight:" + regionBroker.getDestinationStatistics().getInflight().getCount()); | ||||||||
| return regionBroker.getDestinationStatistics().getDequeues().getCount() == dequeue | ||||||||
| && regionBroker.getDestinationStatistics().getInflight().getCount() == inflight; | ||||||||
| long actualDeq = 0; | ||||||||
| long actualInflight = 0; | ||||||||
| for (ActiveMQTopic topic : topics) { | ||||||||
| ActiveMQTopic advisory = AdvisorySupport.getConsumerAdvisoryTopic(topic); | ||||||||
| Destination destination = brokers.get("A").broker.getDestination(advisory); | ||||||||
|
||||||||
| Destination destination = brokers.get("A").broker.getDestination(advisory); | |
| RegionBroker regionBroker = (RegionBroker) brokers.get("A").broker.getRegionBroker(); | |
| Destination destination = regionBroker.getDestinationMap().get(advisory); |
Uh oh!
There was an error while loading. Please reload this page.