diff --git a/nats-task-v2.md b/nats-task-v2.md index ec224e8b..6510992a 100644 --- a/nats-task-v2.md +++ b/nats-task-v2.md @@ -8,18 +8,21 @@ All v1 items are done and 360 unit tests pass. Branch `nats-backend` is ready to ## v2 pending items -### 1. Web dashboard — NATS gaps +### 1. Web dashboard — NATS gaps *(IN PROGRESS — pause/delete/explore landed)* Controllers are no longer Redis-gated but several operations throw `BackendCapabilityException` (HTTP 501) on NATS. The front-end should hide unsupported panels proactively instead of relying on 501s. -- Expose `GET /rqueue/api/capabilities` returning the `Capabilities` record so the UI can conditionally hide panels. -- Extend `Capabilities` with dashboard-op flags: `supportsCharts`, `supportsMessageBrowse`, `supportsAdminMove`. -- Wire the flags into Pebble templates (scheduled panel, cron jobs panel, chart panel already have `hideScheduledPanel` / `hideCronJobs` hooks in `DataViewResponse`). +- ✅ `GET /rqueue/api/capabilities` already returns the `Capabilities` record so the UI can conditionally hide panels. +- ✅ `RqueueQDetailServiceImpl.getRunningTasks()` / `getScheduledTasks()` now return header-only tables on NATS instead of zero rows / 501s. Pending queue browsing routes through `MessageBroker.peek()`. +- ✅ `NatsRqueueUtilityService` implements `pauseUnpauseQueue` (persists flag + notifies local `RqueueMessageListenerContainer`), soft `deleteMessage` (KV metadata flag), `getDataType` (returns `"STREAM"`), `aggregateDataCounter`. 20 unit tests cover the path. +- ⏳ Pause-event multi-instance fan-out: `RqueueInternalPubSubChannel` is Redis-only. NATS bridge follow-up: subscribe to `rqueue.internal.` via `MessageBroker.subscribe/publish` and rebroadcast pause requests across worker JVMs. +- ⏳ Extend `Capabilities` with dashboard-op flags: `supportsCharts`, `supportsMessageBrowse`, `supportsAdminMove` (not yet — current flags suffice for the panels we hide today). +- ⏳ Pebble templates: `hideScheduledPanel` / `hideCronJobs` already wired into `DataViewResponse`. Front-end hides those panels; chart and message-browse hides still TBD. Affected services that throw on NATS today: -- `RqueueDashboardChartServiceImpl` — time-series charts (no equivalent in JetStream) -- `RqueueUtilityServiceImpl` — move/enqueue admin ops -- `NatsMessageBrowsingRepository.viewData` — positional message browse +- `RqueueDashboardChartServiceImpl` — time-series charts (no equivalent in JetStream) — still pending +- `RqueueUtilityServiceImpl` — move/enqueue admin ops — `moveMessage`, `enqueueMessage`, `makeEmpty` deliberately remain `notSupported` (no JetStream primitive); `pauseUnpauseQueue` and `deleteMessage` now implemented +- `NatsMessageBrowsingRepository.viewData` — positional message browse (Redis-only by design) ### 2. Reactive listener container diff --git a/nats-task.md b/nats-task.md index c56a5acd..14d6a3fe 100644 --- a/nats-task.md +++ b/nats-task.md @@ -154,9 +154,16 @@ Then re-run: ./gradlew :rqueue-spring-boot-starter:test --tests "com.github.sonus21.rqueue.spring.boot.integration.NatsBackendEndToEndIT" ``` -### Web-layer NATS dashboard gap (new follow-up) +### Web-layer NATS dashboard gap (new follow-up) *(PARTIAL — admin write ops landing)* -All 4 controllers and the 5 web service impls (`RqueueDashboardChartService*`, `RqueueQDetailService*`, `RqueueJobService*`, `RqueueSystemManagerService*`, `RqueueUtilityService*`) are still gated `@Conditional(RedisBackendCondition)`. On NATS the dashboard reports broker-derived sizes only; no charts, no message browse, no admin ops. Plan to fix: +All 4 controllers and the 5 web service impls (`RqueueDashboardChartService*`, `RqueueQDetailService*`, `RqueueJobService*`, `RqueueSystemManagerService*`, `RqueueUtilityService*`) are still gated `@Conditional(RedisBackendCondition)`. On NATS the dashboard reports broker-derived sizes only; no charts, no message browse, no admin ops. + +Status: +- ✅ `NatsRqueueUtilityService` (rqueue-nats `@Conditional(NatsBackendCondition)`) replaces the all-stub impl: `pauseUnpauseQueue`, soft `deleteMessage`, `getDataType`, `aggregateDataCounter` work end-to-end. `moveMessage` / `enqueueMessage` / `makeEmpty` are deliberately `notSupported` (no JetStream equivalent). +- ✅ `RqueueQDetailServiceImpl` returns header-only tables for `getRunningTasks` / `getScheduledTasks` when the broker capabilities suppress those sections, instead of rendering 0-rows / 501s. +- ⏳ Charts (`RqueueDashboardChartService`), message browse, and `moveMessage` on NATS — still pending. + +Plan to fix the rest: 1. Introduce repository interfaces in `rqueue-core/repository/` for the few storage primitives the web services share (queue browsing, time-series counters, atomic move). Web service impls move into core / `rqueue-web` and depend only on the repos. 2. Redis impls of the repos stay in `rqueue-redis`; NATS impls go in `rqueue-nats` and throw `BackendCapabilityException("nats", "operation", "reason")` for primitives JetStream can't model (positional message moves, time-bucket charts). diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/EndpointRegistry.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/EndpointRegistry.java index b8a40966..36721839 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/EndpointRegistry.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/EndpointRegistry.java @@ -154,4 +154,27 @@ public static int getActiveQueueCount() { public static int getRegisteredQueueCount() { return registry.size(); } + + /** + * Returns every {@link QueueDetail} registered under the given queue name, including all + * {@code @RqueueListener} methods that share the same backing storage. Used by the + * dashboard to render one subscriber row per handler. Returns an empty list when no + * detail is registered. + */ + public static List getAllForQueue(String queueName) { + if (queueName == null) { + return new ArrayList<>(); + } + synchronized (lock) { + List matches = registry.values().stream() + .filter(qd -> queueName.equals(qd.getName())) + .sorted(Comparator.comparing(qd -> { + String cn = qd.getConsumerName(); + return cn == null ? "" : cn; + })) + .collect(Collectors.toList()); + lock.notifyAll(); + return matches; + } + } } diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBroker.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBroker.java index 8a72e345..1f368f5a 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBroker.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/MessageBroker.java @@ -110,6 +110,18 @@ default List pop( List peek(QueueDetail q, long offset, long count); + /** + * Consumer-aware peek overload. When {@code consumerName} is non-null and the backend has + * per-consumer offsets (e.g. NATS Limits-retention streams), the implementation starts + * pagination from that consumer's next undelivered sequence so the dashboard shows messages + * still pending for that specific subscriber instead of the entire retained window. The + * default delegates to {@link #peek(QueueDetail, long, long)} for backends with a single + * shared pool. + */ + default List peek(QueueDetail q, String consumerName, long offset, long count) { + return peek(q, offset, count); + } + /** * Remove {@code old} from the processing store and re-enqueue {@code updated} for retry. * {@code delayMs <= 0} means immediate; {@code delayMs > 0} means schedule after that delay. @@ -198,6 +210,85 @@ default String dlqStorageDisplayName(QueueDetail q) { return null; } + /** + * Indicates whether {@link #size(QueueDetail)} returns an exact count or an approximation + * for the given queue. Brokers that compute pending from per-consumer position math (e.g. + * NATS JetStream Limits-retention streams) return {@code true} so the dashboard renders + * the figure with a "~" prefix instead of presenting it as authoritative. Defaults to + * {@code false} (the historical Redis behavior — exact list / sorted-set lengths). + */ + default boolean isSizeApproximate(QueueDetail q) { + return false; + } + + /** + * Per-consumer pending breakdown for queues whose backend has multiple independent + * subscribers — e.g. JetStream Limits-retention streams where each durable consumer + * progresses at its own pace. Returns an ordered map of {@code consumerName -> pending} + * so the dashboard can render one row per consumer instead of a single aggregate. + * + *

