Skip to content

feat: actor messages & queues#3989

Closed
NathanFlurry wants to merge 1 commit intomainfrom
actor-messages
Closed

feat: actor messages & queues#3989
NathanFlurry wants to merge 1 commit intomainfrom
actor-messages

Conversation

@NathanFlurry
Copy link
Member

No description provided.

@railway-app
Copy link

railway-app bot commented Jan 21, 2026

🚅 Deployed to the rivet-pr-3989 environment in rivet-frontend

Service Status Web Updated (UTC)
frontend-inspector ❌ Build Failed (View Logs) Web Feb 3, 2026 at 8:04 pm
frontend-cloud ❌ Build Failed (View Logs) Web Feb 3, 2026 at 8:03 pm
website ❌ Build Failed (View Logs) Web Feb 3, 2026 at 7:44 pm

@railway-app railway-app bot temporarily deployed to rivet-frontend / rivet-pr-3989 January 21, 2026 00:59 Destroyed
Copy link
Member Author

NathanFlurry commented Jan 21, 2026


How to use the Graphite Merge Queue

Add the label merge-queue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@claude
Copy link

claude bot commented Jan 21, 2026

Code Review - Actor Messages & Queues Feature

This PR adds message queue functionality to RivetKit actors. Overall this is a solid implementation with good architectural decisions.

Strengths

  1. Well-structured implementation with clean separation between queue manager and user-facing API
  2. Comprehensive test coverage for edge cases (timeouts, aborts, size limits)
  3. Proper custom error types with helpful messages
  4. Good use of versioned schemas for persistence
  5. Smart batching of storage operations to reduce I/O
  6. Metadata corruption recovery mechanism

Critical Issues

Race Condition in Waiter Resolution (queue-manager.ts:300-325)

The loop in maybeResolveWaiters could allow multiple waiters to claim the same messages. Each waiter calls drainMessages which loads all queue messages fresh from storage. Without tracking claimed messages within the resolution cycle, concurrent waiters matching the same queue names could receive duplicate messages.

Recommendation: Track which messages have been claimed during the current resolution cycle.

Memory Scaling (queue-manager.ts:246-279)

Every queue operation loads ALL messages into memory via kvListPrefix. For actors with hundreds or thousands of queued messages, this creates unnecessary memory pressure and latency.

Recommendation: Implement pagination or use the existing maxQueueSize as a hard limit on loading.

Important Issues

Silent Decode Failures (queue-manager.ts:266-271)

Failed message decoding only logs an error. These ghost messages occupy space in the queue but can never be delivered or removed. Over time this could exhaust the queue capacity.

Recommendation: Track decode failures and implement dead-letter handling or automatic cleanup.

Unpersisted Size Corrections (queue-manager.ts:274-277)

When metadata size mismatches actual message count, the code corrects it in memory but doesn't persist the fix. This means every subsequent load will detect and re-correct the same discrepancy.

Recommendation: Persist corrected metadata when detecting mismatches.

Timeout Cleanup (queue-manager.ts:183-203)

The timeout handler removes the waiter and resolves the promise but doesn't clean up the abort signal listeners. If the actor aborts after timeout, the onStop listener will attempt to reject an already-resolved promise.

Recommendation: Remove event listeners in all exit paths (timeout, abort, success).

Minor Issues

  1. Queue name validation missing in createQueueProxy - should validate format/length
  2. Variable naming could be clearer (limitedCount -> normalizedCount)
  3. Missing JSDoc on several public methods
  4. Complex ternary logic in queue.ts:48-51 could use early returns

Security & Performance

Security: Proper validation of CBOR serialization, size limits enforced, abort signals handled correctly ✅

Performance:

  • Good use of BigInt IDs with proper byte ordering ✅
  • Efficient batch operations ✅
  • Concern: Multiple serialization passes for large messages (JSON → CBOR → Bare)
  • Concern: No caching of frequently accessed metadata

Test Coverage

Good coverage of basic scenarios but missing:

  • Concurrent waiter resolution tests
  • Message decode failure scenarios
  • Metadata corruption recovery verification
  • Performance tests with large queues

Recommendation

Approve with revisions - The race condition in maybeResolveWaiters should be fixed before merge as it could cause message loss or duplication in production. The memory scaling issue should also be addressed for actors with large queues.

The implementation follows good TypeScript patterns and the overall architecture is sound. Focus on the concurrency edge cases and resource scaling concerns.

@railway-app railway-app bot temporarily deployed to rivet-frontend / rivet-pr-3989 January 22, 2026 04:10 Destroyed
@NathanFlurry NathanFlurry marked this pull request as ready for review January 22, 2026 05:36
@railway-app railway-app bot temporarily deployed to rivet-frontend / rivet-pr-3989 January 22, 2026 07:14 Destroyed
@railway-app railway-app bot temporarily deployed to rivet-frontend / rivet-pr-3989 January 22, 2026 17:45 Destroyed
@railway-app railway-app bot temporarily deployed to rivet-frontend / rivet-pr-3989 January 24, 2026 23:35 Destroyed
@railway-app railway-app bot temporarily deployed to rivet-frontend / rivet-pr-3989 January 28, 2026 01:12 Destroyed
@railway-app railway-app bot temporarily deployed to rivet-frontend / rivet-pr-3989 January 28, 2026 19:55 Destroyed
@railway-app railway-app bot temporarily deployed to rivet-frontend / rivet-pr-3989 January 30, 2026 08:19 Destroyed
@claude
Copy link

