From 084f0025d1fd4f8b3fcc7735a7e0319f7cc0d7ce Mon Sep 17 00:00:00 2001 From: igorjava2025 <-> Date: Sun, 8 Mar 2026 22:12:57 +0300 Subject: [PATCH 1/4] Add HARD_STRICT mode See details there https://github.com/sonus21/rqueue/issues/276 (cherry picked from commit acc09bf4cbac84694e7217920d6d51869712adf9) --- .../SimpleRqueueListenerContainerFactory.java | 14 ++ .../listener/HardStrictPriorityPoller.java | 206 ++++++++++++++++++ .../HardStrictPriorityPollerProperties.java | 24 ++ .../RqueueMessageListenerContainer.java | 24 ++ .../rqueue/models/enums/PriorityMode.java | 3 +- .../HardStrictPriorityPollerTest.java | 178 +++++++++++++++ 6 files changed, 448 insertions(+), 1 deletion(-) create mode 100644 rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java create mode 100644 rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java create mode 100644 rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java index 5da2e641..ac7a4e90 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/config/SimpleRqueueListenerContainerFactory.java @@ -27,6 +27,7 @@ import com.github.sonus21.rqueue.core.impl.RqueueMessageTemplateImpl; import com.github.sonus21.rqueue.core.middleware.Middleware; import com.github.sonus21.rqueue.core.support.MessageProcessor; +import com.github.sonus21.rqueue.listener.HardStrictPriorityPollerProperties; import com.github.sonus21.rqueue.listener.RqueueMessageHandler; import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer; import com.github.sonus21.rqueue.models.enums.PriorityMode; @@ -92,6 +93,8 @@ public class SimpleRqueueListenerContainerFactory { // Set priority mode for the pollers private PriorityMode priorityMode = PriorityMode.WEIGHTED; + // Set HardStrictPriorityPollerProperties for HARD_STRICT priority mode poller + private HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties; /** * Whether all beans of spring application should be inspected to find methods annotated with @@ -348,6 +351,9 @@ public RqueueMessageListenerContainer createMessageListenerContainer() { if (messageHeaders != null) { messageListenerContainer.setMessageHeaders(messageHeaders); } + if (hardStrictPriorityPollerProperties != null) { + messageListenerContainer.setHardStrictPriorityPollerProperties(hardStrictPriorityPollerProperties); + } return messageListenerContainer; } @@ -486,6 +492,14 @@ public void setMessageHeaders(MessageHeaders messageHeaders) { this.messageHeaders = messageHeaders; } + public HardStrictPriorityPollerProperties getHardStrictPriorityPollerProperties() { + return this.hardStrictPriorityPollerProperties; + } + + public void setHardStrictPriorityPollerProperties(HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties) { + this.hardStrictPriorityPollerProperties = hardStrictPriorityPollerProperties; + } + /** * Rqueue scans all beans to find method annotated with {@link RqueueListener}. * diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java new file mode 100644 index 00000000..3d824bc3 --- /dev/null +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java @@ -0,0 +1,206 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package com.github.sonus21.rqueue.listener; + +import com.github.sonus21.rqueue.core.RqueueBeanProvider; +import com.github.sonus21.rqueue.core.middleware.Middleware; +import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer.QueueStateMgr; +import com.github.sonus21.rqueue.utils.Constants; +import com.github.sonus21.rqueue.utils.QueueThreadPool; +import com.github.sonus21.rqueue.utils.TimeoutUtils; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.slf4j.event.Level; +import org.springframework.messaging.MessageHeaders; + +/** + * Use it only with priority queues. + * Message processing can be slow. + * The hard strict priority algorithm is better in HardStrictPriorityPoller than in StrictPriorityPoller + * More details see in GitHub project issue + */ +class HardStrictPriorityPoller extends RqueueMessagePoller { + + private final RqueueBeanProvider rqueueBeanProvider; + + private final Map queueNameToDetail; + private final Map queueNameToThread; + private final Map queueDeactivationTime = new HashMap<>(); + private final HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties; + + HardStrictPriorityPoller( + String groupName, + final List queueDetails, + final Map queueNameToThread, + RqueueBeanProvider rqueueBeanProvider, + QueueStateMgr queueStateMgr, + List middlewares, + long pollingInterval, + long backoffTime, + PostProcessingHandler postProcessingHandler, + MessageHeaders messageHeaders, + HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties) { + super( + "HardStrict-" + groupName, + rqueueBeanProvider, + queueStateMgr, + middlewares, + pollingInterval, + backoffTime, + postProcessingHandler, + messageHeaders); + + this.rqueueBeanProvider = rqueueBeanProvider; + // Sort queues by priority once during initialization + List queueDetailList = new ArrayList<>(queueDetails); + queueDetailList.sort( + (o1, o2) -> + o2.getPriority().get(Constants.DEFAULT_PRIORITY_KEY) + - o1.getPriority().get(Constants.DEFAULT_PRIORITY_KEY)); + + this.queues = queueDetailList.stream().map(QueueDetail::getName).collect(Collectors.toList()); + this.queueNameToDetail = + queueDetailList.stream() + .collect(Collectors.toMap(QueueDetail::getName, Function.identity())); + this.queueNameToThread = queueNameToThread; + this.hardStrictPriorityPollerProperties = hardStrictPriorityPollerProperties != null + ? hardStrictPriorityPollerProperties + : new HardStrictPriorityPollerProperties(); + } + + @Override + public void start() { + log(Level.DEBUG, "Running, Ordered Queues: {}", null, queues); + while (true) { + if (shouldExit()) { + return; + } + + boolean messageFoundInAnyQueue = false; + + try { + for (String queue : queues) { + if (eligibleForPolling(queue) && !isDeactivated(queue)) { + QueueThreadPool queueThreadPool = queueNameToThread.get(queue); + QueueDetail queueDetail = queueNameToDetail.get(queue); + poll(-1, queue, queueDetail, queueThreadPool); + + if (hardStrictPriorityPollerProperties.getAfterPollSleepInterval() != null) { + TimeoutUtils.sleepLog(hardStrictPriorityPollerProperties.getAfterPollSleepInterval(), false); + } + + if (existMessagesInCurrentQueueOrHigherPriorityQueue(queue, queues)) { + // break current cycle and start new cycle + // it allow to process queue with the higher priority + messageFoundInAnyQueue = true; + break; + } + } + } + + // If no messages were found across all queues, sleep for the polling interval + if (!messageFoundInAnyQueue) { + TimeoutUtils.sleepLog(pollingInterval, false); + } + + } catch (Throwable e) { + log(Level.ERROR, "Exception in the poller {}", e, e.getMessage()); + if (shouldExit()) { + return; + } + TimeoutUtils.sleepLog(backoffTime, false); + } + } + } + + boolean existMessagesInCurrentQueueOrHigherPriorityQueue(String currentQueue, List queues) { + for (String queue : queues) { + if (eligibleForPolling(queue) && !isDeactivated(queue)) { + QueueDetail queueDetail = queueNameToDetail.get(queue); + if (existAvailableMessagesForPoll(queueDetail)) { + // the current or higher priority queue contains messages that need to be processed. + return true; + } + } + // we check all queues from the highest priority to current queue + if (queue.equals(currentQueue)) { + return false; + } + } + // unexpected behavior, need more details if it occurs + log(Level.WARN, "current queue '{}' not found in queues list '{}'", null, currentQueue, queues); + return false; + } + + protected boolean existAvailableMessagesForPoll(QueueDetail queueDetail) { + List readyMessages = rqueueBeanProvider + .getRqueueMessageTemplate() + .readFromList(queueDetail.getQueueName(), 0, 0); + + if (readyMessages != null && !readyMessages.isEmpty()) { + log(Level.TRACE, "readyMessages exists for queue '{}', existAvailableMessagesForPoll = true.", null, queueDetail.getName()); + return true; + } + + // Only check delayed messages with score <= current time + long currentTime = System.currentTimeMillis(); + List delayedMessages = rqueueBeanProvider + .getRqueueMessageTemplate() + .readFromZsetWithScore(queueDetail.getScheduledQueueName(), 0, currentTime); + + if (delayedMessages != null && !delayedMessages.isEmpty()) { + log(Level.TRACE, "delayedMessages exists for queue '{}', existAvailableMessagesForPoll = true.", null, queueDetail.getName()); + return true; + } + + return false; + } + + private boolean isDeactivated(String queue) { + Long deactivationTime = queueDeactivationTime.get(queue); + if (deactivationTime == null) { + return false; + } + if (System.currentTimeMillis() - deactivationTime > pollingInterval) { + queueDeactivationTime.remove(queue); + return false; + } + return true; + } + + @Override + long getSemaphoreWaitTime() { + return hardStrictPriorityPollerProperties.getSemaphoreWaitTime() != null + ? hardStrictPriorityPollerProperties.getSemaphoreWaitTime() + : 20L; + } + + @Override + void deactivate(int index, String queue, DeactivateType deactivateType) { + if (deactivateType == DeactivateType.POLL_FAILED) { + // Pause in case of connection errors or polling failures + TimeoutUtils.sleepLog(backoffTime, false); + } else { + // Mark deactivation time if the queue is empty + queueDeactivationTime.put(queue, System.currentTimeMillis()); + } + } +} diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java new file mode 100644 index 00000000..99ca44b9 --- /dev/null +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java @@ -0,0 +1,24 @@ +package com.github.sonus21.rqueue.listener; + +public class HardStrictPriorityPollerProperties { + // set such default values for parameters "afterPollSleepInterval" and "semaphoreWaitTime" + // because local load tests have correct strict priority algorithm work and good performance with them + private Long afterPollSleepInterval = 30L; + private Long semaphoreWaitTime = 15L; + + public Long getAfterPollSleepInterval() { + return afterPollSleepInterval; + } + + public void setAfterPollSleepInterval(Long afterPollSleepInterval) { + this.afterPollSleepInterval = afterPollSleepInterval; + } + + public Long getSemaphoreWaitTime() { + return this.semaphoreWaitTime; + } + + public void setSemaphoreWaitTime(Long semaphoreWaitTime) { + this.semaphoreWaitTime = semaphoreWaitTime; + } +} diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java index 440a836a..36b8dc73 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/RqueueMessageListenerContainer.java @@ -106,6 +106,7 @@ public class RqueueMessageListenerContainer private int phase = Integer.MAX_VALUE; private PriorityMode priorityMode; private MessageHeaders messageHeaders; + private HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties; public RqueueMessageListenerContainer( RqueueMessageHandler rqueueMessageHandler, RqueueMessageTemplate rqueueMessageTemplate) { @@ -515,6 +516,21 @@ protected void startGroup(String groupName, List queueDetails) { backOffTime, postProcessingHandler, getMessageHeaders())); + } else if (getPriorityMode() == PriorityMode.HARD_STRICT) { + future = + taskExecutor.submit( + new HardStrictPriorityPoller( + StringUtils.groupName(groupName), + queueDetails, + queueThread, + rqueueBeanProvider, + queueStateMgr, + getMiddleWares(), + pollingInterval, + backOffTime, + postProcessingHandler, + getMessageHeaders(), + getHardStrictPriorityPollerProperties())); } else { future = taskExecutor.submit( @@ -712,6 +728,14 @@ public void setMessageHeaders(MessageHeaders messageHeaders) { this.messageHeaders = messageHeaders; } + public HardStrictPriorityPollerProperties getHardStrictPriorityPollerProperties() { + return this.hardStrictPriorityPollerProperties; + } + + public void setHardStrictPriorityPollerProperties(HardStrictPriorityPollerProperties hardStrictPriorityPollerProperties) { + this.hardStrictPriorityPollerProperties = hardStrictPriorityPollerProperties; + } + class QueueStateMgr { Set pausedQueues = ConcurrentHashMap.newKeySet(); diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/enums/PriorityMode.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/enums/PriorityMode.java index 0b6e556a..0fa83405 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/enums/PriorityMode.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/enums/PriorityMode.java @@ -18,5 +18,6 @@ public enum PriorityMode { STRICT, - WEIGHTED + WEIGHTED, + HARD_STRICT } diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java new file mode 100644 index 00000000..9315c2c9 --- /dev/null +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java @@ -0,0 +1,178 @@ +package com.github.sonus21.rqueue.listener; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import com.github.sonus21.TestBase; +import com.github.sonus21.rqueue.CoreUnitTest; +import com.github.sonus21.rqueue.core.RqueueBeanProvider; +import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer.QueueStateMgr; +import com.github.sonus21.rqueue.utils.Constants; +import com.github.sonus21.rqueue.utils.QueueThreadPool; +import com.github.sonus21.rqueue.utils.TimeoutUtils; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.springframework.messaging.MessageHeaders; + +@CoreUnitTest +class HardStrictPriorityPollerTest extends TestBase { + + @Mock + private RqueueBeanProvider rqueueBeanProvider; + @Mock + private QueueStateMgr queueStateMgr; + @Mock + private PostProcessingHandler postProcessingHandler; + + private final String highPriorityQueue = "high-priority-" + UUID.randomUUID(); + private final String lowPriorityQueue = "low-priority-" + UUID.randomUUID(); + private HardStrictPriorityPoller poller; + private QueueDetail highDetail; + private QueueDetail lowDetail; + + @BeforeEach + public void setUp() { + MockitoAnnotations.openMocks(this); + + highDetail = createQueueDetail(highPriorityQueue, 100); + lowDetail = createQueueDetail(lowPriorityQueue, 10); + + List queueDetails = Arrays.asList(lowDetail, highDetail); + Map queueNameToThread = new HashMap<>(); + queueNameToThread.put(highPriorityQueue, mock(QueueThreadPool.class)); + queueNameToThread.put(lowPriorityQueue, mock(QueueThreadPool.class)); + + poller = spy(new HardStrictPriorityPoller( + "test-group", + queueDetails, + queueNameToThread, + rqueueBeanProvider, + queueStateMgr, + Collections.emptyList(), + 50L, + 50L, + postProcessingHandler, + new MessageHeaders(Collections.emptyMap()), + new HardStrictPriorityPollerProperties() + )); + + // КРИТИЧЕСКИ ВАЖНО: Разрешаем опрос очередей + lenient().doReturn(true).when(poller).eligibleForPolling(anyString()); + // КРИТИЧЕСКИ ВАЖНО: Запрещаем немедленный выход из цикла + lenient().doReturn(false).when(poller).shouldExit(); + } + + private QueueDetail createQueueDetail(String name, int priority) { + QueueDetail detail = mock(QueueDetail.class); + when(detail.getName()).thenReturn(name); + Map priorityMap = new HashMap<>(); + priorityMap.put(Constants.DEFAULT_PRIORITY_KEY, priority); + when(detail.getPriority()).thenReturn(priorityMap); + return detail; + } + + @Test + void testQueuesAreSortedByPriority() { + List sortedQueues = poller.queues; + assertTrue(sortedQueues.indexOf(highPriorityQueue) < sortedQueues.indexOf(lowPriorityQueue), + "High priority queue should be first"); + } + + @Test + void testExistMessagesInHigherPriorityQueueReturnsTrue() { + List queues = Arrays.asList(highPriorityQueue, lowPriorityQueue); + + // High queue has messages + lenient().doReturn(true).when(poller).existAvailableMessagesForPoll(highDetail); + + // Checking from low priority perspective + boolean result = poller.existMessagesInCurrentQueueOrHigherPriorityQueue(lowPriorityQueue, queues); + assertTrue(result, "Should return true because high priority queue has messages"); + } + + @Test + void testStrictExecutionPreventsLowPriorityPoll() throws Exception { + AtomicInteger highQueuePollCount = new AtomicInteger(0); + AtomicInteger lowQueuePollCount = new AtomicInteger(0); + + // High Priority always has messages + lenient().doReturn(true).when(poller).existAvailableMessagesForPoll(highDetail); + lenient().doAnswer(invocation -> { + highQueuePollCount.incrementAndGet(); + return 1; + }).when(poller).poll(anyInt(), eq(highPriorityQueue), eq(highDetail), any()); + + // Low Priority also has messages + lenient().doReturn(true).when(poller).existAvailableMessagesForPoll(lowDetail); + lenient().doAnswer(invocation -> { + lowQueuePollCount.incrementAndGet(); + return 1; + }).when(poller).poll(anyInt(), eq(lowPriorityQueue), eq(lowDetail), any()); + + Thread pollerThread = new Thread(poller::start); + pollerThread.start(); + + try { + // Wait for multiple polls of High priority + TimeoutUtils.waitFor(() -> highQueuePollCount.get() > 5, 2000, "high priority polls"); + + // Low priority must NEVER be polled because 'break' happens after high poll + assertTrue(lowQueuePollCount.get() == 0, "Low priority queue should not be polled"); + } finally { + stop(poller, pollerThread); + } + } + + @Test + void testLowPriorityIsPolledWhenHighIsEmpty() throws Exception { + AtomicInteger lowQueuePollCount = new AtomicInteger(0); + + // High is empty + lenient().doReturn(false).when(poller).existAvailableMessagesForPoll(highDetail); + + // Low has messages + lenient().doReturn(true).when(poller).existAvailableMessagesForPoll(lowDetail); + lenient().doAnswer(invocation -> { + lowQueuePollCount.incrementAndGet(); + return 1; + }).when(poller).poll(anyInt(), eq(lowPriorityQueue), eq(lowDetail), any()); + + Thread pollerThread = new Thread(poller::start); + pollerThread.start(); + + try { + TimeoutUtils.waitFor(() -> lowQueuePollCount.get() > 0, 2000, "low priority poll"); + assertTrue(lowQueuePollCount.get() > 0); + } finally { + stop(poller, pollerThread); + } + } + + private void stop(HardStrictPriorityPoller poller, Thread thread) { + lenient().doReturn(true).when(poller).shouldExit(); + if (thread != null && thread.isAlive()) { + thread.interrupt(); + try { + thread.join(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } +} \ No newline at end of file From 3f5b0a0637b253591025b7290bcc5b48b61b8a03 Mon Sep 17 00:00:00 2001 From: igorjava2025 <-> Date: Mon, 9 Mar 2026 10:41:05 +0300 Subject: [PATCH 2/4] fix comment (cherry picked from commit c92cb75dc50a1590f3881739db9f43b110c99217) --- .../sonus21/rqueue/listener/HardStrictPriorityPollerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java index 9315c2c9..2392ebb1 100644 --- a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerTest.java @@ -72,9 +72,9 @@ public void setUp() { new HardStrictPriorityPollerProperties() )); - // КРИТИЧЕСКИ ВАЖНО: Разрешаем опрос очередей + // Allowing queue polling lenient().doReturn(true).when(poller).eligibleForPolling(anyString()); - // КРИТИЧЕСКИ ВАЖНО: Запрещаем немедленный выход из цикла + // Disable immediate exit from the loop lenient().doReturn(false).when(poller).shouldExit(); } From c6459d3e6efb26622c06308fe8c50a1e686ae424 Mon Sep 17 00:00:00 2001 From: igorjava2025 <-> Date: Fri, 13 Mar 2026 12:54:02 +0300 Subject: [PATCH 3/4] add fixes after review (cherry picked from commit 21d6d8643aabe24b13f28244952a6d778b8732a4) --- .../rqueue/core/RqueueMessageTemplate.java | 7 +++ .../core/impl/RqueueMessageTemplateImpl.java | 16 ++++++ .../listener/HardStrictPriorityPoller.java | 38 +++++++++----- .../HardStrictPriorityPollerProperties.java | 9 ++++ ...ardStrictPriorityPollerPropertiesTest.java | 49 +++++++++++++++++++ 5 files changed, 107 insertions(+), 12 deletions(-) create mode 100644 rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerPropertiesTest.java diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java index 1885eafd..caf2e25f 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/RqueueMessageTemplate.java @@ -18,6 +18,7 @@ import com.github.sonus21.rqueue.models.MessageMoveResult; import java.util.List; +import java.util.Optional; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ZSetOperations.TypedTuple; import reactor.core.publisher.Flux; @@ -92,4 +93,10 @@ Long scheduleMessage( Flux addReactiveMessageWithDelay( String scheduledQueueName, String scheduledQueueChannelName, RqueueMessage rqueueMessage); + + Optional findFirstElementFromList(String name); + + Optional findFirstElementFromZset(String name); + + Optional> findFirstElementFromZsetWithScore(String name); } diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java index fa6b288d..f121e775 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/impl/RqueueMessageTemplateImpl.java @@ -30,6 +30,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; @@ -343,4 +344,19 @@ public RedisTemplate getTemplate() { public Long removeElementFromZset(String zsetName, RqueueMessage rqueueMessage) { return super.removeFromZset(zsetName, rqueueMessage); } + + @Override + public Optional findFirstElementFromList(String name) { + return readFromList(name, 0, 0).stream().findFirst(); + } + + @Override + public Optional findFirstElementFromZset(String name) { + return readFromZset(name, 0, 0).stream().findFirst(); + } + + @Override + public Optional> findFirstElementFromZsetWithScore(String name) { + return readFromZsetWithScore(name, 0, 0).stream().findFirst(); + } } diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java index 3d824bc3..04443827 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPoller.java @@ -151,23 +151,37 @@ boolean existMessagesInCurrentQueueOrHigherPriorityQueue(String currentQueue, Li } protected boolean existAvailableMessagesForPoll(QueueDetail queueDetail) { - List readyMessages = rqueueBeanProvider - .getRqueueMessageTemplate() - .readFromList(queueDetail.getQueueName(), 0, 0); - - if (readyMessages != null && !readyMessages.isEmpty()) { - log(Level.TRACE, "readyMessages exists for queue '{}', existAvailableMessagesForPoll = true.", null, queueDetail.getName()); + boolean readyMessagesExists = + rqueueBeanProvider + .getRqueueMessageTemplate() + .findFirstElementFromList(queueDetail.getQueueName()) + .isPresent(); + if (readyMessagesExists) { + log( + Level.TRACE, + "readyMessages exists for queue '{}', existAvailableMessagesForPoll = true.", + null, + queueDetail.getName()); return true; } // Only check delayed messages with score <= current time long currentTime = System.currentTimeMillis(); - List delayedMessages = rqueueBeanProvider - .getRqueueMessageTemplate() - .readFromZsetWithScore(queueDetail.getScheduledQueueName(), 0, currentTime); - - if (delayedMessages != null && !delayedMessages.isEmpty()) { - log(Level.TRACE, "delayedMessages exists for queue '{}', existAvailableMessagesForPoll = true.", null, queueDetail.getName()); + boolean delayedMessagesExists = + rqueueBeanProvider + .getRqueueMessageTemplate() + .findFirstElementFromZsetWithScore(queueDetail.getScheduledQueueName()) + .filter(element -> element.getScore() <= currentTime) + .isPresent(); + + if (delayedMessagesExists) { + log( + Level.TRACE, + "delayedMessages exists for scheduled queue '{}' and currentTime '{}'," + + " existAvailableMessagesForPoll = true.", + null, + queueDetail.getScheduledQueueName(), + currentTime); return true; } diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java index 99ca44b9..618aadac 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerProperties.java @@ -11,6 +11,7 @@ public Long getAfterPollSleepInterval() { } public void setAfterPollSleepInterval(Long afterPollSleepInterval) { + validateTimeInterval(afterPollSleepInterval); this.afterPollSleepInterval = afterPollSleepInterval; } @@ -19,6 +20,14 @@ public Long getSemaphoreWaitTime() { } public void setSemaphoreWaitTime(Long semaphoreWaitTime) { + validateTimeInterval(semaphoreWaitTime); this.semaphoreWaitTime = semaphoreWaitTime; } + + private void validateTimeInterval(Long value) { + if (value == null || value > 0) { + return; + } + throw new IllegalArgumentException("Value must be positive: " + value); + } } diff --git a/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerPropertiesTest.java b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerPropertiesTest.java new file mode 100644 index 00000000..efa4e261 --- /dev/null +++ b/rqueue-core/src/test/java/com/github/sonus21/rqueue/listener/HardStrictPriorityPollerPropertiesTest.java @@ -0,0 +1,49 @@ +package com.github.sonus21.rqueue.listener; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.stream.LongStream; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.NullSource; + +class HardStrictPriorityPollerPropertiesTest { + + private final HardStrictPriorityPollerProperties properties = + new HardStrictPriorityPollerProperties(); + + static LongStream invalidValues() { + return LongStream.of(0L, -1L, -100L, Long.MIN_VALUE); + } + + static LongStream validValues() { + return LongStream.of(1L, Long.MAX_VALUE); + } + + @ParameterizedTest + @MethodSource("invalidValues") + void setAfterPollSleepIntervalInvalidValues(Long value) { + assertThrows(IllegalArgumentException.class, () -> properties.setAfterPollSleepInterval(value)); + } + + @ParameterizedTest + @NullSource + @MethodSource("validValues") + void setAfterPollSleepIntervalValidValues(Long value) { + assertDoesNotThrow(() -> properties.setAfterPollSleepInterval(value)); + } + + @ParameterizedTest + @MethodSource("invalidValues") + void setSemaphoreWaitTimeInvalidValues(Long value) { + assertThrows(IllegalArgumentException.class, () -> properties.setSemaphoreWaitTime(value)); + } + + @ParameterizedTest + @NullSource + @MethodSource("validValues") + void setSemaphoreWaitTimeValidValues(Long value) { + assertDoesNotThrow(() -> properties.setSemaphoreWaitTime(value)); + } +} From 5aff890c62520aae24a235fadd994b6bf504edf9 Mon Sep 17 00:00:00 2001 From: Sonu Kumar Date: Sun, 15 Mar 2026 01:37:43 +0530 Subject: [PATCH 4/4] Prepare 3.4.1 backport release --- build.gradle | 2 +- docs/CHANGELOG.md | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 74f54259..432ecfee 100644 --- a/build.gradle +++ b/build.gradle @@ -69,7 +69,7 @@ ext { subprojects { group = "com.github.sonus21" - version = "3.4.0-RELEASE" + version = "3.4.1-RELEASE" dependencies { // https://mvnrepository.com/artifact/org.springframework/spring-messaging diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 80a1c68e..f1b768b8 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -8,6 +8,13 @@ layout: default All notable user-facing changes to this project are documented in this file. +## Release [3.4.1] 15-March-2026 +### Features +* Backported `HARD_STRICT` priority mode from #279 for stricter priority queue polling on the 3.x line + +### Fixes +* Optimized hard-strict polling queue availability checks to avoid full list and sorted-set reads + ## Release [3.4.0] 22-July-2025 ### Fixes * Fixed unique enqueue message to reject the message upfront instead of identifying it later #259