The default returns {@code null}, signalling that the queue has a single shared pool + * (Redis lists, NATS WorkQueue streams) and the caller should fall back to + * {@link #size(QueueDetail)}. Empty / null also means "no consumers attached". + * + * @deprecated superseded by {@link #subscribers(QueueDetail)} which returns a richer view + * (consumer + pending + in-flight + shared flag). Retained for one release so + * downstream callers keep compiling. + */ + @Deprecated + default java.util.Map consumerPendingSizes(QueueDetail q) { + return null; + } + + /** + * Per-subscriber breakdown for the queue-detail dashboard. Each entry represents one + * logical handler attached to the queue: + * + *

    + *
  • Redis — one entry per {@code @RqueueListener} method that registered for + * the queue. {@code pending} is the shared list size on every row + * ({@code pendingShared = true}); {@code inFlight} is the shared processing-ZSET + * size. + *
  • NATS JetStream — one entry per durable consumer. For WorkQueue retention + * {@code pending} is the shared stream {@code msgCount} ({@code pendingShared = true}); + * for Limits retention it is the exact per-consumer {@code numPending} + * ({@code pendingShared = false}). {@code inFlight} is the consumer's + * {@code numAckPending} in both cases. + *
+ * + *

The default returns a single anonymous row backed by {@link #size(QueueDetail)}, so + * brokers that don't track named subscribers still render a working table. + */ + default java.util.List subscribers(QueueDetail q) { + long pending; + try { + pending = size(q); + } catch (RuntimeException e) { + pending = 0L; + } + return java.util.Collections.singletonList( + new SubscriberView(q.resolvedConsumerName(), pending, 0L, true)); + } + + /** + * Backend-aware human-readable label for the given Redis-shaped {@code DataType} on the given + * dashboard tab. Surfaces in the queue-detail page's "Data Type" column so NATS deployments + * can show "Queue (Stream)" instead of "LIST". + * + *

The default returns {@code null}, which the dashboard interprets as "fall back to + * {@code DataType.name()}" (the historical Redis behavior). + * + * @param tab the dashboard nav tab the row corresponds to (PENDING, RUNNING, SCHEDULED, DEAD, + * COMPLETED, etc.). May be {@code null} when called in a context without a tab. + * @param type Redis-shaped data type used by the dashboard's table rendering. + * @return display label, or {@code null} to fall through to the default rendering. + */ + default String dataTypeLabel( + com.github.sonus21.rqueue.models.enums.NavTab tab, + com.github.sonus21.rqueue.models.enums.DataType type) { + return null; + } + AutoCloseable subscribe(String channel, Consumer handler); void publish(String channel, String payload); diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/SubscriberView.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/SubscriberView.java new file mode 100644 index 00000000..e9702561 --- /dev/null +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/SubscriberView.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package com.github.sonus21.rqueue.core.spi; + +/** + * Per-subscriber row surfaced by {@link MessageBroker#subscribers} for the queue-detail + * dashboard. Each entry corresponds to one logical consumer attached to the queue: + * + *

    + *
  • On Redis, one entry per {@code @RqueueListener} method (handlers all share the + * same backing list, so {@code pending} is the same on every row and + * {@code pendingShared} is {@code true}). + *
  • On NATS JetStream, one entry per durable consumer. {@code pending} is the + * per-consumer {@code numPending} for Limits retention (exact, divergent across + * rows) and the shared {@code msgCount} for WorkQueue retention (same on every + * row, {@code pendingShared = true}). + *
+ * + * @param consumerName logical consumer / handler name (from {@code @RqueueListener.consumerName} + * when set, otherwise a backend-derived name like {@code rqueue-}). + * @param pending messages waiting to be processed by this subscriber. + * @param inFlight messages this subscriber has received but not yet acknowledged. + * @param pendingShared {@code true} when {@code pending} is a queue-wide aggregate rather + * than this subscriber's exclusive backlog. The dashboard renders these with a + * "(shared)" hint so it's clear the figure is not per-consumer. + */ +public record SubscriberView( + String consumerName, long pending, long inFlight, boolean pendingShared) { + + public SubscriberView(String consumerName, long pending, long inFlight) { + this(consumerName, pending, inFlight, false); + } +} diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBroker.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBroker.java index 2351f820..5ac2f868 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBroker.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/core/spi/redis/RedisMessageBroker.java @@ -138,6 +138,50 @@ public long size(QueueDetail q) { return size == null ? 0L : size; } + /** + * Per-subscriber rows for the Redis backend. Walks the {@link + * com.github.sonus21.rqueue.core.EndpointRegistry} for every {@code @RqueueListener} + * registered against this queue, then reports the shared list size and processing-ZSET + * size on each row. {@code pendingShared = true} on every row because Redis listeners + * compete on the same backing list — the figure is identical across rows but surfacing + * each handler still tells the operator which methods are subscribed and (joined with + * the worker registry by the dashboard) when each was last active. + */ + @Override + public java.util.List subscribers( + QueueDetail q) { + long sharedPending; + try { + sharedPending = size(q); + } catch (RuntimeException e) { + sharedPending = 0L; + } + long sharedInFlight; + try { + RedisTemplate rt = template.getTemplate(); + Long zsetSize = rt.opsForZSet().size(q.getProcessingQueueName()); + sharedInFlight = zsetSize == null ? 0L : zsetSize; + } catch (RuntimeException e) { + sharedInFlight = 0L; + } + java.util.List registered = + com.github.sonus21.rqueue.core.EndpointRegistry.getAllForQueue(q.getName()); + if (registered.isEmpty()) { + // Queue is registered as primary only (no secondary handlers) — fall through to a + // single row using the queue's own consumer name so the table still renders. + return java.util.Collections.singletonList( + new com.github.sonus21.rqueue.core.spi.SubscriberView( + q.resolvedConsumerName(), sharedPending, sharedInFlight, true)); + } + java.util.List out = + new java.util.ArrayList<>(registered.size()); + for (QueueDetail qd : registered) { + out.add(new com.github.sonus21.rqueue.core.spi.SubscriberView( + qd.resolvedConsumerName(), sharedPending, sharedInFlight, true)); + } + return out; + } + @Override public AutoCloseable subscribe(String channel, Consumer handler) { if (pubSubContainer == null) { diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/QueueDetail.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/QueueDetail.java index 9cd159c4..c89bcb2d 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/QueueDetail.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/listener/QueueDetail.java @@ -174,19 +174,23 @@ public Duration visibilityDuration() { } /** - * Returns the effective JetStream consumer name for this queue. When {@link #consumerName} is - * explicitly set it is returned as-is. Otherwise a default is derived from the queue name: - * primary (non-system-generated) queues get {@code {name}-consumer-primary}; system-generated - * priority sub-queues get {@code {name}-consumer}. The name is sanitized so that characters + * Returns the effective broker-level consumer name for this queue. When {@link #consumerName} is + * explicitly set on the {@code @RqueueListener} it is returned as-is. Otherwise the default is + * derived from the queue name as {@code {name}-consumer}. The name is sanitized so characters * outside {@code [A-Za-z0-9_-]} (e.g. the {@code ::} priority suffix separator) are replaced * with {@code -}, producing a valid NATS consumer name in all cases. + * + *

A single suffix is used regardless of {@code systemGenerated} so the bootstrap validator + * and the runtime poller agree on the consumer name. NATS workqueue streams reject multiple + * non-filtered consumers (error 10099); using two different suffixes would cause the poller to + * try creating a second consumer with a different name and fail. */ public String resolvedConsumerName() { if (consumerName != null && !consumerName.isEmpty()) { return consumerName; } String sanitized = name.replaceAll("[^A-Za-z0-9_-]", "-"); - return systemGenerated ? sanitized + "-consumer" : sanitized + "-consumer-primary"; + return sanitized + "-consumer"; } public boolean isDoNotRetryError(Throwable throwable) { diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/request/QueueExploreRequest.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/request/QueueExploreRequest.java index 76350e28..05500cf6 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/request/QueueExploreRequest.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/request/QueueExploreRequest.java @@ -43,4 +43,12 @@ public class QueueExploreRequest extends SerializableBase { @JsonProperty("count") private int itemPerPage = 20; + + /** + * Optional consumer / subscriber name. When set on Limits-retention streams the broker + * starts the peek from that consumer's next undelivered sequence instead of the stream's + * first sequence, so the explorer shows messages that are still pending for this specific + * consumer rather than the entire retained window. + */ + private String consumerName; } diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/response/RedisDataDetail.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/response/RedisDataDetail.java index fbe40d51..801aea4c 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/response/RedisDataDetail.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/response/RedisDataDetail.java @@ -37,4 +37,30 @@ public class RedisDataDetail extends SerializableBase { private String name; private DataType type; private long size; + + /** + * Backend-aware human-readable label for the {@link #type}. The legacy templates surface + * this directly so deployments can display "Queue" / "Stream" on NATS instead of Redis-shaped + * "LIST" / "ZSET" tokens. When unset, templates default to {@code type.name()}. + */ + private String typeLabel; + + /** + * Indicates that {@link #size} is an approximation rather than an exact count. NATS + * Limits-retention streams compute pending size from {@code lastSeq - min(delivered.streamSeq)} + * across durable consumers, which is approximate when filter subjects or per-consumer + * positions diverge. Templates render the size with a {@code ~} prefix when this is set. + */ + private boolean approximate; + + /** + * Optional consumer name when the row represents a single subscriber's view of a shared + * stream (e.g. NATS Limits-retention streams). Renders next to the stream name in the + * "Name" column so each durable consumer's lag is visible separately. + */ + private String consumerName; + + public RedisDataDetail(String name, DataType type, long size) { + this(name, type, size, null, false, null); + } } diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/response/SubscriberRow.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/response/SubscriberRow.java new file mode 100644 index 00000000..4f21819f --- /dev/null +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/response/SubscriberRow.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package com.github.sonus21.rqueue.models.response; + +import com.github.sonus21.rqueue.models.SerializableBase; +import com.github.sonus21.rqueue.models.enums.DataType; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; + +/** + * Per-subscriber row rendered by the queue-detail "Subscribers" section. Joins + * broker-supplied per-consumer counts with worker-registry status info so the dashboard + * can drop the standalone "Queue Pollers" table. + */ +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +@Builder +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = false) +public class SubscriberRow extends SerializableBase { + + private static final long serialVersionUID = 1L; + + /** Logical handler name (resolved consumer name on NATS, listener bean#method on Redis). */ + private String consumerName; + + /** Backend-aware label, e.g. "Queue (Stream)" / "Stream consumer" / "List". */ + private String typeLabel; + + /** The underlying storage handle exposed to the message-explorer modal. */ + private String storageName; + + /** Redis-shaped data type used for the explorer modal's pagination logic. */ + private DataType dataType; + + /** + * Messages this subscriber is responsible for processing. For brokers with shared pools + * (Redis, NATS WorkQueue) this is the queue-wide pending count; for NATS Limits streams + * it's the per-consumer {@code numPending}. + */ + private long pending; + + /** + * Indicates {@code pending} is a queue-wide aggregate, not this consumer's exclusive + * backlog. Templates render a "(shared)" hint when this is true. + */ + private boolean pendingShared; + + /** Messages delivered but not yet acknowledged by this subscriber. */ + private long inFlight; + + /** Worker status: ACTIVE / STALE / UNKNOWN. */ + private String status; + + /** + * Number of distinct {@code (JVM, consumer)} heartbeats currently active in the worker + * registry — i.e. how many polling worker instances are attached to this consumer across + * the cluster. The row's {@link #host}/{@link #pid}/{@link #lastPollAt} fields describe + * only the most-recently polling instance, so the dashboard surfaces the total count + * separately. Thread-level fanout from {@code @RqueueListener(concurrency = "10-20")} + * lives inside a single instance and is not reflected here — the registry tracks + * heartbeats per JVM, not per thread. + */ + private int workerCount; + + /** Host running the active worker, when known. */ + private String host; + + /** PID of the active worker, when known (kept as String to mirror the worker registry). */ + private String pid; + + /** Epoch ms of the most recent poll observed for this subscriber. */ + private long lastPollAt; + + /** Human-formatted "last poll age" — populated server-side for Pebble. */ + private String lastPollAge; +} diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/response/TerminalStorageRow.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/response/TerminalStorageRow.java new file mode 100644 index 00000000..cb2886fb --- /dev/null +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/models/response/TerminalStorageRow.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package com.github.sonus21.rqueue.models.response; + +import com.github.sonus21.rqueue.models.SerializableBase; +import com.github.sonus21.rqueue.models.enums.DataType; +import com.github.sonus21.rqueue.models.enums.NavTab; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; + +/** + * Row in the queue-detail "Terminal Storage" table — one entry per shared bucket + * (COMPLETED messages, DEAD letter queues). These are not per-consumer; the broker + * contributes a single shared count. + */ +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +@Builder +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = false) +public class TerminalStorageRow extends SerializableBase { + + private static final long serialVersionUID = 1L; + + /** Tab category — COMPLETED / DEAD. Used for the "Bucket" column label. */ + private NavTab tab; + + /** Backend-aware label, e.g. "Completed (KV)" / "Dead Letter (Stream)" / "ZSET" / "LIST". */ + private String typeLabel; + + /** Underlying storage handle (e.g. Redis ZSET key, JetStream stream name). */ + private String storageName; + + /** Redis-shaped data type used by the explorer modal's pagination logic. */ + private DataType dataType; + + /** + * Number of messages currently in this bucket. Negative values render as "Queue-backed" + * (matches the existing data-detail semantics for DLQs whose consumer is enabled). + */ + private long size; + + /** {@code true} when {@code size} is a best-effort estimate rather than an exact count. */ + private boolean approximate; +} diff --git a/rqueue-core/src/main/java/com/github/sonus21/rqueue/web/RqueueQDetailService.java b/rqueue-core/src/main/java/com/github/sonus21/rqueue/web/RqueueQDetailService.java index 5dde86b9..7e5f8706 100644 --- a/rqueue-core/src/main/java/com/github/sonus21/rqueue/web/RqueueQDetailService.java +++ b/rqueue-core/src/main/java/com/github/sonus21/rqueue/web/RqueueQDetailService.java @@ -22,6 +22,8 @@ import com.github.sonus21.rqueue.models.registry.RqueueWorkerPollerView; import com.github.sonus21.rqueue.models.response.DataViewResponse; import com.github.sonus21.rqueue.models.response.RedisDataDetail; +import com.github.sonus21.rqueue.models.response.SubscriberRow; +import com.github.sonus21.rqueue.models.response.TerminalStorageRow; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -38,10 +40,25 @@ Map>> getQueueDataStructureDetails( List> getQueueDataStructureDetail(QueueConfig queueConfig); + /** + * One row per subscriber attached to the queue (every {@code @RqueueListener} on Redis, + * every JetStream durable consumer on NATS). Backs the new "Subscribers" section on the + * queue-detail page; folds in last-active info from the worker registry so the standalone + * Queue Pollers section can be retired. + */ + List getSubscriberRows(QueueConfig queueConfig); + + /** + * One row per terminal storage bucket (COMPLETED, DEAD letter queues) for the given queue. + * These are not per-consumer — they are shared stores — so the dashboard renders them in a + * separate section from the per-subscriber rows. + */ + List getTerminalRows(QueueConfig queueConfig); + List getNavTabs(QueueConfig queueConfig); DataViewResponse getExplorePageData( - String src, String name, DataType type, int pageNumber, int itemPerPage); + String src, String name, DataType type, String consumerName, int pageNumber, int itemPerPage); DataViewResponse viewData( String name, DataType type, String key, int pageNumber, int itemPerPage); @@ -57,7 +74,7 @@ DataViewResponse viewData( List getQueueWorkers(String queueName); Mono getReactiveExplorePageData( - String src, String name, DataType type, int pageNumber, int itemPerPage); + String src, String name, DataType type, String consumerName, int pageNumber, int itemPerPage); Mono viewReactiveData( String name, DataType type, String key, int pageNumber, int itemPerPage); diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java index 3e64ca0a..8a17cb4e 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/js/JetStreamMessageBroker.java @@ -31,16 +31,13 @@ import io.nats.client.JetStreamSubscription; import io.nats.client.Message; import io.nats.client.PullSubscribeOptions; -import io.nats.client.api.AckPolicy; -import io.nats.client.api.ConsumerConfiguration; -import io.nats.client.api.DeliverPolicy; import io.nats.client.impl.Headers; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; import java.util.logging.Level; @@ -81,10 +78,19 @@ public class JetStreamMessageBroker implements MessageBroker, AutoCloseable { private final NatsProvisioner provisioner; /** - * keyed by RqueueMessage.id, value is the underlying NATS Message for ack/nak. + * keyed by {@code "::"}, value is the underlying NATS + * Message for ack/nak. The consumer prefix is required for Limits-retention streams where + * multiple durable consumers each receive their own copy of every message — keying on + * just the message id would let one consumer's {@code put} overwrite another's, and the + * subsequent ack would target the wrong NATS Message handle (leaving the original delivery + * stuck in {@code numAckPending} until {@code AckWait} expires). */ private final ConcurrentHashMap inFlight = new ConcurrentHashMap<>(); + private static String inFlightKey(String consumerName, String messageId) { + return (consumerName == null ? "" : consumerName) + "::" + messageId; + } + /** * Cached pull subscriptions keyed by stream + consumerName so we don't re-bind on every pop. */ @@ -417,7 +423,7 @@ private List popInternal( // defensive: metadata unavailable on non-JetStream messages } if (rm.getId() != null) { - inFlight.put(rm.getId(), nm); + inFlight.put(inFlightKey(consumerName, rm.getId()), nm); } out.add(rm); } catch (RuntimeException | IOException e) { @@ -442,7 +448,7 @@ public boolean ack(QueueDetail q, RqueueMessage m) { if (m.getId() == null) { return false; } - Message nm = inFlight.remove(m.getId()); + Message nm = inFlight.remove(inFlightKey(q.resolvedConsumerName(), m.getId())); if (nm == null) { return false; } @@ -455,7 +461,7 @@ public boolean nack(QueueDetail q, RqueueMessage m, long retryDelayMs) { if (m.getId() == null) { return false; } - Message nm = inFlight.remove(m.getId()); + Message nm = inFlight.remove(inFlightKey(q.resolvedConsumerName(), m.getId())); if (nm == null) { return false; } @@ -472,7 +478,7 @@ public void moveToDlq( long delayMs) { // Ack the original NATS message so it is removed from the source stream. if (old.getId() != null) { - Message nm = inFlight.remove(old.getId()); + Message nm = inFlight.remove(inFlightKey(source.resolvedConsumerName(), old.getId())); if (nm != null) { nm.ack(); } @@ -510,55 +516,269 @@ public long moveExpired(QueueDetail q, long now, int batch) { @Override public List peek(QueueDetail q, long offset, long count) { + return peek(q, null, offset, count); + } + + @Override + public List peek(QueueDetail q, String consumerName, long offset, long count) { String stream = streamFor(q); - String subject = subjectFor(q); - JetStreamSubscription sub = null; + if (count <= 0) { + return Collections.emptyList(); + } try { - ConsumerConfiguration.Builder cb = ConsumerConfiguration.builder() - .ackPolicy(AckPolicy.None) - .filterSubject(subject) - .name("rqueue-js-peek-" + UUID.randomUUID()); - if (offset > 0) { - cb.deliverPolicy(DeliverPolicy.ByStartSequence).startSequence(Math.max(1L, offset)); - } else { - cb.deliverPolicy(DeliverPolicy.All); + // Read messages directly from the stream by sequence number via the JetStream + // Management API. This avoids creating any consumer, which sidesteps two NATS 2.12+ + // restrictions on WorkQueue-retention streams: + // 1. Pull consumers require AckPolicy.Explicit (error 10084). + // 2. Multiple consumers on a WorkQueue stream must be mutually exclusive via + // filter subjects (error 10100) — incompatible with the always-on durable + // consumer that the listener container uses. + // Reading by sequence is purely non-destructive and works regardless of retention + // policy or what other consumers exist on the stream. + io.nats.client.api.StreamInfo info = jsm.getStreamInfo(stream); + long firstSeq = info.getStreamState().getFirstSequence(); + long lastSeq = info.getStreamState().getLastSequence(); + if (lastSeq < firstSeq) { + return Collections.emptyList(); + } + // Consumer-aware base sequence for Limits-retention streams: when a consumerName is + // provided, start from that consumer's lowest unacked sequence (ackFloor + 1) so the + // dashboard shows everything this subscriber still has work to do on — both messages + // already delivered but not yet acked (in-flight) and messages still to be delivered + // (pending). Using delivered.streamSeq + 1 would hide the in-flight window, which + // surprises operators who see "in-flight = 15" but get an empty explorer. + // WorkQueue streams have a single shared consumer (msgs are removed on ack) so the + // stream's firstSeq is already the right base — skip the lookup. + long base = firstSeq; + if (consumerName != null + && !consumerName.isEmpty() + && info.getConfiguration() != null + && info.getConfiguration().getRetentionPolicy() + == io.nats.client.api.RetentionPolicy.Limits) { + try { + io.nats.client.api.ConsumerInfo ci = jsm.getConsumerInfo(stream, consumerName); + if (ci != null && ci.getAckFloor() != null) { + base = Math.max(firstSeq, ci.getAckFloor().getStreamSequence() + 1); + } + } catch (JetStreamApiException ignore) { + // consumer may have disappeared mid-walk; fall back to stream firstSeq + } } - PullSubscribeOptions opts = PullSubscribeOptions.builder().stream(stream) - .configuration(cb.build()) - .build(); - sub = js.subscribe(subject, opts); - int n = (int) Math.min(Integer.MAX_VALUE, Math.max(0L, count)); - List msgs = sub.fetch(n, Duration.ofSeconds(2)); - List out = new ArrayList<>(msgs.size()); - for (Message nm : msgs) { + long startSeq = base + Math.max(0L, offset); + long endSeq = Math.min(lastSeq, startSeq + count - 1); + List out = new ArrayList<>(); + for (long seq = startSeq; seq <= endSeq && out.size() < count; seq++) { try { - out.add(serdes.deserialize(nm.getData(), RqueueMessage.class)); - } catch (Exception e) { - log.log(Level.WARNING, "peek: skipping undeserializable message", e); + io.nats.client.api.MessageInfo mi = jsm.getMessage(stream, seq); + if (mi == null || mi.getData() == null) { + continue; + } + out.add(serdes.deserialize(mi.getData(), RqueueMessage.class)); + } catch (JetStreamApiException notFound) { + // Sequence may have been purged or skipped (e.g. WorkQueue acks); keep walking. + log.log( + Level.FINE, "peek: skipping missing seq=" + seq + " on stream=" + stream, notFound); + } catch (Exception deserErr) { + log.log(Level.WARNING, "peek: skipping undeserializable seq=" + seq, deserErr); } } return out; } catch (IOException | JetStreamApiException e) { throw new RqueueNatsException( "Failed to peek queue=" + q.getName() + " offset=" + offset + " count=" + count, e); - } finally { - if (sub != null) { + } + } + + @Override + public long size(QueueDetail q) { + String stream = streamFor(q); + try { + io.nats.client.api.StreamInfo info = jsm.getStreamInfo(stream); + io.nats.client.api.RetentionPolicy retention = + info.getConfiguration() != null ? info.getConfiguration().getRetentionPolicy() : null; + // WorkQueue retention removes messages on ack, so streamState.msgCount is the exact + // count of outstanding work — the natural "pending size" for queue mode. For Limits + // retention msgCount is the total retained messages regardless of consumer progress, + // so we compute the worst-case outstanding work from stream position math: + // outstanding ≈ lastSeq - min(consumer.ackFloor.streamSeq) + // which is the messages the slowest durable consumer has not yet acked. This matches + // the per-consumer pending semantic in subscribers() (numPending + numAckPending) so + // the queue-level "size" and the per-row pending counts agree on what "outstanding" + // means. + if (retention == io.nats.client.api.RetentionPolicy.Limits) { + return approximateLimitsPending(stream, info); + } + return info.getStreamState().getMsgCount(); + } catch (IOException | JetStreamApiException e) { + throw new RqueueNatsException("Failed to read stream size for queue=" + q.getName(), e); + } + } + + /** + * Position-based outstanding-work estimate for a Limits-retention stream: + * {@code lastSeq - min(consumer.ackFloor.streamSeq)} across all durable consumers — i.e. the + * size of the unacked window for the slowest consumer (counts both yet-to-deliver and + * delivered-but-unacked messages). Returns {@code msgCount} as a fallback when no consumers + * exist or the enumeration fails, so the dashboard never misses a non-zero queue. + */ + private long approximateLimitsPending(String stream, io.nats.client.api.StreamInfo info) { + long lastSeq = info.getStreamState().getLastSequence(); + if (lastSeq <= 0) { + return 0L; + } + try { + List consumers = jsm.getConsumerNames(stream); + if (consumers == null || consumers.isEmpty()) { + // No consumers attached: the entire retained range is outstanding from the perspective + // of any future consumer. Stream's msgCount is the right approximation. + return info.getStreamState().getMsgCount(); + } + long minAckFloor = Long.MAX_VALUE; + for (String consumer : consumers) { try { - sub.unsubscribe(); - } catch (RuntimeException ignored) { - // ephemeral consumer is auto-reaped server-side; ignore + io.nats.client.api.ConsumerInfo ci = jsm.getConsumerInfo(stream, consumer); + if (ci == null || ci.getAckFloor() == null) { + continue; + } + long ackFloor = ci.getAckFloor().getStreamSequence(); + if (ackFloor < minAckFloor) { + minAckFloor = ackFloor; + } + } catch (IOException | JetStreamApiException ignore) { + // best-effort; skip consumers that disappear mid-walk + } + } + if (minAckFloor == Long.MAX_VALUE) { + return info.getStreamState().getMsgCount(); + } + return Math.max(0L, lastSeq - minAckFloor); + } catch (IOException | JetStreamApiException ignore) { + return info.getStreamState().getMsgCount(); + } + } + + /** + * Reports whether {@link #size(QueueDetail)} is an approximation. True for Limits-retention + * streams (per-consumer position math) and false for WorkQueue streams (msgCount is exact). + */ + public boolean isSizeApproximate(QueueDetail q) { + String stream = streamFor(q); + try { + io.nats.client.api.StreamInfo info = jsm.getStreamInfo(stream); + io.nats.client.api.RetentionPolicy retention = + info.getConfiguration() != null ? info.getConfiguration().getRetentionPolicy() : null; + return retention == io.nats.client.api.RetentionPolicy.Limits; + } catch (IOException | JetStreamApiException e) { + return false; + } + } + + /** + * Per-consumer subscriber view used by the queue-detail dashboard. Walks all durable + * consumers on the queue's stream and reports each one's pending + in-flight counts as + * separate columns — same split as the Redis backend's processing ZSET vs ready LIST. + * + *

Pending semantics. {@code pending} is yet-to-deliver work for this consumer. + * For WorkQueue retention this is the stream's shared {@code msgCount} (every row shows the + * same number, marked {@code pendingShared = true}); for Limits retention it is the + * consumer's exact {@code numPending}. {@code inFlight} is always the consumer's exclusive + * {@code numAckPending}: messages delivered but not yet acked. The two are disjoint — + * {@code pending} excludes anything currently in flight — so an operator reading the row + * sees the work split between "still to dispatch" and "currently being processed". Total + * outstanding work for the consumer is the sum of the two, which is what the explorer + * surfaces when the operator clicks the consumer link. + * + *

If consumer enumeration fails or the stream is unprovisioned, falls back to the + * default single-row implementation so the dashboard still renders something useful. + */ + @Override + public java.util.List subscribers( + QueueDetail q) { + String stream = streamFor(q); + try { + io.nats.client.api.StreamInfo info = jsm.getStreamInfo(stream); + io.nats.client.api.RetentionPolicy retention = + info.getConfiguration() != null ? info.getConfiguration().getRetentionPolicy() : null; + boolean pendingIsShared = retention != io.nats.client.api.RetentionPolicy.Limits; + long sharedPending = info.getStreamState().getMsgCount(); + List consumers = jsm.getConsumerNames(stream); + if (consumers == null || consumers.isEmpty()) { + return com.github.sonus21.rqueue.core.spi.MessageBroker.super.subscribers(q); + } + java.util.List out = + new java.util.ArrayList<>(consumers.size()); + for (String consumer : consumers) { + try { + io.nats.client.api.ConsumerInfo ci = jsm.getConsumerInfo(stream, consumer); + if (ci == null) { + continue; + } + long pending = pendingIsShared ? sharedPending : ci.getNumPending(); + long inFlight = ci.getNumAckPending(); + out.add(new com.github.sonus21.rqueue.core.spi.SubscriberView( + consumer, pending, inFlight, pendingIsShared)); + } catch (IOException | JetStreamApiException ignore) { + // best-effort; skip consumers that disappear mid-walk } } + if (out.isEmpty()) { + return com.github.sonus21.rqueue.core.spi.MessageBroker.super.subscribers(q); + } + return out; + } catch (IOException | JetStreamApiException e) { + log.log(Level.WARNING, "subscribers() failed for stream=" + stream, e); + return com.github.sonus21.rqueue.core.spi.MessageBroker.super.subscribers(q); } } + /** + * For Limits-retention streams, returns an exact per-consumer pending count + * ({@code lastSeq - delivered.streamSeq}). For WorkQueue streams returns {@code null} so the + * dashboard falls back to the single {@link #size(QueueDetail)} row — WorkQueue messages are + * shared across consumers, so a per-consumer split is meaningless. + * + *

The map iteration order matches {@link io.nats.client.JetStreamManagement#getConsumerNames} + * (insertion order), giving the dashboard a stable rendering. + */ @Override - public long size(QueueDetail q) { + @Deprecated + public java.util.Map consumerPendingSizes(QueueDetail q) { String stream = streamFor(q); try { - return jsm.getStreamInfo(stream).getStreamState().getMsgCount(); + io.nats.client.api.StreamInfo info = jsm.getStreamInfo(stream); + io.nats.client.api.RetentionPolicy retention = + info.getConfiguration() != null ? info.getConfiguration().getRetentionPolicy() : null; + if (retention != io.nats.client.api.RetentionPolicy.Limits) { + // WorkQueue (and any future single-pool retention) doesn't have per-consumer pending. + return null; + } + long lastSeq = info.getStreamState().getLastSequence(); + List consumers = jsm.getConsumerNames(stream); + if (consumers == null || consumers.isEmpty()) { + return java.util.Collections.emptyMap(); + } + java.util.Map out = new java.util.LinkedHashMap<>(); + for (String consumer : consumers) { + try { + io.nats.client.api.ConsumerInfo ci = jsm.getConsumerInfo(stream, consumer); + if (ci == null) { + continue; + } + // Prefer numPending when available (server-computed); fall back to position math. + long pending = ci.getNumPending(); + if (pending == 0 && ci.getDelivered() != null) { + long delivered = ci.getDelivered().getStreamSequence(); + pending = Math.max(0L, lastSeq - delivered); + } + out.put(consumer, pending); + } catch (IOException | JetStreamApiException ignore) { + // best-effort; skip consumers that disappear mid-walk + } + } + return out; } catch (IOException | JetStreamApiException e) { - throw new RqueueNatsException("Failed to read stream size for queue=" + q.getName(), e); + log.log(Level.WARNING, "consumerPendingSizes failed for stream=" + stream, e); + return null; } } @@ -641,6 +861,32 @@ public String dlqStorageDisplayName(QueueDetail q) { return dlqStreamFor(q); } + /** + * Map the dashboard's Redis-shaped data-type tokens onto NATS terminology. Each per-queue + * stream uses the JetStream {@code WorkQueue} retention policy by default, so the pending + * row is labelled "Queue (Stream)". Completed messages are tracked in a KV bucket and DLQs + * are independent streams. + */ + @Override + public String dataTypeLabel( + com.github.sonus21.rqueue.models.enums.NavTab tab, + com.github.sonus21.rqueue.models.enums.DataType type) { + if (tab == null) { + return type == null ? null : "Stream"; + } + switch (tab) { + case PENDING: + return "Queue (Stream)"; + case DEAD: + return "Dead Letter (Stream)"; + case COMPLETED: + return "Completed (KV)"; + default: + // Running / Scheduled / Cron tabs are hidden on NATS via Capabilities; fall through. + return type == null ? null : "Stream"; + } + } + @Override public void close() { for (JetStreamSubscription s : subscriptionCache.values()) { diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueMessageMetadataService.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueMessageMetadataService.java index 25240262..24db30df 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueMessageMetadataService.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueMessageMetadataService.java @@ -122,10 +122,16 @@ public boolean deleteMessage(String queueName, String messageId, Duration ttl) { String metaId = RqueueMessageUtils.getMessageMetaId(queueName, messageId); MessageMetadata m = get(metaId); if (m == null) { - return false; + // NATS doesn't store metadata at enqueue time (storeMessageMetadata short-circuits in + // BaseMessageSender for brokers that don't use primary-handler dispatch). So a delete + // request from the dashboard for a stream-resident message will see no metadata. Create + // a tombstone entry keyed by metaId so subsequent peeks render the row as "deleted". + m = new MessageMetadata(metaId, MessageStatus.DELETED); } m.setDeleted(true); + m.setStatus(MessageStatus.DELETED); m.setDeletedOn(System.currentTimeMillis()); + m.setUpdatedOn(System.currentTimeMillis()); save(m, ttl, false); return true; } diff --git a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueUtilityService.java b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueUtilityService.java index b7fdc3f3..792f6ffc 100644 --- a/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueUtilityService.java +++ b/rqueue-nats/src/main/java/com/github/sonus21/rqueue/nats/service/NatsRqueueUtilityService.java @@ -12,7 +12,10 @@ import com.github.sonus21.rqueue.config.NatsBackendCondition; import com.github.sonus21.rqueue.config.RqueueWebConfig; +import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao; +import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer; import com.github.sonus21.rqueue.models.Pair; +import com.github.sonus21.rqueue.models.db.QueueConfig; import com.github.sonus21.rqueue.models.enums.AggregationType; import com.github.sonus21.rqueue.models.request.MessageMoveRequest; import com.github.sonus21.rqueue.models.request.PauseUnpauseQueueRequest; @@ -21,48 +24,125 @@ import com.github.sonus21.rqueue.models.response.DataSelectorResponse; import com.github.sonus21.rqueue.models.response.MessageMoveResponse; import com.github.sonus21.rqueue.models.response.StringResponse; +import com.github.sonus21.rqueue.service.RqueueMessageMetadataService; import com.github.sonus21.rqueue.service.RqueueUtilityService; import com.github.sonus21.rqueue.utils.Constants; +import com.github.sonus21.rqueue.utils.StringUtils; +import java.time.Duration; import java.util.LinkedList; import java.util.List; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Conditional; import org.springframework.stereotype.Service; import reactor.core.publisher.Mono; /** - * NATS-backend stub for {@link RqueueUtilityService}. Admin/dashboard utility methods are - * Redis-only in v1; this stub returns "not supported" responses uniformly so the rest of the - * bean graph stays consistent. Replace with a NATS-native implementation in a follow-up. + * NATS-backend implementation of {@link RqueueUtilityService}. + * + *

The implementation supports operations that map cleanly onto JetStream's model: + *

    + *
  • {@link #pauseUnpauseQueue(PauseUnpauseQueueRequest)} — flips the {@code paused} flag on + * {@link QueueConfig} in the queue-config KV bucket and propagates the change to the local + * {@link RqueueMessageListenerContainer} so the poller stops polling. Multi-instance fan-out + * is a follow-up (NATS pub/sub bridge). + *
  • {@link #deleteMessage(String, String)} — soft delete: marks the metadata record in the + * message-metadata KV bucket. The stream message persists; the dashboard hides it via the + * {@code deleted} flag, matching the Redis impl's semantics. + *
  • {@link #aggregateDataCounter(AggregationType)} — pure date-selector logic, no backend + * dependency. + *
  • {@link #getDataType(String)} — reports {@code "STREAM"} since JetStream subjects map to + * stream messages, not Redis-shaped data structures. + *
+ * + *

Operations that have no JetStream equivalent return a structured "not supported" response: + * {@link #moveMessage(MessageMoveRequest)}, {@link #enqueueMessage(String, String, String)} (no + * scheduled-queue ZSET to re-enqueue from), and {@link #makeEmpty(String, String)} (would require + * stream re-creation, which is destructive and out-of-band). */ @Service @Conditional(NatsBackendCondition.class) +@Slf4j public class NatsRqueueUtilityService implements RqueueUtilityService { + private static final String NOT_SUPPORTED_SUFFIX = + " is not supported with rqueue.backend=nats in v1"; + + private final RqueueWebConfig rqueueWebConfig; + private final RqueueSystemConfigDao systemConfigDao; + private final RqueueMessageMetadataService messageMetadataService; + private final RqueueMessageListenerContainer rqueueMessageListenerContainer; + @Autowired - private RqueueWebConfig rqueueWebConfig; + public NatsRqueueUtilityService( + RqueueWebConfig rqueueWebConfig, + RqueueSystemConfigDao systemConfigDao, + RqueueMessageMetadataService messageMetadataService, + RqueueMessageListenerContainer rqueueMessageListenerContainer) { + this.rqueueWebConfig = rqueueWebConfig; + this.systemConfigDao = systemConfigDao; + this.messageMetadataService = messageMetadataService; + this.rqueueMessageListenerContainer = rqueueMessageListenerContainer; + } private static T notSupported(T response, String op) { response.setCode(1); - response.setMessage(op + " is not supported with rqueue.backend=nats in v1"); + response.setMessage(op + NOT_SUPPORTED_SUFFIX); return response; } + /** + * Soft-delete: marks the message metadata as deleted. The underlying stream message persists + * (JetStream streams are immutable), but the dashboard and consumers honor the deleted flag. + */ @Override public BooleanResponse deleteMessage(String queueName, String id) { - return notSupported(new BooleanResponse(), "deleteMessage"); + BooleanResponse response = new BooleanResponse(); + if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(id)) { + response.setCode(1); + response.setMessage("queueName and id are required"); + return response; + } + try { + boolean ok = messageMetadataService.deleteMessage( + queueName, id, Duration.ofDays(Constants.DAYS_IN_A_MONTH)); + if (!ok) { + response.setCode(1); + response.setMessage("Message metadata not found for queue=" + queueName + " id=" + id); + return response; + } + response.setValue(true); + return response; + } catch (Exception e) { + log.warn("deleteMessage failed for queue={} id={}", queueName, id, e); + response.setCode(1); + response.setMessage("deleteMessage failed: " + e.getMessage()); + return response; + } } + /** + * NATS does not support arbitrary message re-enqueue: stream sequences are immutable and there + * is no scheduled-queue ZSET to pull from. Surfaces a structured "not supported" response. + */ @Override public BooleanResponse enqueueMessage(String queueName, String id, String position) { return notSupported(new BooleanResponse(), "enqueueMessage"); } + /** + * NATS does not support cross-queue positional moves: streams are independent, sequences are + * immutable. Surfaces a structured "not supported" response. + */ @Override public MessageMoveResponse moveMessage(MessageMoveRequest messageMoveRequest) { return notSupported(new MessageMoveResponse(), "moveMessage"); } + /** + * NATS would require destructive stream re-creation to empty a queue. Out-of-band admin op + * (e.g. {@code nats stream purge}) is the recommended path; surfaces "not supported" for now. + */ @Override public BooleanResponse makeEmpty(String queueName, String dataName) { return notSupported(new BooleanResponse(), "makeEmpty"); @@ -73,45 +153,86 @@ public Pair getLatestVersion() { return new Pair<>("", ""); } + /** + * NATS-backed queues are always JetStream streams; report a fixed type rather than probing the + * KV / stream layer per call. + */ @Override public StringResponse getDataType(String name) { - return notSupported(new StringResponse(), "getDataType"); + StringResponse response = new StringResponse(); + response.setVal("STREAM"); + return response; } @Override public Mono makeEmptyReactive(String queueName, String datasetName) { - return Mono.just(notSupported(new BooleanResponse(), "makeEmptyReactive")); + return Mono.just(makeEmpty(queueName, datasetName)); } @Override public Mono deleteReactiveMessage(String queueName, String messageId) { - return Mono.just(notSupported(new BooleanResponse(), "deleteReactiveMessage")); + return Mono.just(deleteMessage(queueName, messageId)); } @Override public Mono enqueueReactiveMessage( String queueName, String messageId, String position) { - return Mono.just(notSupported(new BooleanResponse(), "enqueueReactiveMessage")); + return Mono.just(enqueueMessage(queueName, messageId, position)); } @Override public Mono getReactiveDataType(String name) { - return Mono.just(notSupported(new StringResponse(), "getReactiveDataType")); + return Mono.just(getDataType(name)); } @Override public Mono moveReactiveMessage(MessageMoveRequest request) { - return Mono.just(notSupported(new MessageMoveResponse(), "moveReactiveMessage")); + return Mono.just(moveMessage(request)); } @Override public Mono reactivePauseUnpauseQueue(PauseUnpauseQueueRequest request) { - return Mono.just(notSupported(new BaseResponse(), "reactivePauseUnpauseQueue")); + return Mono.just(pauseUnpauseQueue(request)); } + /** + * Toggle the {@code paused} flag on the queue's {@link QueueConfig} (persisted in the + * {@code rqueue-queue-config} KV bucket) and propagate the change to the local listener + * container so the poller stops dispatching new work. + * + *

Multi-instance fan-out (i.e. propagating the pause across worker JVMs) is a follow-up; + * single-instance deployments are fully covered by this path. + */ @Override public BaseResponse pauseUnpauseQueue(PauseUnpauseQueueRequest request) { - return notSupported(new BaseResponse(), "pauseUnpauseQueue"); + log.info("Queue PauseUnpause request {}", request); + BaseResponse response = new BaseResponse(); + if (request == null || StringUtils.isEmpty(request.getName())) { + response.set(400, "Queue name is required"); + return response; + } + QueueConfig queueConfig = systemConfigDao.getConfigByName(request.getName(), true); + if (queueConfig == null) { + response.set(404, "Queue does not exist"); + return response; + } + boolean targetState = request.isPause(); + if (queueConfig.isPaused() == targetState) { + // No-op: state already matches; respond OK and skip the listener call to avoid the + // "duplicate pause" / "not paused but unpause" warnings in QueueStateMgr. + return response; + } + queueConfig.setPaused(targetState); + systemConfigDao.saveQConfig(queueConfig); + try { + rqueueMessageListenerContainer.pauseUnpauseQueue(request.getName(), targetState); + } catch (Exception e) { + // QueueConfig is already persisted; surface the pause-propagation failure to the caller + // but do not roll back — the next listener restart will pick up the persisted flag. + log.warn("pauseUnpauseQueue listener notification failed for queue={}", request.getName(), e); + response.set(500, "Persisted but listener notification failed: " + e.getMessage()); + } + return response; } @Override diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/AbstractJetStreamIT.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/AbstractJetStreamIT.java index 2831edb9..70097635 100644 --- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/AbstractJetStreamIT.java +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/AbstractJetStreamIT.java @@ -79,9 +79,20 @@ protected QueueDetail mockQueue(String name) { } protected QueueDetail mockQueue(String name, QueueType type) { + return mockQueue(name, type, null); + } + + /** + * Build a mock QueueDetail whose {@code resolvedConsumerName()} returns the given consumer + * name. Used by tests that exercise multi-consumer flows where pop's {@code consumerName} + * argument must match what ack/nack derive from the QueueDetail. + */ + protected QueueDetail mockQueue(String name, QueueType type, String consumerName) { QueueDetail q = mock(QueueDetail.class); when(q.getName()).thenReturn(name); when(q.getType()).thenReturn(type); + String resolved = consumerName != null ? consumerName : name + "-consumer"; + when(q.resolvedConsumerName()).thenReturn(resolved); return q; } } diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerCompetingConsumersIT.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerCompetingConsumersIT.java index b253d230..1a2438a4 100644 --- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerCompetingConsumersIT.java +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerCompetingConsumersIT.java @@ -27,7 +27,8 @@ class JetStreamMessageBrokerCompetingConsumersIT extends AbstractJetStreamIT { @Test void twoWorkersSharingDurable_eachMessageDeliveredOnce() throws Exception { - QueueDetail q = mockQueue("ccq-" + System.nanoTime()); + QueueDetail q = mockQueue( + "ccq-" + System.nanoTime(), com.github.sonus21.rqueue.enums.QueueType.QUEUE, "shared"); int total = 20; try (JetStreamMessageBroker broker = JetStreamMessageBroker.builder().connection(connection).build()) { diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerConsumerAwarePeekIT.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerConsumerAwarePeekIT.java new file mode 100644 index 00000000..d8583a2d --- /dev/null +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerConsumerAwarePeekIT.java @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + */ +package com.github.sonus21.rqueue.nats; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.github.sonus21.rqueue.core.RqueueMessage; +import com.github.sonus21.rqueue.enums.QueueType; +import com.github.sonus21.rqueue.listener.QueueDetail; +import com.github.sonus21.rqueue.nats.js.JetStreamMessageBroker; +import java.time.Duration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import org.junit.jupiter.api.Test; + +/** + * Covers {@link com.github.sonus21.rqueue.core.spi.MessageBroker#peek(QueueDetail, String, long, + * long)} on a Limits-retention stream with two durable consumers at different progress levels. + * + *

The dashboard explorer wires the {@code consumerName} so each row's "browse" action shows + * messages still outstanding for that specific subscriber instead of the entire retained + * window. This test asserts that contract: + * + *

    + *
  • consumer-fast pops + acks the first half of the stream, advancing its {@code ackFloor}. + *
  • consumer-slow does nothing, so its {@code ackFloor} stays at 0. + *
  • {@code peek(q, "consumer-fast", 0, total)} returns only the second half (skips acked). + *
  • {@code peek(q, "consumer-slow", 0, total)} returns the whole stream. + *
  • {@code peek(q, null, 0, total)} also returns the whole stream — the no-consumer + * overload bases on the stream's first sequence and is unchanged by per-consumer state. + *
+ */ +@NatsIntegrationTest +class JetStreamMessageBrokerConsumerAwarePeekIT extends AbstractJetStreamIT { + + @Test + void peek_skipsAlreadyAckedRangeForSpecificConsumer() throws Exception { + String name = "cap-" + System.nanoTime(); + QueueDetail enqueueFacet = mockQueue(name, QueueType.STREAM); + QueueDetail qFast = mockQueue(name, QueueType.STREAM, "consumer-fast"); + QueueDetail qSlow = mockQueue(name, QueueType.STREAM, "consumer-slow"); + int total = 8; + int firstHalf = total / 2; + + RqueueNatsConfig cfg = RqueueNatsConfig.defaults(); + try (JetStreamMessageBroker broker = + JetStreamMessageBroker.builder().connection(connection).config(cfg).build()) { + // Enqueue first to create the stream — pop's ensureConsumer needs the stream to exist. + // DeliverPolicy.All on the consumer means it starts at the stream's first sequence, so + // both durables created after the publish still see every message. + for (int i = 0; i < total; i++) { + broker.enqueue( + enqueueFacet, RqueueMessage.builder().id("m-" + i).message("p" + i).build()); + } + + // Drain the first half on consumer-fast — pop + ack so its ackFloor advances. + Set fastSeen = new HashSet<>(); + long deadline = System.currentTimeMillis() + 5000; + while (fastSeen.size() < firstHalf && System.currentTimeMillis() < deadline) { + List batch = + broker.pop(qFast, "consumer-fast", firstHalf - fastSeen.size(), Duration.ofMillis(500)); + for (RqueueMessage m : batch) { + if (fastSeen.add(m.getId())) { + assertTrue(broker.ack(qFast, m), "ack must succeed for " + m.getId()); + } + } + } + assertEquals(firstHalf, fastSeen.size(), "consumer-fast should have drained first half"); + // Wait for the server to apply the acks before peeking — getConsumerInfo's ackFloor is + // observed asynchronously after nm.ack(). + waitForAckFloorAtLeast(cfg.getStreamPrefix() + name, "consumer-fast", firstHalf); + + // Per-consumer peek for consumer-fast must SKIP the acked range and return only the + // second half (msgs whose stream seq > ackFloor). + List fastPeek = broker.peek(qFast, "consumer-fast", 0, total); + Set fastIds = fastPeek.stream().map(RqueueMessage::getId).collect(Collectors.toSet()); + assertEquals( + total - firstHalf, + fastPeek.size(), + "consumer-fast peek should return only the un-acked tail; got " + fastIds); + for (String acked : fastSeen) { + assertFalse( + fastIds.contains(acked), + "consumer-fast peek must not include already-acked id=" + acked); + } + + // Per-consumer peek for consumer-slow should still see every message — its ackFloor is 0. + List slowPeek = broker.peek(qSlow, "consumer-slow", 0, total); + assertEquals( + total, + slowPeek.size(), + "consumer-slow peek should return the full stream — its ackFloor hasn't advanced"); + + // No-consumer peek behaves identically regardless of per-consumer progress — bases on + // the stream's first sequence (this is the legacy 2-arg overload's contract). + List globalPeek = broker.peek(qFast, 0, total); + assertEquals( + total, + globalPeek.size(), + "global (no-consumer) peek should ignore per-consumer ackFloor"); + } + } + + private void waitForAckFloorAtLeast(String stream, String consumer, long minStreamSeq) + throws Exception { + long deadline = System.currentTimeMillis() + 5000; + while (System.currentTimeMillis() < deadline) { + io.nats.client.api.ConsumerInfo ci = + connection.jetStreamManagement().getConsumerInfo(stream, consumer); + if (ci != null + && ci.getAckFloor() != null + && ci.getAckFloor().getStreamSequence() >= minStreamSeq) { + return; + } + Thread.sleep(50L); + } + throw new AssertionError( + "Timed out waiting for " + consumer + " ackFloor to reach streamSeq " + minStreamSeq); + } +} diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerEnqueueAckIT.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerEnqueueAckIT.java index accef105..56e2c708 100644 --- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerEnqueueAckIT.java +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerEnqueueAckIT.java @@ -25,7 +25,8 @@ class JetStreamMessageBrokerEnqueueAckIT extends AbstractJetStreamIT { @Test void enqueuePopAck_drainsStream() throws Exception { - QueueDetail q = mockQueue("eaq-" + System.nanoTime()); + QueueDetail q = mockQueue( + "eaq-" + System.nanoTime(), com.github.sonus21.rqueue.enums.QueueType.QUEUE, "worker"); RqueueNatsConfig cfg = RqueueNatsConfig.defaults(); cfg.getStreamDefaults().setRetention(io.nats.client.api.RetentionPolicy.WorkQueue); try (JetStreamMessageBroker broker = diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerIndependentConsumersIT.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerIndependentConsumersIT.java index cf25cf19..7f342fb3 100644 --- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerIndependentConsumersIT.java +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerIndependentConsumersIT.java @@ -26,18 +26,24 @@ class JetStreamMessageBrokerIndependentConsumersIT extends AbstractJetStreamIT { @Test void twoDurables_eachReceiveAllMessages() throws Exception { - QueueDetail q = mockQueue("icq-" + System.nanoTime(), QueueType.STREAM); + String name = "icq-" + System.nanoTime(); + // Same stream, two QueueDetail facets (one per @RqueueListener) — mirrors how production + // builds a separate QueueDetail per listener with its own resolvedConsumerName. + QueueDetail enqueueFacet = mockQueue(name, QueueType.STREAM); + QueueDetail qa = mockQueue(name, QueueType.STREAM, "consumer-a"); + QueueDetail qb = mockQueue(name, QueueType.STREAM, "consumer-b"); int total = 5; try (JetStreamMessageBroker broker = JetStreamMessageBroker.builder().connection(connection).build()) { for (int i = 0; i < total; i++) { - broker.enqueue(q, RqueueMessage.builder().id("m-" + i).message("p" + i).build()); + broker.enqueue( + enqueueFacet, RqueueMessage.builder().id("m-" + i).message("p" + i).build()); } Set aSeen = new HashSet<>(); Set bSeen = new HashSet<>(); - drainInto(broker, q, "consumer-a", aSeen); - drainInto(broker, q, "consumer-b", bSeen); + drainInto(broker, qa, "consumer-a", aSeen); + drainInto(broker, qb, "consumer-b", bSeen); assertEquals(total, aSeen.size()); assertEquals(total, bSeen.size()); diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerMultiConsumerAckIT.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerMultiConsumerAckIT.java new file mode 100644 index 00000000..d6cda836 --- /dev/null +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerMultiConsumerAckIT.java @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + */ +package com.github.sonus21.rqueue.nats; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.github.sonus21.rqueue.core.RqueueMessage; +import com.github.sonus21.rqueue.enums.QueueType; +import com.github.sonus21.rqueue.listener.QueueDetail; +import com.github.sonus21.rqueue.nats.js.JetStreamMessageBroker; +import io.nats.client.api.ConsumerInfo; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; + +/** + * Regression test for the inFlight key-collision bug. + * + *

On a Limits-retention stream with two durable consumers (multi-listener fan-out), each + * consumer receives its own NATS Message handle for every published message. The broker's + * {@code inFlight} map was previously keyed only on {@code RqueueMessage.id}, so the second + * consumer's pop overwrote the first's handle, and the first's later {@code ack} would target + * the wrong NATS Message — leaving the original delivery stuck in {@code numAckPending} until + * AckWait expired. + * + *

The test pops on both consumers before acking either, which is the only timing + * that triggers the collision: a sequential drain-then-drain hides the bug because the inFlight + * key is removed before the second pop populates it. After acking each consumer's deliveries, + * the test asserts that both consumers reach {@code numAckPending == 0} and their ack + * floors advance to the stream's last sequence. + */ +@NatsIntegrationTest +class JetStreamMessageBrokerMultiConsumerAckIT extends AbstractJetStreamIT { + + @Test + void twoDurables_bothAckTheirOwnDeliveries() throws Exception { + String name = "mca-" + System.nanoTime(); + QueueDetail enqueueFacet = mockQueue(name, QueueType.STREAM); + QueueDetail qa = mockQueue(name, QueueType.STREAM, "consumer-a"); + QueueDetail qb = mockQueue(name, QueueType.STREAM, "consumer-b"); + int total = 6; + + RqueueNatsConfig cfg = RqueueNatsConfig.defaults(); + try (JetStreamMessageBroker broker = + JetStreamMessageBroker.builder().connection(connection).config(cfg).build()) { + for (int i = 0; i < total; i++) { + broker.enqueue( + enqueueFacet, RqueueMessage.builder().id("m-" + i).message("p" + i).build()); + } + + // Pop on BOTH consumers BEFORE acking either — this is the key to triggering the + // collision: consumer-b's pop overwrites consumer-a's inFlight entry, so when + // consumer-a later calls ack, the buggy implementation reaches for consumer-b's + // NATS Message handle. Acking sequentially (drain-then-drain) hides the bug + // because the inFlight key is removed before the second pop populates it. + List aPopped = pop(broker, qa, "consumer-a", total); + List bPopped = pop(broker, qb, "consumer-b", total); + + assertEquals(total, aPopped.size(), "consumer-a should see every published message"); + assertEquals(total, bPopped.size(), "consumer-b should see every published message"); + + // Ack both — under the buggy implementation, consumer-a's ack resolves to consumer-b's + // NATS Message and acks that one; consumer-a's original delivery stays stuck in + // numAckPending and consumer-b's ack returns false (entry already removed by a). + for (RqueueMessage m : aPopped) { + assertTrue(broker.ack(qa, m), "ack(consumer-a, " + m.getId() + ") must succeed"); + } + for (RqueueMessage m : bPopped) { + assertTrue(broker.ack(qb, m), "ack(consumer-b, " + m.getId() + ") must succeed"); + } + + String stream = cfg.getStreamPrefix() + name; + // Acks are async — the server applies them after the broker returns. Poll for drain. + ConsumerInfo aInfo = waitForAckPendingZero(stream, "consumer-a"); + ConsumerInfo bInfo = waitForAckPendingZero(stream, "consumer-b"); + + assertEquals( + 0L, + aInfo.getNumAckPending(), + "consumer-a numAckPending must drain to 0; was " + aInfo.getNumAckPending() + + " — indicates ack went to the wrong NATS handle"); + assertEquals( + 0L, + bInfo.getNumAckPending(), + "consumer-b numAckPending must drain to 0; was " + bInfo.getNumAckPending()); + + long lastSeq = connection + .jetStreamManagement() + .getStreamInfo(stream) + .getStreamState() + .getLastSequence(); + assertTrue( + aInfo.getAckFloor().getStreamSequence() >= lastSeq, + "consumer-a ackFloor should reach lastSeq=" + lastSeq + " but was " + + aInfo.getAckFloor().getStreamSequence()); + assertTrue( + bInfo.getAckFloor().getStreamSequence() >= lastSeq, + "consumer-b ackFloor should reach lastSeq=" + lastSeq + " but was " + + bInfo.getAckFloor().getStreamSequence()); + } + } + + private List pop( + JetStreamMessageBroker broker, QueueDetail q, String consumer, int expected) + throws Exception { + List all = new ArrayList<>(expected); + long deadline = System.currentTimeMillis() + 5000; + while (all.size() < expected && System.currentTimeMillis() < deadline) { + List batch = broker.pop(q, consumer, expected, Duration.ofMillis(500)); + all.addAll(batch); + } + return all; + } + + private ConsumerInfo waitForAckPendingZero(String stream, String consumer) throws Exception { + long deadline = System.currentTimeMillis() + 5000; + ConsumerInfo last = connection.jetStreamManagement().getConsumerInfo(stream, consumer); + while (last.getNumAckPending() > 0 && System.currentTimeMillis() < deadline) { + Thread.sleep(50L); + last = connection.jetStreamManagement().getConsumerInfo(stream, consumer); + } + return last; + } +} diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerRetryDlqIT.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerRetryDlqIT.java index 6a37ea67..3200aee0 100644 --- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerRetryDlqIT.java +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamMessageBrokerRetryDlqIT.java @@ -24,7 +24,7 @@ class JetStreamMessageBrokerRetryDlqIT extends AbstractJetStreamIT { @Test void exhaustedMessage_landsOnDlqStream() throws Exception { String name = "rdq-" + System.nanoTime(); - QueueDetail q = mockQueue(name); + QueueDetail q = mockQueue(name, com.github.sonus21.rqueue.enums.QueueType.QUEUE, "worker"); RqueueNatsConfig cfg = RqueueNatsConfig.defaults(); cfg.getConsumerDefaults().setMaxDeliver(2); cfg.getConsumerDefaults().setAckWait(Duration.ofMillis(500)); diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamQueueModeIT.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamQueueModeIT.java index fc9917a0..b80ddfeb 100644 --- a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamQueueModeIT.java +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/JetStreamQueueModeIT.java @@ -95,8 +95,8 @@ void queueMode_stream_createsLimitsStream() throws Exception { */ @Test void queueMode_consumerReuse_preservesDeliveryPosition() throws Exception { - QueueDetail q = mockQueue("qm-reuse-" + System.nanoTime(), QueueType.QUEUE); String consumerName = "c1-reuse"; + QueueDetail q = mockQueue("qm-reuse-" + System.nanoTime(), QueueType.QUEUE, consumerName); int total = 5; int firstBatch = 3; @@ -156,7 +156,8 @@ void queueMode_consumerReuse_preservesDeliveryPosition() throws Exception { @Test void queueMode_queue_competingConsumers_eachMessageDeliveredOnce() throws Exception { - QueueDetail q = mockQueue("qm-cc-" + System.nanoTime(), QueueType.QUEUE); + String sharedConsumer = "shared-cc"; + QueueDetail q = mockQueue("qm-cc-" + System.nanoTime(), QueueType.QUEUE, sharedConsumer); int total = 20; try (JetStreamMessageBroker broker = @@ -167,7 +168,6 @@ void queueMode_queue_competingConsumers_eachMessageDeliveredOnce() throws Except Set seen = ConcurrentHashMap.newKeySet(); CountDownLatch done = new CountDownLatch(total); - String sharedConsumer = "shared-cc"; var pool = Executors.newFixedThreadPool(2); for (int t = 0; t < 2; t++) { pool.submit(() -> { @@ -194,19 +194,25 @@ void queueMode_queue_competingConsumers_eachMessageDeliveredOnce() throws Except @Test void queueMode_stream_fanOut_everyConsumerReceivesAllMessages() throws Exception { - QueueDetail q = mockQueue("qm-fo-" + System.nanoTime(), QueueType.STREAM); + String name = "qm-fo-" + System.nanoTime(); + // Same stream, two QueueDetail facets — one per @RqueueListener with its own + // resolvedConsumerName so ack/nack key on the right (consumer, id) pair. + QueueDetail enqueueFacet = mockQueue(name, QueueType.STREAM); + QueueDetail q1 = mockQueue(name, QueueType.STREAM, "listener-svc-1"); + QueueDetail q2 = mockQueue(name, QueueType.STREAM, "listener-svc-2"); int total = 8; try (JetStreamMessageBroker broker = JetStreamMessageBroker.builder().connection(connection).build()) { for (int i = 0; i < total; i++) { - broker.enqueue(q, RqueueMessage.builder().id("fo-" + i).message("p" + i).build()); + broker.enqueue( + enqueueFacet, RqueueMessage.builder().id("fo-" + i).message("p" + i).build()); } // Each listener group uses a distinct consumer name — they are independent on a // Limits-retention stream and each track their own delivery position. - Set listenerOneSeen = drain(broker, q, "listener-svc-1", total); - Set listenerTwoSeen = drain(broker, q, "listener-svc-2", total); + Set listenerOneSeen = drain(broker, q1, "listener-svc-1", total); + Set listenerTwoSeen = drain(broker, q2, "listener-svc-2", total); assertEquals( total, listenerOneSeen.size(), "STREAM mode: listener-svc-1 must receive all messages"); diff --git a/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/service/NatsRqueueUtilityServiceTest.java b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/service/NatsRqueueUtilityServiceTest.java new file mode 100644 index 00000000..6a353b1d --- /dev/null +++ b/rqueue-nats/src/test/java/com/github/sonus21/rqueue/nats/service/NatsRqueueUtilityServiceTest.java @@ -0,0 +1,291 @@ +/* + * Copyright (c) 2026 Sonu Kumar + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + */ + +package com.github.sonus21.rqueue.nats.service; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.github.sonus21.rqueue.config.RqueueWebConfig; +import com.github.sonus21.rqueue.dao.RqueueSystemConfigDao; +import com.github.sonus21.rqueue.listener.RqueueMessageListenerContainer; +import com.github.sonus21.rqueue.models.db.QueueConfig; +import com.github.sonus21.rqueue.models.enums.AggregationType; +import com.github.sonus21.rqueue.models.request.MessageMoveRequest; +import com.github.sonus21.rqueue.models.request.PauseUnpauseQueueRequest; +import com.github.sonus21.rqueue.models.response.BaseResponse; +import com.github.sonus21.rqueue.models.response.BooleanResponse; +import com.github.sonus21.rqueue.models.response.DataSelectorResponse; +import com.github.sonus21.rqueue.models.response.MessageMoveResponse; +import com.github.sonus21.rqueue.models.response.StringResponse; +import com.github.sonus21.rqueue.nats.NatsUnitTest; +import com.github.sonus21.rqueue.service.RqueueMessageMetadataService; +import java.time.Duration; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +/** + * Unit tests for {@link NatsRqueueUtilityService}. Covers the soft-delete, pause/unpause, and + * "not supported" stubs to lock in v1 behavior. + */ +@NatsUnitTest +class NatsRqueueUtilityServiceTest { + + private RqueueWebConfig webConfig; + private RqueueSystemConfigDao systemConfigDao; + private RqueueMessageMetadataService metadataService; + private RqueueMessageListenerContainer listenerContainer; + private NatsRqueueUtilityService service; + + @BeforeEach + void setup() { + webConfig = Mockito.mock(RqueueWebConfig.class); + when(webConfig.getHistoryDay()).thenReturn(7); + systemConfigDao = Mockito.mock(RqueueSystemConfigDao.class); + metadataService = Mockito.mock(RqueueMessageMetadataService.class); + listenerContainer = Mockito.mock(RqueueMessageListenerContainer.class); + service = new NatsRqueueUtilityService( + webConfig, systemConfigDao, metadataService, listenerContainer); + } + + // --- deleteMessage -------------------------------------------------------- + + @Test + void deleteMessage_softDeletesMetadata_returnsValueTrue() { + when(metadataService.deleteMessage(eq("q"), eq("m1"), any(Duration.class))).thenReturn(true); + BooleanResponse response = service.deleteMessage("q", "m1"); + assertEquals(0, response.getCode()); + assertTrue(response.isValue()); + verify(metadataService).deleteMessage(eq("q"), eq("m1"), any(Duration.class)); + } + + @Test + void deleteMessage_metadataMissing_returnsErrorCode() { + when(metadataService.deleteMessage(anyString(), anyString(), any(Duration.class))) + .thenReturn(false); + BooleanResponse response = service.deleteMessage("q", "missing"); + assertEquals(1, response.getCode()); + assertNotNull(response.getMessage()); + assertFalse(response.isValue()); + } + + @Test + void deleteMessage_emptyQueueName_returnsValidationError() { + BooleanResponse response = service.deleteMessage("", "m1"); + assertEquals(1, response.getCode()); + verify(metadataService, never()).deleteMessage(anyString(), anyString(), any(Duration.class)); + } + + @Test + void deleteMessage_metadataServiceThrows_returnsErrorCode() { + when(metadataService.deleteMessage(anyString(), anyString(), any(Duration.class))) + .thenThrow(new RuntimeException("kv unavailable")); + BooleanResponse response = service.deleteMessage("q", "m1"); + assertEquals(1, response.getCode()); + assertNotNull(response.getMessage()); + assertTrue(response.getMessage().contains("kv unavailable")); + } + + // --- pauseUnpauseQueue ---------------------------------------------------- + + @Test + void pauseUnpauseQueue_persistsFlagAndNotifiesListener() { + QueueConfig config = + QueueConfig.builder().name("q").queueName("q").paused(false).build(); + when(systemConfigDao.getConfigByName("q", true)).thenReturn(config); + PauseUnpauseQueueRequest request = new PauseUnpauseQueueRequest(); + request.setName("q"); + request.setPause(true); + + BaseResponse response = service.pauseUnpauseQueue(request); + + assertEquals(0, response.getCode()); + ArgumentCaptor captor = ArgumentCaptor.forClass(QueueConfig.class); + verify(systemConfigDao).saveQConfig(captor.capture()); + assertTrue(captor.getValue().isPaused(), "QueueConfig should be persisted with paused=true"); + verify(listenerContainer).pauseUnpauseQueue("q", true); + } + + @Test + void pauseUnpauseQueue_unpause_propagatesFalse() { + QueueConfig config = + QueueConfig.builder().name("q").queueName("q").paused(true).build(); + when(systemConfigDao.getConfigByName("q", true)).thenReturn(config); + PauseUnpauseQueueRequest request = new PauseUnpauseQueueRequest(); + request.setName("q"); + request.setPause(false); + + BaseResponse response = service.pauseUnpauseQueue(request); + + assertEquals(0, response.getCode()); + verify(listenerContainer).pauseUnpauseQueue("q", false); + } + + @Test + void pauseUnpauseQueue_unknownQueue_returns404() { + when(systemConfigDao.getConfigByName(anyString(), anyBoolean())).thenReturn(null); + PauseUnpauseQueueRequest request = new PauseUnpauseQueueRequest(); + request.setName("missing"); + request.setPause(true); + + BaseResponse response = service.pauseUnpauseQueue(request); + + assertEquals(404, response.getCode()); + verify(systemConfigDao, never()).saveQConfig(any(QueueConfig.class)); + verify(listenerContainer, never()).pauseUnpauseQueue(anyString(), anyBoolean()); + } + + @Test + void pauseUnpauseQueue_alreadyInTargetState_isNoOp() { + QueueConfig config = + QueueConfig.builder().name("q").queueName("q").paused(true).build(); + when(systemConfigDao.getConfigByName("q", true)).thenReturn(config); + PauseUnpauseQueueRequest request = new PauseUnpauseQueueRequest(); + request.setName("q"); + request.setPause(true); + + BaseResponse response = service.pauseUnpauseQueue(request); + + assertEquals(0, response.getCode()); + // No save and no listener notification when state is already correct. + verify(systemConfigDao, never()).saveQConfig(any(QueueConfig.class)); + verify(listenerContainer, never()).pauseUnpauseQueue(anyString(), anyBoolean()); + } + + @Test + void pauseUnpauseQueue_emptyName_returns400() { + PauseUnpauseQueueRequest request = new PauseUnpauseQueueRequest(); + request.setName(""); + request.setPause(true); + + BaseResponse response = service.pauseUnpauseQueue(request); + + assertEquals(400, response.getCode()); + } + + @Test + void pauseUnpauseQueue_listenerThrows_persistsButReports500() { + QueueConfig config = + QueueConfig.builder().name("q").queueName("q").paused(false).build(); + when(systemConfigDao.getConfigByName("q", true)).thenReturn(config); + Mockito.doThrow(new RuntimeException("listener offline")) + .when(listenerContainer) + .pauseUnpauseQueue(anyString(), anyBoolean()); + PauseUnpauseQueueRequest request = new PauseUnpauseQueueRequest(); + request.setName("q"); + request.setPause(true); + + BaseResponse response = service.pauseUnpauseQueue(request); + + assertEquals(500, response.getCode()); + verify(systemConfigDao, times(1)).saveQConfig(any(QueueConfig.class)); + } + + // --- unsupported operations ---------------------------------------------- + + @Test + void enqueueMessage_returnsNotSupported() { + BooleanResponse response = service.enqueueMessage("q", "m1", "FRONT"); + assertEquals(1, response.getCode()); + assertNotNull(response.getMessage()); + assertTrue(response.getMessage().contains("not supported")); + } + + @Test + void moveMessage_returnsNotSupported() { + MessageMoveResponse response = service.moveMessage(new MessageMoveRequest()); + assertEquals(1, response.getCode()); + assertTrue(response.getMessage().contains("not supported")); + } + + @Test + void makeEmpty_returnsNotSupported() { + BooleanResponse response = service.makeEmpty("q", "queue:q"); + assertEquals(1, response.getCode()); + assertTrue(response.getMessage().contains("not supported")); + } + + // --- backend-agnostic operations ----------------------------------------- + + @Test + void getDataType_alwaysReportsStream() { + StringResponse response = service.getDataType("anything"); + assertEquals(0, response.getCode()); + assertEquals("STREAM", response.getVal()); + } + + @Test + void getLatestVersion_returnsEmptyPair() { + assertNotNull(service.getLatestVersion()); + } + + @Test + void aggregateDataCounter_dailyHasSelectionEntry() { + DataSelectorResponse response = service.aggregateDataCounter(AggregationType.DAILY); + assertNotNull(response); + assertEquals("Select Number of Days", response.getTitle()); + assertNotEquals(0, response.getData().size()); + } + + @Test + void aggregateDataCounter_weeklyHasSelectionEntry() { + DataSelectorResponse response = service.aggregateDataCounter(AggregationType.WEEKLY); + assertEquals("Select Number of Weeks", response.getTitle()); + assertNotEquals(0, response.getData().size()); + } + + @Test + void aggregateDataCounter_monthlyHasSelectionEntry() { + DataSelectorResponse response = service.aggregateDataCounter(AggregationType.MONTHLY); + assertEquals("Select Number of Months", response.getTitle()); + assertNotEquals(0, response.getData().size()); + } + + // --- reactive wrappers ---------------------------------------------------- + + @Test + void reactivePauseUnpauseQueue_delegatesToSync() { + QueueConfig config = + QueueConfig.builder().name("q").queueName("q").paused(false).build(); + when(systemConfigDao.getConfigByName("q", true)).thenReturn(config); + PauseUnpauseQueueRequest request = new PauseUnpauseQueueRequest(); + request.setName("q"); + request.setPause(true); + + BaseResponse response = service.reactivePauseUnpauseQueue(request).block(); + + assertNotNull(response); + assertEquals(0, response.getCode()); + verify(listenerContainer).pauseUnpauseQueue("q", true); + } + + @Test + void deleteReactiveMessage_delegatesToSync() { + when(metadataService.deleteMessage(eq("q"), eq("m1"), any(Duration.class))).thenReturn(true); + + BooleanResponse response = service.deleteReactiveMessage("q", "m1").block(); + + assertNotNull(response); + assertTrue(response.isValue()); + } +} diff --git a/rqueue-spring-boot-nats-example/src/main/java/com/github/sonus21/rqueue/example/MessageListener.java b/rqueue-spring-boot-nats-example/src/main/java/com/github/sonus21/rqueue/example/MessageListener.java index 38b5cd7c..70b0b923 100644 --- a/rqueue-spring-boot-nats-example/src/main/java/com/github/sonus21/rqueue/example/MessageListener.java +++ b/rqueue-spring-boot-nats-example/src/main/java/com/github/sonus21/rqueue/example/MessageListener.java @@ -17,6 +17,7 @@ package com.github.sonus21.rqueue.example; import com.github.sonus21.rqueue.annotation.RqueueListener; +import com.github.sonus21.rqueue.enums.QueueType; import com.github.sonus21.rqueue.utils.TimeoutUtils; import java.util.Random; import lombok.extern.slf4j.Slf4j; @@ -63,11 +64,16 @@ public void onSimpleMessage(String message) { execute("simple: {}", message, false); } + // Two listeners on the same queue with distinct consumerNames — exercises multi-listener + // fan-out. mode = QueueType.STREAM makes the underlying JetStream stream Limits-retention so + // both consumers can coexist (NATS rejects multiple non-filtered consumers on a WorkQueue + // stream with error 10099). @RqueueListener( value = "job-queue", deadLetterQueue = "job-queue-linkedin-dlq", numRetries = "2", concurrency = "10-20", + mode = QueueType.STREAM, consumerName = "linkedin-search") public void onJobMessage(Job job) { execute("job-queue-linkedin: {}", job, true); @@ -78,6 +84,7 @@ public void onJobMessage(Job job) { numRetries = "2", deadLetterQueue = "job-queue-google-dlq", concurrency = "10-20", + mode = QueueType.STREAM, consumerName = "google-search") public void onJobMessageGooglSearch(Job job) { execute("job-queue-google: {}", job, true); diff --git a/rqueue-web/src/main/java/com/github/sonus21/rqueue/web/controller/ReactiveRqueueRestController.java b/rqueue-web/src/main/java/com/github/sonus21/rqueue/web/controller/ReactiveRqueueRestController.java index 440ec408..1b905396 100644 --- a/rqueue-web/src/main/java/com/github/sonus21/rqueue/web/controller/ReactiveRqueueRestController.java +++ b/rqueue-web/src/main/java/com/github/sonus21/rqueue/web/controller/ReactiveRqueueRestController.java @@ -119,6 +119,7 @@ public Mono exploreQueue( request.getSrc(), request.getName(), request.getType(), + request.getConsumerName(), request.getPageNumber(), request.getItemPerPage()); } diff --git a/rqueue-web/src/main/java/com/github/sonus21/rqueue/web/controller/RqueueRestController.java b/rqueue-web/src/main/java/com/github/sonus21/rqueue/web/controller/RqueueRestController.java index 3436ffab..351a885f 100644 --- a/rqueue-web/src/main/java/com/github/sonus21/rqueue/web/controller/RqueueRestController.java +++ b/rqueue-web/src/main/java/com/github/sonus21/rqueue/web/controller/RqueueRestController.java @@ -117,6 +117,7 @@ public DataViewResponse exploreQueue( request.getSrc(), request.getName(), request.getType(), + request.getConsumerName(), request.getPageNumber(), request.getItemPerPage()); } diff --git a/rqueue-web/src/main/java/com/github/sonus21/rqueue/web/service/RqueueQDetailServiceImpl.java b/rqueue-web/src/main/java/com/github/sonus21/rqueue/web/service/RqueueQDetailServiceImpl.java index 5200e1e6..8f15d382 100644 --- a/rqueue-web/src/main/java/com/github/sonus21/rqueue/web/service/RqueueQDetailServiceImpl.java +++ b/rqueue-web/src/main/java/com/github/sonus21/rqueue/web/service/RqueueQDetailServiceImpl.java @@ -24,6 +24,7 @@ import com.github.sonus21.rqueue.core.RqueueMessage; import com.github.sonus21.rqueue.core.RqueueMessageTemplate; import com.github.sonus21.rqueue.core.spi.MessageBroker; +import com.github.sonus21.rqueue.core.spi.SubscriberView; import com.github.sonus21.rqueue.core.support.RqueueMessageUtils; import com.github.sonus21.rqueue.exception.UnknownSwitchCase; import com.github.sonus21.rqueue.listener.QueueDetail; @@ -40,8 +41,10 @@ import com.github.sonus21.rqueue.models.response.RedisDataDetail; import com.github.sonus21.rqueue.models.response.RowColumnMeta; import com.github.sonus21.rqueue.models.response.RowColumnMetaType; +import com.github.sonus21.rqueue.models.response.SubscriberRow; import com.github.sonus21.rqueue.models.response.TableColumn; import com.github.sonus21.rqueue.models.response.TableRow; +import com.github.sonus21.rqueue.models.response.TerminalStorageRow; import com.github.sonus21.rqueue.repository.MessageBrowsingRepository; import com.github.sonus21.rqueue.service.RqueueMessageMetadataService; import com.github.sonus21.rqueue.utils.Constants; @@ -181,28 +184,52 @@ public List> getQueueDataStructureDetail(QueueCon brokerQueueDetail != null && messageBroker.storageDisplayName(brokerQueueDetail) != null ? messageBroker.storageDisplayName(brokerQueueDetail) : queueConfig.getQueueName(); - List> queueRedisDataDetails = - newArrayList(new HashMap.SimpleEntry<>( - NavTab.PENDING, - new RedisDataDetail(pendingDisplayName, DataType.LIST, pending == null ? 0 : pending))); + List> queueRedisDataDetails = newArrayList(); + // Per-consumer pending breakdown for brokers that expose it (e.g. NATS Limits-retention + // streams where each durable consumer has its own offset). When present, render one row + // per consumer with an exact pending count instead of a single aggregated "~ N" row. + Map perConsumer = + brokerQueueDetail != null ? messageBroker.consumerPendingSizes(brokerQueueDetail) : null; + if (perConsumer != null && !perConsumer.isEmpty()) { + String label = brokerLabel(NavTab.PENDING, DataType.LIST); + for (Map.Entry entry : perConsumer.entrySet()) { + Long size = entry.getValue(); + RedisDataDetail consumerDetail = + new RedisDataDetail(pendingDisplayName, DataType.LIST, size == null ? 0 : size); + consumerDetail.setTypeLabel(label); + consumerDetail.setConsumerName(entry.getKey()); + // Per-consumer counts are exact (numPending or position math for that subscriber); + // the approximation flag only applies to the aggregated single-row view. + queueRedisDataDetails.add(new HashMap.SimpleEntry<>(NavTab.PENDING, consumerDetail)); + } + } else { + RedisDataDetail pendingDetail = + new RedisDataDetail(pendingDisplayName, DataType.LIST, pending == null ? 0 : pending); + pendingDetail.setTypeLabel(brokerLabel(NavTab.PENDING, DataType.LIST)); + if (brokerQueueDetail != null) { + pendingDetail.setApproximate(messageBroker.isSizeApproximate(brokerQueueDetail)); + } + queueRedisDataDetails.add(new HashMap.SimpleEntry<>(NavTab.PENDING, pendingDetail)); + } // Brokers that manage their own in-flight tracking (e.g. NATS JetStream) have no separate // processing ZSET, so omit the RUNNING entry to avoid a 501 when the explorer opens it. if (!brokerHidesRunning()) { String processingQueueName = queueConfig.getProcessingQueueName(); Long running = messageBrowsingRepository.getDataSize(processingQueueName, DataType.ZSET); - queueRedisDataDetails.add(new HashMap.SimpleEntry<>( - NavTab.RUNNING, - new RedisDataDetail(processingQueueName, DataType.ZSET, running == null ? 0 : running))); + RedisDataDetail runningDetail = + new RedisDataDetail(processingQueueName, DataType.ZSET, running == null ? 0 : running); + runningDetail.setTypeLabel(brokerLabel(NavTab.RUNNING, DataType.ZSET)); + queueRedisDataDetails.add(new HashMap.SimpleEntry<>(NavTab.RUNNING, runningDetail)); } String scheduledQueueName = queueConfig.getScheduledQueueName(); // When the broker doesn't support scheduled introspection (e.g. JetStream), suppress // the SCHEDULED nav tab entry entirely so the dashboard doesn't query an absent ZSET. if (!brokerHidesScheduled()) { Long scheduled = messageBrowsingRepository.getDataSize(scheduledQueueName, DataType.ZSET); - queueRedisDataDetails.add(new HashMap.SimpleEntry<>( - NavTab.SCHEDULED, - new RedisDataDetail( - scheduledQueueName, DataType.ZSET, scheduled == null ? 0 : scheduled))); + RedisDataDetail scheduledDetail = + new RedisDataDetail(scheduledQueueName, DataType.ZSET, scheduled == null ? 0 : scheduled); + scheduledDetail.setTypeLabel(brokerLabel(NavTab.SCHEDULED, DataType.ZSET)); + queueRedisDataDetails.add(new HashMap.SimpleEntry<>(NavTab.SCHEDULED, scheduledDetail)); } if (!CollectionUtils.isEmpty(queueConfig.getDeadLetterQueues())) { for (DeadLetterQueue dlq : queueConfig.getDeadLetterQueues()) { @@ -210,16 +237,17 @@ public List> getQueueDataStructureDetail(QueueCon && messageBroker.dlqStorageDisplayName(brokerQueueDetail) != null ? messageBroker.dlqStorageDisplayName(brokerQueueDetail) : dlq.getName(); + RedisDataDetail dlqDetail; if (!dlq.isConsumerEnabled()) { Long dlqSize = messageBrowsingRepository.getDataSize(dlq.getName(), DataType.LIST); - queueRedisDataDetails.add(new HashMap.SimpleEntry<>( - NavTab.DEAD, - new RedisDataDetail(dlqDisplayName, DataType.LIST, dlqSize == null ? 0 : dlqSize))); + dlqDetail = + new RedisDataDetail(dlqDisplayName, DataType.LIST, dlqSize == null ? 0 : dlqSize); } else { // TODO should we redirect to the queue page? - queueRedisDataDetails.add(new HashMap.SimpleEntry<>( - NavTab.DEAD, new RedisDataDetail(dlqDisplayName, DataType.LIST, -1))); + dlqDetail = new RedisDataDetail(dlqDisplayName, DataType.LIST, -1); } + dlqDetail.setTypeLabel(brokerLabel(NavTab.DEAD, DataType.LIST)); + queueRedisDataDetails.add(new HashMap.SimpleEntry<>(NavTab.DEAD, dlqDetail)); } } if (rqueueConfig.messageInTerminalStateShouldBeStored() @@ -230,14 +258,23 @@ public List> getQueueDataStructureDetail(QueueCon brokerQueueDetail != null && messageBroker.storageDisplayName(brokerQueueDetail) != null ? messageBroker.storageDisplayName(brokerQueueDetail) : queueConfig.getCompletedQueueName(); - queueRedisDataDetails.add(new HashMap.SimpleEntry<>( - NavTab.COMPLETED, - new RedisDataDetail( - completedDisplayName, DataType.ZSET, completed == null ? 0 : completed))); + RedisDataDetail completedDetail = new RedisDataDetail( + completedDisplayName, DataType.ZSET, completed == null ? 0 : completed); + completedDetail.setTypeLabel(brokerLabel(NavTab.COMPLETED, DataType.ZSET)); + queueRedisDataDetails.add(new HashMap.SimpleEntry<>(NavTab.COMPLETED, completedDetail)); } return queueRedisDataDetails; } + /** + * Resolve the broker-specific human-readable label for the given (NavTab, DataType) pair. + * Returns {@code null} on the legacy Redis path so the template falls back to + * {@code DataType.name()} (LIST/ZSET). + */ + private String brokerLabel(NavTab tab, DataType type) { + return messageBroker != null ? messageBroker.dataTypeLabel(tab, type) : null; + } + @Override public List getNavTabs(QueueConfig queueConfig) { List navTabs = new ArrayList<>(); @@ -352,7 +389,12 @@ private void addActionsIfRequired( @Override public DataViewResponse getExplorePageData( - String src, String name, DataType type, int pageNumber, int itemPerPage) { + String src, + String name, + DataType type, + String consumerName, + int pageNumber, + int itemPerPage) { QueueConfig queueConfig = rqueueSystemManagerService.getQueueConfig(src); DataViewResponse response = new DataViewResponse(); boolean deadLetterQueue = queueConfig.isDeadLetterQueue(name); @@ -376,7 +418,7 @@ public DataViewResponse getExplorePageData( QueueDetail qd = lookupQueueDetail(queueConfig.getName()); if (qd != null) { long offset = (long) pageNumber * itemPerPage; - List peeked = messageBroker.peek(qd, offset, itemPerPage); + List peeked = messageBroker.peek(qd, consumerName, offset, itemPerPage); List> tuples = peeked.stream() .map(m -> (TypedTuple) new DefaultTypedTuple<>(m, null)) .collect(Collectors.toList()); @@ -465,6 +507,12 @@ private void setHeadersIfRequired( @Override public List> getRunningTasks() { + // Brokers that manage in-flight tracking internally (e.g. NATS JetStream durable consumers) + // have no separate processing ZSET to report on. Surface an empty table with just the header + // row so the home dashboard shows the section but doesn't render a column of zeros. + if (brokerHidesRunning()) { + return emptyTable("Processing"); + } return bulkSizeTable( rqueueSystemManagerService.getSortedQueueConfigs(), QueueConfig::getProcessingQueueName, @@ -483,6 +531,11 @@ public List> getWaitingTasks() { @Override public List> getScheduledTasks() { + // Brokers without scheduled-queue introspection (e.g. NATS JetStream) have no scheduled ZSET. + // Return an empty table so the home dashboard doesn't query an absent data structure. + if (brokerHidesScheduled()) { + return emptyTable("Scheduled"); + } return bulkSizeTable( rqueueSystemManagerService.getSortedQueueConfigs(), QueueConfig::getScheduledQueueName, @@ -490,6 +543,17 @@ public List> getScheduledTasks() { "Scheduled [ZSET]"); } + /** + * Header-only table used when a broker capability suppresses an entire section (e.g. + * NATS hiding the running / scheduled rows). The frontend renders the column header and + * no body rows. + */ + private List> emptyTable(String section) { + List> rows = new ArrayList<>(); + rows.add(Arrays.asList("Queue", section, "Number of Messages")); + return rows; + } + /** * Render the home-dashboard "queue / data-name / count" 3-column table for a per-queue data * structure. The repository's {@link MessageBrowsingRepository#getDataSizes(List, List)} is @@ -576,10 +640,187 @@ public List getQueueWorkers(String queueName) { return rqueueWorkerRegistry.getQueueWorkers(queueName); } + // ------------------------------------------------------------------------- + // Subscriber + Terminal Storage rows (new queue-detail UI) + // ------------------------------------------------------------------------- + + /** + * Build the per-subscriber rows that drive the new "Subscribers" section. Joins broker SPI + * data ({@link MessageBroker#subscribers}) with last-active info from the worker registry, + * keyed on {@code consumerName}. Falls back to a single anonymous row when the queue has + * no registered handlers (e.g. a producer-only deployment). + */ + @Override + public List getSubscriberRows(QueueConfig queueConfig) { + if (queueConfig == null) { + return Collections.emptyList(); + } + QueueDetail brokerQueueDetail = + messageBroker != null ? lookupQueueDetail(queueConfig.getName()) : null; + List views = brokerSubscribers(queueConfig, brokerQueueDetail); + if (views.isEmpty()) { + return Collections.emptyList(); + } + String storageName = + brokerQueueDetail != null && messageBroker.storageDisplayName(brokerQueueDetail) != null + ? messageBroker.storageDisplayName(brokerQueueDetail) + : queueConfig.getQueueName(); + String label = brokerLabel(NavTab.PENDING, DataType.LIST); + Map workersByConsumer = + indexWorkersByConsumer(queueConfig.getName()); + Map workerCountByConsumer = countWorkersByConsumer(queueConfig.getName()); + List rows = new ArrayList<>(views.size()); + long now = System.currentTimeMillis(); + for (SubscriberView v : views) { + SubscriberRow.SubscriberRowBuilder builder = SubscriberRow.builder() + .consumerName(v.consumerName()) + .typeLabel(label) + .storageName(storageName) + .dataType(DataType.LIST) + .pending(v.pending()) + .pendingShared(v.pendingShared()) + .inFlight(v.inFlight()) + .workerCount(workerCountByConsumer.getOrDefault(v.consumerName(), 0)); + RqueueWorkerPollerView w = workersByConsumer.get(v.consumerName()); + if (w != null) { + builder + .status(w.getStatus()) + .host(w.getHost()) + .pid(w.getPid()) + .lastPollAt(w.getLastPollAt()); + if (w.getLastPollAt() > 0) { + builder.lastPollAge(DateTimeUtils.milliToHumanRepresentation(now - w.getLastPollAt())); + } + } + rows.add(builder.build()); + } + return rows; + } + + /** + * Count the live worker threads bucketed by {@code consumerName}. Mirrors the bucketing that + * {@link #indexWorkersByConsumer(String)} does but keeps the count instead of collapsing to + * the most-recently polling worker — surfaces the {@code @RqueueListener.concurrency} fanout + * separately from the row's representative worker (host / pid / lastPollAt). + */ + private Map countWorkersByConsumer(String queueName) { + List workers = rqueueWorkerRegistry.getQueueWorkers(queueName); + if (CollectionUtils.isEmpty(workers)) { + return Collections.emptyMap(); + } + Map out = new HashMap<>(); + for (RqueueWorkerPollerView w : workers) { + String key = w.getConsumerName(); + if (key == null || key.isEmpty()) { + continue; + } + out.merge(key, 1, Integer::sum); + } + return out; + } + + private List brokerSubscribers( + QueueConfig queueConfig, QueueDetail brokerQueueDetail) { + if (brokerQueueDetail != null && messageBroker != null) { + try { + List views = messageBroker.subscribers(brokerQueueDetail); + if (views != null && !views.isEmpty()) { + return views; + } + } catch (RuntimeException ignored) { + // fall through to producer-only path + } + } + // No active QueueDetail registered (producer-only or shutdown). Surface a single row so + // the operator at least sees the queue's pending count from the repository fallback. + Long pending = messageBrowsingRepository.getDataSize(queueConfig.getQueueName(), DataType.LIST); + if (pending == null || pending <= 0) { + return Collections.emptyList(); + } + return Collections.singletonList(new SubscriberView(queueConfig.getName(), pending, 0L, true)); + } + + private Map indexWorkersByConsumer(String queueName) { + List workers = rqueueWorkerRegistry.getQueueWorkers(queueName); + if (CollectionUtils.isEmpty(workers)) { + return Collections.emptyMap(); + } + Map out = new HashMap<>(workers.size()); + for (RqueueWorkerPollerView w : workers) { + String key = w.getConsumerName(); + if (key == null || key.isEmpty()) { + continue; + } + RqueueWorkerPollerView existing = out.get(key); + if (existing == null || w.getLastPollAt() > existing.getLastPollAt()) { + out.put(key, w); + } + } + return out; + } + + /** + * Terminal-storage rows: COMPLETED set + each DLQ. These are shared across subscribers, so + * they live in their own table rather than being repeated on every subscriber row. + */ + @Override + public List getTerminalRows(QueueConfig queueConfig) { + if (queueConfig == null) { + return Collections.emptyList(); + } + List out = new ArrayList<>(); + QueueDetail brokerQueueDetail = + messageBroker != null ? lookupQueueDetail(queueConfig.getName()) : null; + if (rqueueConfig.messageInTerminalStateShouldBeStored() + && !StringUtils.isEmpty(queueConfig.getCompletedQueueName())) { + Long completed = + messageBrowsingRepository.getDataSize(queueConfig.getCompletedQueueName(), DataType.ZSET); + String completedDisplayName = + brokerQueueDetail != null && messageBroker.storageDisplayName(brokerQueueDetail) != null + ? messageBroker.storageDisplayName(brokerQueueDetail) + : queueConfig.getCompletedQueueName(); + out.add(TerminalStorageRow.builder() + .tab(NavTab.COMPLETED) + .typeLabel(brokerLabel(NavTab.COMPLETED, DataType.ZSET)) + .storageName(completedDisplayName) + .dataType(DataType.ZSET) + .size(completed == null ? 0L : completed) + .build()); + } + if (!CollectionUtils.isEmpty(queueConfig.getDeadLetterQueues())) { + for (DeadLetterQueue dlq : queueConfig.getDeadLetterQueues()) { + String dlqDisplayName = brokerQueueDetail != null + && messageBroker.dlqStorageDisplayName(brokerQueueDetail) != null + ? messageBroker.dlqStorageDisplayName(brokerQueueDetail) + : dlq.getName(); + long size; + if (dlq.isConsumerEnabled()) { + size = -1L; + } else { + Long dlqSize = messageBrowsingRepository.getDataSize(dlq.getName(), DataType.LIST); + size = dlqSize == null ? 0L : dlqSize; + } + out.add(TerminalStorageRow.builder() + .tab(NavTab.DEAD) + .typeLabel(brokerLabel(NavTab.DEAD, DataType.LIST)) + .storageName(dlqDisplayName) + .dataType(DataType.LIST) + .size(size) + .build()); + } + } + return out; + } + @Override public Mono getReactiveExplorePageData( - String src, String name, DataType type, int pageNumber, int itemPerPage) { - return Mono.just(getExplorePageData(src, name, type, pageNumber, itemPerPage)); + String src, + String name, + DataType type, + String consumerName, + int pageNumber, + int itemPerPage) { + return Mono.just(getExplorePageData(src, name, type, consumerName, pageNumber, itemPerPage)); } @Override diff --git a/rqueue-web/src/main/java/com/github/sonus21/rqueue/web/service/RqueueViewControllerServiceImpl.java b/rqueue-web/src/main/java/com/github/sonus21/rqueue/web/service/RqueueViewControllerServiceImpl.java index 7f993cc2..ac7e15ba 100644 --- a/rqueue-web/src/main/java/com/github/sonus21/rqueue/web/service/RqueueViewControllerServiceImpl.java +++ b/rqueue-web/src/main/java/com/github/sonus21/rqueue/web/service/RqueueViewControllerServiceImpl.java @@ -18,6 +18,8 @@ import com.github.sonus21.rqueue.config.RqueueConfig; import com.github.sonus21.rqueue.config.RqueueWebConfig; +import com.github.sonus21.rqueue.core.spi.Capabilities; +import com.github.sonus21.rqueue.core.spi.MessageBroker; import com.github.sonus21.rqueue.models.Pair; import com.github.sonus21.rqueue.models.db.QueueConfig; import com.github.sonus21.rqueue.models.enums.AggregationType; @@ -52,6 +54,13 @@ public class RqueueViewControllerServiceImpl implements RqueueViewControllerServ private final RqueueUtilityService rqueueUtilityService; private final RqueueSystemManagerService rqueueSystemManagerService; + /** + * Optional broker SPI. When set (non-Redis backend), {@link #addBasicDetails(Model, String)} + * propagates {@link Capabilities} flags to every view template so the navigation, charts, and + * other panels can hide unsupported sections globally. + */ + private MessageBroker messageBroker; + @Autowired public RqueueViewControllerServiceImpl( RqueueConfig rqueueConfig, @@ -66,6 +75,11 @@ public RqueueViewControllerServiceImpl( this.rqueueSystemManagerService = rqueueSystemManagerService; } + @Autowired(required = false) + public void setMessageBroker(MessageBroker messageBroker) { + this.messageBroker = messageBroker; + } + private void addNavData(Model model, NavTab tab) { for (NavTab navTab : NavTab.values()) { String name = navTab.name().toLowerCase() + "Active"; @@ -73,6 +87,14 @@ private void addNavData(Model model, NavTab tab) { } } + /** + * Resolved capabilities for the active broker. Defaults to {@link Capabilities#REDIS_DEFAULTS} + * (everything supported) so the legacy no-broker path keeps the historical UI. + */ + private Capabilities capabilities() { + return messageBroker != null ? messageBroker.capabilities() : Capabilities.REDIS_DEFAULTS; + } + private void addBasicDetails(Model model, String xForwardedPrefix) { Pair releaseAndVersion = rqueueUtilityService.getLatestVersion(); model.addAttribute("releaseLink", releaseAndVersion.getFirst()); @@ -81,6 +103,16 @@ private void addBasicDetails(Model model, String xForwardedPrefix) { model.addAttribute("timeInMilli", System.currentTimeMillis()); model.addAttribute("version", rqueueConfig.getLibVersion()); model.addAttribute("urlPrefix", rqueueWebConfig.getUrlPrefix(xForwardedPrefix)); + // Capability-driven UI hide flags. Templates default to "show" when these are absent / + // false, matching the historical Redis behavior. + Capabilities caps = capabilities(); + model.addAttribute("hideScheduledPanel", !caps.supportsScheduledIntrospection()); + model.addAttribute("hideRunningPanel", !caps.usesPrimaryHandlerDispatch()); + model.addAttribute("hideCronJobs", !caps.supportsCronJobs()); + // Charts always render; NATS deployments will show empty until counters accumulate. + model.addAttribute("hideCharts", false); + model.addAttribute("storageKicker", rqueueQDetailService.storageKicker()); + model.addAttribute("storageDescription", rqueueQDetailService.storageDescription()); } @Override @@ -217,6 +249,10 @@ public void queueDetail(Model model, String xForwardedPrefix, String queueName) model.addAttribute("typeSelectors", ChartDataType.getActiveCharts()); model.addAttribute("queueActions", queueActions); model.addAttribute("queueRedisDataDetails", queueRedisDataDetail); + // New per-subscriber + terminal-storage rows; the template renders these in place of the + // legacy data-structure table when present. + model.addAttribute("subscribers", rqueueQDetailService.getSubscriberRows(queueConfig)); + model.addAttribute("terminalRows", rqueueQDetailService.getTerminalRows(queueConfig)); model.addAttribute("config", queueConfig); model.addAttribute("workerRegistryEnabled", rqueueConfig.isWorkerRegistryEnabled()); model.addAttribute("queueWorkers", queueWorkers); diff --git a/rqueue-web/src/main/resources/public/rqueue/css/rqueue.css b/rqueue-web/src/main/resources/public/rqueue/css/rqueue.css index 60af65a5..1f7e68d6 100644 --- a/rqueue-web/src/main/resources/public/rqueue/css/rqueue.css +++ b/rqueue-web/src/main/resources/public/rqueue/css/rqueue.css @@ -1946,3 +1946,432 @@ section { font-size: 35px; cursor: pointer; } + + +/* ============================================================================ + Queue Detail — compact, table-driven, action-first + ========================================================================== */ + +.qd { + padding-bottom: 32px; +} + +/* ----- Header -------------------------------------------------------------- + Single horizontal bar: name + state pill + pause toggle, summary stats on + the right. Replaces the big hero so the data is visible without scrolling. +*/ +.qd-header { + align-items: center; + border-bottom: 1px solid #e0e3d4; + display: flex; + flex-wrap: wrap; + gap: 16px; + justify-content: space-between; + margin: 8px 0 16px; + padding-bottom: 12px; +} + +.qd-header-title { + align-items: center; + display: flex; + flex-wrap: wrap; + gap: 12px; +} + +.qd-name { + color: #273126; + font-size: 26px; + font-weight: 800; + letter-spacing: -0.01em; + margin: 0; + word-break: break-word; +} + +.qd-state { + align-items: center; + border-radius: 999px; + display: inline-flex; + font-size: 12px; + font-weight: 700; + gap: 4px; + letter-spacing: 0.06em; + padding: 3px 10px; + text-transform: uppercase; +} + +.qd-state .bx { + font-size: 14px; +} + +.qd-state-live { + background: #e7efdb; + color: #4a6b35; +} + +.qd-state-paused { + background: #fbe5d9; + color: #b45433; +} + +/* Pause / play action button next to the state pill. Reuses the existing + .pause-queue-btn handler so a click POSTs to /pause-unpause-queue. */ +.qd-pause-btn { + align-items: center; + background: #f6f7ee; + border: 1px solid #d8dbcb; + border-radius: 8px; + color: #4a5d44; + cursor: pointer; + display: inline-flex; + height: 34px; + justify-content: center; + padding: 0; + transition: background 0.15s ease, border-color 0.15s ease, color 0.15s ease; + width: 34px; +} + +.qd-pause-btn:hover { + background: #eef0e3; + border-color: #b4bba0; + color: #2f3d2b; +} + +.qd-pause-btn:focus-visible { + outline: 2px solid #6d8a52; + outline-offset: 2px; +} + +.qd-pause-btn .pause-queue-btn { + cursor: pointer; + font-size: 22px; +} + +.qd-header-stats { + align-items: baseline; + color: #4a5d44; + display: inline-flex; + flex-wrap: wrap; + font-size: 14px; + gap: 8px; +} + +.qd-stat strong { + color: #273126; + font-size: 17px; + font-weight: 800; + margin-right: 4px; +} + +.qd-stat-sep { + color: #c0c5b0; + font-weight: 700; +} + +/* ----- Configuration chip strip ------------------------------------------- */ +.qd-config { + background: #fbfbf6; + border: 1px solid #e0e3d4; + border-radius: 10px; + display: grid; + gap: 0; + grid-template-columns: repeat(auto-fit, minmax(140px, 1fr)); + margin-bottom: 20px; + padding: 12px 16px; +} + +.qd-config-cell { + border-right: 1px dashed #e3e7d8; + display: flex; + flex-direction: column; + gap: 2px; + padding: 4px 12px 4px 0; +} + +.qd-config-cell:last-child { + border-right: none; +} + +.qd-config-cell-meta .qd-config-value { + font-size: 12px; + font-weight: 600; +} + +.qd-config-label { + align-items: center; + color: #6d7561; + display: inline-flex; + font-size: 10px; + font-weight: 700; + gap: 4px; + letter-spacing: 0.08em; + text-transform: uppercase; +} + +.qd-config-label .bx { + font-size: 14px; +} + +.qd-config-value { + color: #273126; + font-size: 14px; + font-weight: 700; +} + +.qd-config-value .bx { + vertical-align: middle; +} + +/* ----- Section ------------------------------------------------------------- + Tight section header (no kicker pill) and a table directly under it. +*/ +.qd-section { + margin: 18px 0; +} + +.qd-section-head { + align-items: baseline; + border-bottom: 1px solid #eef0e3; + display: flex; + flex-wrap: wrap; + gap: 12px; + justify-content: space-between; + margin-bottom: 8px; + padding-bottom: 6px; +} + +.qd-section-title { + color: #273126; + font-size: 16px; + font-weight: 800; + letter-spacing: -0.005em; + margin: 0; +} + +.qd-count { + background: #f1f3e8; + border-radius: 999px; + color: #4a5d44; + font-size: 11px; + font-weight: 700; + margin-left: 6px; + padding: 2px 8px; +} + +.qd-section-hint { + color: #6d7561; + font-size: 12px; +} + +.qd-empty { + background: #fbfbf6; + border: 1px dashed #d8dbcb; + border-radius: 8px; + color: #6d7561; + font-size: 13px; + padding: 16px; + text-align: center; +} + +/* ----- Tables -------------------------------------------------------------- */ +.qd-table { + border-collapse: separate; + border-spacing: 0; + font-size: 13px; + width: 100%; +} + +.qd-table thead th { + background: #f6f7ee; + border-bottom: 1px solid #d8dbcb; + border-top: 1px solid #d8dbcb; + color: #4a5d44; + font-size: 11px; + font-weight: 700; + letter-spacing: 0.06em; + padding: 8px 10px; + text-align: left; + text-transform: uppercase; +} + +.qd-table thead th:first-child { + border-top-left-radius: 8px; +} + +.qd-table thead th:last-child { + border-top-right-radius: 8px; +} + +.qd-table thead th.qd-num, +.qd-table tbody td.qd-num { + text-align: right; + white-space: nowrap; +} + +.qd-table tbody td { + border-bottom: 1px solid #eef0e3; + color: #273126; + padding: 10px; + vertical-align: top; +} + +.qd-table tbody tr:hover { + background: #fafbf3; +} + +.qd-table tbody tr:last-child td:first-child { + border-bottom-left-radius: 8px; +} + +.qd-table tbody tr:last-child td:last-child { + border-bottom-right-radius: 8px; +} + +.qd-num strong { + color: #273126; + font-size: 15px; + font-weight: 800; +} + +.qd-num .qd-muted { + display: inline-block; + margin-left: 4px; +} + +.qd-link { + color: #4a5d44; + font-weight: 700; + text-decoration: none; +} + +.qd-link:hover, +.qd-link:focus { + color: #2f3d2b; + text-decoration: underline; +} + +.qd-pill { + background: #f1f3e8; + border-radius: 999px; + color: #4a5d44; + display: inline-block; + font-size: 11px; + font-weight: 700; + letter-spacing: 0.04em; + padding: 2px 8px; + white-space: nowrap; +} + +.qd-code { + background: #f6f7ee; + border-radius: 4px; + color: #2f3d2b; + font-family: "SFMono-Regular", Consolas, "Liberation Mono", Menlo, monospace; + font-size: 12px; + padding: 1px 6px; + word-break: break-all; +} + +.qd-muted { + color: #6d7561; + font-size: 11px; +} + +/* Status badges (inline-table version of worker-status-*) */ +.qd-status { + border-radius: 999px; + display: inline-block; + font-size: 11px; + font-weight: 700; + letter-spacing: 0.06em; + padding: 2px 8px; + text-transform: uppercase; +} + +.qd-status-active { + background: #d6e7c5; + color: #3e5a2c; +} + +.qd-status-stale { + background: #f4e3c5; + color: #7a5a26; +} + +.qd-status-paused { + background: #fbe5d9; + color: #b45433; +} + +.qd-poll-time { + color: #273126; + font-weight: 600; +} + +/* Bucket label (Terminal Storage table) */ +.qd-bucket { + border-radius: 4px; + display: inline-block; + font-size: 11px; + font-weight: 800; + letter-spacing: 0.08em; + padding: 2px 8px; + text-transform: uppercase; +} + +.qd-bucket-completed { + background: #d6e7c5; + color: #3e5a2c; +} + +.qd-bucket-dead { + background: #fbe5d9; + color: #b45433; +} + +/* ----- Charts disclosure --------------------------------------------------- */ +.qd-charts { + background: #fbfbf6; + border: 1px solid #e0e3d4; + border-radius: 10px; + margin-top: 20px; +} + +.qd-charts-head { + align-items: baseline; + cursor: pointer; + display: flex; + gap: 12px; + justify-content: space-between; + list-style: none; + padding: 12px 16px; +} + +.qd-charts-head::-webkit-details-marker { + display: none; +} + +.qd-charts[open] .qd-charts-head { + border-bottom: 1px solid #eef0e3; +} + +.qd-charts-body { + padding: 16px; +} + +@media (max-width: 720px) { + .qd-header { + flex-direction: column; + align-items: flex-start; + } + .qd-config-cell { + border-right: none; + border-bottom: 1px dashed #e3e7d8; + padding: 6px 0; + } + .qd-config-cell:last-child { + border-bottom: none; + } + .qd-table { + font-size: 12px; + } +} diff --git a/rqueue-web/src/main/resources/public/rqueue/js/rqueue.js b/rqueue-web/src/main/resources/public/rqueue/js/rqueue.js index e140efba..f1aab5d3 100644 --- a/rqueue-web/src/main/resources/public/rqueue/js/rqueue.js +++ b/rqueue-web/src/main/resources/public/rqueue/js/rqueue.js @@ -18,6 +18,8 @@ var queueName = null; var dataPageUrl = null; var dataKey = null; var dataName = null; +var dataTypeLabel = null; +var dataConsumer = null; var deleteActionMessage = null; var dataType = null; var currentPage = 0; @@ -185,7 +187,14 @@ function exploreData() { let element = $(this); dataName = element.data('name'); dataType = element.data('type'); + // Backend-aware human label set by the template (e.g. "Queue (Stream)" for NATS). + // Falls back to the Redis-shaped DataType when the attribute isn't present. + dataTypeLabel = element.data('type-label') || element.data('type'); dataKey = element.data('key'); + // Optional per-subscriber consumer name. When the queue has competing consumers + // (Limits-retention streams) this lets the server start pagination from this + // consumer's next undelivered sequence instead of the stream's first sequence. + dataConsumer = element.data('consumer'); } function displayHeader(response, displayPageNumberEl, pageSize) { @@ -385,7 +394,8 @@ function displayTable(nextOrPrev) { 'page': pageNumber, 'type': dataType, 'name': dataName, - 'key': dataKey + 'key': dataKey, + 'consumerName': dataConsumer }; ajaxRequest(getAbsoluteUrl(dataPageUrl), 'POST', data, function (response) { @@ -569,7 +579,11 @@ function updateDeleteModal() { } function deleteMessage() { - let id = $($($($(this).parent()).parent()).children()[0]).text(); + // The delete button is wrapped in inside the action cell, + // which is a direct child of the . Walk up: button → div → td → tr, then read the first + // cell (the message id). The earlier two-parent walk landed on the td and read "Delete" + // (the wrapping div's text) as the id. + let id = $($(this).closest('tr').children()[0]).text().trim(); let payload = { "queue": queueName, "message_id": id, @@ -588,11 +602,11 @@ function deleteMessage() { } function enqueueMessage() { - enqueueMessageAtPosition($($(this).parent()).parent(), 'FRONT'); + enqueueMessageAtPosition($(this).closest('tr'), 'FRONT'); } function enqueueRearMessage() { - enqueueMessageAtPosition($($(this).parent()).parent(), 'REAR'); + enqueueMessageAtPosition($(this).closest('tr'), 'REAR'); } function enqueueMessageAtPosition(rowEl, position) { diff --git a/rqueue-web/src/main/resources/templates/rqueue/base.html b/rqueue-web/src/main/resources/templates/rqueue/base.html index add22e71..a3cfcf38 100644 --- a/rqueue-web/src/main/resources/templates/rqueue/base.html +++ b/rqueue-web/src/main/resources/templates/rqueue/base.html @@ -53,12 +53,17 @@

Rqueue

Workers + {# Hidden when the active broker reports !usesPrimaryHandlerDispatch (e.g. JetStream + durable consumers manage in-flight tracking internally — there is no separate + processing ZSET to inspect). Defaults to visible for the Redis backend. #} + {% if not hideRunningPanel %}
  • Running
  • + {% endif %} {# Hidden when the active broker reports !supportsScheduledIntrospection (e.g. JetStream). Defaults to visible (hideScheduledPanel == null/false) for the Redis backend. #} {% if not hideScheduledPanel %} diff --git a/rqueue-web/src/main/resources/templates/rqueue/index.html b/rqueue-web/src/main/resources/templates/rqueue/index.html index 91aa30a0..21e19d1b 100644 --- a/rqueue-web/src/main/resources/templates/rqueue/index.html +++ b/rqueue-web/src/main/resources/templates/rqueue/index.html @@ -1,8 +1,7 @@ {% extends 'base' %} {% block main %} -{% include 'stats_chart' %} +{% include 'stats_chart' %}
    {% include 'latency_chart' %} {% endblock %} @@ -38,4 +38,3 @@ }); {% endblock %} - diff --git a/rqueue-web/src/main/resources/templates/rqueue/queue_detail.html b/rqueue-web/src/main/resources/templates/rqueue/queue_detail.html index 7e97f934..780a711b 100644 --- a/rqueue-web/src/main/resources/templates/rqueue/queue_detail.html +++ b/rqueue-web/src/main/resources/templates/rqueue/queue_detail.html @@ -16,168 +16,215 @@ ~ --> -
    -
    - +
    + + {# ----- Compact header: name, state pill, pause toggle, key stats -------- #} +
    +
    +

    + {% if config != null %}{{config.name}}{% else %}{{queueName}}{% endif %} +

    + + + {% if config != null and config.paused %}Paused{% else %}Live{% endif %} + + {% if config != null %} + + {% endif %} +
    +
    + {{ subscribers | length }} subscribers + · + {% if subscribers is not empty %}{{ subscribers[0].pending }}{% else %}0{% endif %} pending + · + + + {% set totalInFlight = 0 %} + {% for sub in subscribers %}{% set totalInFlight = totalInFlight + sub.inFlight %}{% endfor %} + {{ totalInFlight }} + + in-flight + +
    +
    + + {# ----- Configuration chip strip ---------------------------------------- #} + {% if config != null %} +
    +
    + Concurrency + + {% if config.concurrency.min == -1 and config.concurrency.max == -1 %} + Unbounded + {% else %} + {{config.concurrency.min}}–{{config.concurrency.max}} + {% endif %} + +
    +
    + Retries + + {% if config.unlimitedRetry %} Unlimited{% else %}{{config.numRetry}}{% endif %} + +
    +
    + Visibility + {{duration(config.visibilityTimeout)}} +
    +
    + DLQ + + {% if config.deadLetterQueues is empty %}{% else %}{{ dlq(config.deadLetterQueues) }}{% endif %} + +
    +
    + Created + {{ time(config.createdOn) }} +
    +
    + Updated + {{ time(config.updatedOn) }} +
    +
    + {% endif %} + + {# ----- Subscribers table ----------------------------------------------- #} +
    +
    +

    Subscribers {{ subscribers | length }}

    + Click a consumer to browse its messages. +
    + {% if subscribers is empty %} +
    No subscribers attached yet.
    + {% else %} +
    - - - - - - - + + + + + + + + + + {% for sub in subscribers %} - {% if config == null %} - {{ queueName }} - {% else %} - + + + + + - + - - - {% endif %} + {% endfor %}
    QueueConcurrencyRetry CountVisibility TimeoutDead Letter queue(s)Created OnUpdated OnConsumerTypeStoragePendingIn-FlightWorkersStatusHostLast Poll
    {{config.name}} - {% if config.concurrency.min == -1 and config.concurrency.max == -1 %} - - - - {% else %} - {{config.concurrency.min}},{{config.concurrency.max}} - {% endif %} + {{sub.consumerName}} + {{ sub.typeLabel | default(sub.dataType) }}{{sub.storageName}} + {{sub.pending}} + {% if sub.pendingShared %}shared{% endif %} + {{sub.inFlight}} + {% if sub.workerCount > 0 %} + {{sub.workerCount}} + {% else %}{% endif %} - {% if config.unlimitedRetry %} - - - - {% else %} - {{config.numRetry}} - {% endif %} + {% if sub.status %} + {{sub.status}} + {% else %}{% endif %} {{duration(config.visibilityTimeout)}} - {{ dlq(config.deadLetterQueues) }} + {% if sub.host %}{{sub.host}}{% if sub.pid %} / {{sub.pid}}{% endif %}{% else %}{% endif %} + + {% if sub.lastPollAt > 0 %} +
    {{ time(sub.lastPollAt) }}
    + {% if sub.lastPollAge %}{{sub.lastPollAge}} ago{% endif %} + {% else %}{% endif %}
    {{ time(config.createdOn) }}{{ time(config.updatedOn) }}
    - + {% endif %} + + + {# ----- Terminal storage table ------------------------------------------ #} + {% if terminalRows is not empty %} +
    +
    +

    Terminal Storage {{ terminalRows | length }}

    + Shared buckets — completed and dead-letter messages. +
    +
    - - - - + + + + - {% for queueData in queueRedisDataDetails %} + {% for row in terminalRows %} - - + + {% endfor %}
    Job TypeData TypeNameSizeBucketTypeStorageSize
    - - {{queueData.key.name}} - + {{row.tab}} {{queueData.value.type}}{{queueData.value.name}}{{ row.typeLabel | default(row.dataType) }} - {% if queueData.value.size < 0 %} - Queue-backed + {{row.storageName}} + + {% if row.size < 0 %} + Queue-backed {% else %} - {{queueData.value.size}} + {% if row.approximate %}~{% endif %} + {{row.size}} {% endif %}
    -
    -
    -
    -{% if workerRegistryEnabled %} -
    -

    Queue Pollers

    -
    - - - - - - - - - - - - - - - -
    Active PollersStale PollersRecent Exhaustion
    {{activeQueueWorkers}}{{staleQueueWorkers}}{{queueWorkerRecentCapacityExhausted}}
    - {% if queueWorkers is empty %} - -
    -{% endif %} -
    -{% include 'stats_chart' %} -
    -{% include 'latency_chart' %} + -{% include 'data_explorer_modal' %} + {% include 'data_explorer_modal' %} +
    {% endblock %} {% block additional_script %} @@ -187,22 +234,28 @@

    Queue Pollers

    var chartParams = {"queue": queueName, "type": "STATS", 'aggregationType': 'DAILY'}; var latencyChartParams = {"queue": queueName, "type": "LATENCY", 'aggregationType': 'DAILY'}; $(document).ready(function () { - drawChart(chartParams, "stats_chart"); - drawChart(latencyChartParams, "latency_chart"); - $('#refresh-chart').click(function () { - refreshStatsChart(chartParams, "stats_chart"); - }); - - $('#refresh-latency-chart').click(function () { - refreshLatencyChart(latencyChartParams, "latency_chart"); + var chartsRendered = false; + function renderChartsOnce() { + if (chartsRendered) return; + chartsRendered = true; + drawChart(chartParams, "stats_chart"); + drawChart(latencyChartParams, "latency_chart"); + $('#refresh-chart').click(function () { + refreshStatsChart(chartParams, "stats_chart"); + }); + $('#refresh-latency-chart').click(function () { + refreshLatencyChart(latencyChartParams, "latency_chart"); + }); + attachChartEventListeners(); + } + $('#queue-detail-charts').on('toggle', function () { + if (this.open) renderChartsOnce(); }); - attachChartEventListeners(); $('#explore-queue').on('shown.bs.modal', function () { $('#explorer-title').empty().append("Queue:").append(" " + queueName + "").append( - " [" + dataType + "]").append(" " + dataName + ""); + " [" + (dataTypeLabel || dataType) + "]").append(" " + dataName + ""); refreshPage(); }); }); - {% endblock %} diff --git a/rqueue-web/src/main/resources/templates/rqueue/queues.html b/rqueue-web/src/main/resources/templates/rqueue/queues.html index de4d7fcd..fa006b5f 100644 --- a/rqueue-web/src/main/resources/templates/rqueue/queues.html +++ b/rqueue-web/src/main/resources/templates/rqueue/queues.html @@ -22,7 +22,7 @@ Queue Catalog

    Operational View of Every Queue

    - Browse queue configuration, retry policy, pause state, and backing Redis structures from a single page. + Browse queue configuration, retry policy, pause state, and backing {{ storageKicker | default('Redis') }} structures from a single page.