claude bot commented Jan 30, 2026

PR Review: Actor Messages & Queues

This PR introduces a new actor queue system with comprehensive TypeScript implementation. Overall, the implementation is well-structured with good error handling and test coverage. Below are my findings:

✅ Strengths

  1. Well-designed queue management: The QueueManager class provides a clean abstraction with proper separation of concerns
  2. Comprehensive test coverage: The test suite covers key scenarios including timeouts, limits, and error conditions
  3. Good error handling: Custom error classes with proper metadata (e.g., QueueFull, QueueMessageTooLarge)
  4. Type-safe client API: The queue proxy pattern provides excellent DX with type-safe queue names
  5. Proper resource cleanup: Timeout handles and abort listeners are cleaned up correctly

🔍 Code Quality Issues

1. Potential Race Condition in Queue Waiter Resolution

// rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts:300-325
async #maybeResolveWaiters() {
    if (this.#waiters.size === 0) {
        return;
    }
    const pending = [...this.#waiters.values()];
    for (const waiter of pending) {
        if (waiter.signal?.aborted) {
            this.#waiters.delete(waiter.id);
            waiter.reject(new errors.ActorAborted());
            continue;
        }

        const messages = await this.#drainMessages(
            waiter.nameSet,
            waiter.count,
        );
        if (messages.length === 0) {
            continue;
        }
        this.#waiters.delete(waiter.id);
        if (waiter.timeoutHandle) {
            clearTimeout(waiter.timeoutHandle);
        }
        waiter.resolve(messages);
    }
}

Issue: This processes waiters sequentially. If waiter A and waiter B both want message "foo", and one message arrives, waiter A will drain it while waiter B continues waiting. This is correct, but there's a potential issue: if the queue has messages when #maybeResolveWaiters() is called but they don't match any waiter, we'll do expensive #drainMessages() calls for each waiter unnecessarily.

Recommendation: Consider checking if any messages are available in the queue before iterating through waiters, or batch-fetch messages once and distribute them to waiters.

2. Inconsistent Error Handling Between Abort Scenarios

// rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts:190-217
const onAbort = () => {
    this.#waiters.delete(waiterId);
    if (waiter.timeoutHandle) {
        clearTimeout(waiter.timeoutHandle);
    }
    reject(new errors.ActorAborted());
};
const onStop = () => {
    this.#waiters.delete(waiterId);
    if (waiter.timeoutHandle) {
        clearTimeout(waiter.timeoutHandle);
    }
    reject(new errors.ActorAborted());
};

Issue: onAbort and onStop are identical functions. This is duplicative and harder to maintain.

Recommendation: Use a single function:

const cleanup = (reason: 'abort' | 'stop') => {
    this.#waiters.delete(waiterId);
    if (waiter.timeoutHandle) {
        clearTimeout(waiter.timeoutHandle);
    }
    reject(new errors.ActorAborted());
};

3. Missing Cleanup in Error Path

// rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts:190-220
if (abortSignal) {
    if (abortSignal.aborted) {
        onAbort();
        return promise;
    }
    abortSignal.addEventListener('abort', onAbort, { once: true });
}

this.#waiters.set(waiterId, waiter);
return promise;

Issue: If abortSignal.aborted is true, we call onAbort() but don't remove the event listener from actorAbortSignal which was registered earlier. This could cause memory leaks in edge cases.

Recommendation: Ensure both event listeners are cleaned up in all exit paths, or better yet, use a single cleanup function that removes both listeners.

4. Timeout Resolution Could Be More Explicit

// rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts:184-188
if (timeout !== undefined) {
    waiter.timeoutHandle = setTimeout(() => {
        this.#waiters.delete(waiterId);
        resolve([]);
    }, timeout);
}

Issue: When timeout fires, it resolves with an empty array but doesn't clean up the abort signal listeners. If the abort signal fires after timeout, it will try to reject an already-resolved promise (which is safe but unnecessary).

Recommendation: Remove event listeners in the timeout handler as well.

🐛 Potential Bugs

5. Metadata Size Drift Risk

// rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts:273-278
decoded.sort((a, b) => (a.id < b.id ? -1 : a.id > b.id ? 1 : 0));
if (this.#metadata.size !== decoded.length) {
    this.#metadata.size = decoded.length;
    this.#actor.inspector.updateQueueSize(this.#metadata.size);
}

Issue: This silent correction is good for resilience, but if metadata size drifts from actual message count, it's only corrected on reads, not writes. This could lead to incorrect QueueFull errors if metadata.size is larger than actual.

Recommendation: Add a warning log when drift is detected:

if (this.#metadata.size !== decoded.length) {
    this.#actor.rLog.warn({
        msg: 'queue metadata size drift detected',
        expected: this.#metadata.size,
        actual: decoded.length
    });
    this.#metadata.size = decoded.length;
    this.#actor.inspector.updateQueueSize(this.#metadata.size);
}

6. Potential Issue with CBOR Serialization Error Path

// rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts:98-105
let invalidPath = '';
if (
    !isCborSerializable(body, (path) => {
        invalidPath = path;
    })
) {
    throw new errors.QueueMessageInvalid(invalidPath);
}

Issue: The callback might be called multiple times if there are multiple invalid paths, but only the last one is saved. This could be confusing for debugging.

Recommendation: Either capture the first invalid path or capture all of them.

🔒 Security Considerations

7. Connection Cleanup in Queue Send

// rivetkit-typescript/packages/rivetkit/src/actor/router-endpoints.ts:194-206
const conn = await actor.connectionManager.prepareAndConnectConn(
    createHttpDriver(),
    params,
    c.req.raw,
    c.req.path,
    c.req.header(),
);
try {
    await actor.queueManager.enqueue(request.name, request.body);
} finally {
    conn.disconnect();
}

Good: Proper cleanup with try-finally ensures connections are closed even on errors.

⚡ Performance Considerations

8. Sequential Message Loading

// rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts:246-279
async #loadQueueMessages(): Promise<QueueMessage[]> {
    const entries = await this.#driver.kvListPrefix(
        this.#actor.id,
        KEYS.QUEUE_PREFIX,
    );
    const decoded: QueueMessage[] = [];
    for (const [key, value] of entries) {
        try {
            const messageId = decodeQueueMessageKey(key);
            const decodedPayload =
                QUEUE_MESSAGE_VERSIONED.deserializeWithEmbeddedVersion(
                    value,
                );
            const body = cbor.decode(new Uint8Array(decodedPayload.body));
            decoded.push({
                id: messageId,
                name: decodedPayload.name,
                body,
                createdAt: Number(decodedPayload.createdAt),
            });
        } catch (error) {
            this.#actor.rLog.error({
                msg: 'failed to decode queue message',
                error,
            });
        }
    }
    // ...
}

Concern: This loads ALL queue messages every time. For large queues, this could be expensive. Consider adding pagination or limiting how many messages are loaded at once.

Recommendation: Add a limit parameter or use cursor-based pagination for queue scanning.

📝 Documentation & Testing

9. Test Coverage Gaps

The test suite is comprehensive, but consider adding:

  • Test for concurrent receives (multiple waiters for same message)
  • Test for queue persistence across actor restarts
  • Test for metadata rebuild after corruption
  • Test for very large queues (performance test)
  • Test for rapid enqueue/dequeue cycles

10. Type Overloading Complexity

// rivetkit-typescript/packages/rivetkit/src/actor/instance/queue.ts:31-46
next(name: string, opts?: QueueReceiveOptions): Promise<QueueMessage | undefined>;
next(name: string[], opts?: QueueReceiveOptions): Promise<QueueMessage[] | undefined>;
next(request: QueueReceiveRequest): Promise<QueueMessage[] | undefined>;

Good: The overloads provide a great DX, but the implementation logic could be clearer with better variable names.

🎯 Minor Issues

  1. Magic Numbers: waitForAbort in test fixture uses hardcoded timeout values (10ms and 10_000ms). Consider constants.

  2. Comment Style: Some comments are missing periods. Follow CLAUDE.md guidelines for complete sentences.

✨ Suggestions

  1. Consider adding queue metrics: Track enqueue/dequeue rates, wait times, etc. for observability
  2. Add queue draining utility: A method to clear all messages could be useful for testing/admin
  3. Consider priority queues: Future enhancement for message prioritization

📊 Summary

Risk Level: 🟡 Medium

This is a solid implementation with good architecture. The main concerns are:

  • Potential race conditions in waiter resolution
  • Memory leak risks from incomplete event listener cleanup
  • Performance concerns for large queues
  • Metadata drift handling

Recommendation: ✅ Approve with minor revisions

The issues identified are not blocking but should be addressed to improve robustness and performance. The test coverage is good, but consider adding the suggested test cases before merging.


Review generated by Claude Code CLI

@graphite-app
Copy link
Contributor

graphite-app bot commented Feb 4, 2026

Merge activity

  • Feb 4, 8:36 PM UTC: NathanFlurry added this pull request to the Graphite merge queue.
  • Feb 4, 8:37 PM UTC: CI is running for this pull request on a draft pull request (#4114) due to your merge queue CI optimization settings.
  • Feb 4, 8:38 PM UTC: Merged by the Graphite merge queue via draft PR: #4114.

graphite-app bot pushed a commit that referenced this pull request Feb 4, 2026
@graphite-app graphite-app bot closed this Feb 4, 2026
@railway-app railway-app bot temporarily deployed to rivet-frontend / rivet-pr-3989 February 4, 2026 20:38 Destroyed
@graphite-app graphite-app bot deleted the actor-messages branch February 4, 2026 20:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant