Conversation
|
🚅 Deployed to the rivet-pr-3989 environment in rivet-frontend
|
How to use the Graphite Merge QueueAdd the label merge-queue to this PR to add it to the merge queue. You must have a Graphite account in order to use the merge queue. Sign up using this link. An organization admin has enabled the Graphite Merge Queue in this repository. Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue. This stack of pull requests is managed by Graphite. Learn more about stacking. |
Code Review - Actor Messages & Queues FeatureThis PR adds message queue functionality to RivetKit actors. Overall this is a solid implementation with good architectural decisions. Strengths
Critical IssuesRace Condition in Waiter Resolution (queue-manager.ts:300-325) The loop in maybeResolveWaiters could allow multiple waiters to claim the same messages. Each waiter calls drainMessages which loads all queue messages fresh from storage. Without tracking claimed messages within the resolution cycle, concurrent waiters matching the same queue names could receive duplicate messages. Recommendation: Track which messages have been claimed during the current resolution cycle. Memory Scaling (queue-manager.ts:246-279) Every queue operation loads ALL messages into memory via kvListPrefix. For actors with hundreds or thousands of queued messages, this creates unnecessary memory pressure and latency. Recommendation: Implement pagination or use the existing maxQueueSize as a hard limit on loading. Important IssuesSilent Decode Failures (queue-manager.ts:266-271) Failed message decoding only logs an error. These ghost messages occupy space in the queue but can never be delivered or removed. Over time this could exhaust the queue capacity. Recommendation: Track decode failures and implement dead-letter handling or automatic cleanup. Unpersisted Size Corrections (queue-manager.ts:274-277) When metadata size mismatches actual message count, the code corrects it in memory but doesn't persist the fix. This means every subsequent load will detect and re-correct the same discrepancy. Recommendation: Persist corrected metadata when detecting mismatches. Timeout Cleanup (queue-manager.ts:183-203) The timeout handler removes the waiter and resolves the promise but doesn't clean up the abort signal listeners. If the actor aborts after timeout, the onStop listener will attempt to reject an already-resolved promise. Recommendation: Remove event listeners in all exit paths (timeout, abort, success). Minor Issues
Security & PerformanceSecurity: Proper validation of CBOR serialization, size limits enforced, abort signals handled correctly ✅ Performance:
Test CoverageGood coverage of basic scenarios but missing:
RecommendationApprove with revisions - The race condition in maybeResolveWaiters should be fixed before merge as it could cause message loss or duplication in production. The memory scaling issue should also be addressed for actors with large queues. The implementation follows good TypeScript patterns and the overall architecture is sound. Focus on the concurrency edge cases and resource scaling concerns. |
20d84dd to
ed996b2
Compare
ed996b2 to
11d2f22
Compare
11d2f22 to
266ebbd
Compare
266ebbd to
2dd6f17
Compare
2dd6f17 to
92b8706
Compare
92b8706 to
893fa88
Compare
6474860 to
1e5f0e0
Compare
PR Review: Actor Messages & QueuesThis PR introduces a new actor queue system with comprehensive TypeScript implementation. Overall, the implementation is well-structured with good error handling and test coverage. Below are my findings: ✅ Strengths
🔍 Code Quality Issues1. Potential Race Condition in Queue Waiter Resolution// rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts:300-325
async #maybeResolveWaiters() {
if (this.#waiters.size === 0) {
return;
}
const pending = [...this.#waiters.values()];
for (const waiter of pending) {
if (waiter.signal?.aborted) {
this.#waiters.delete(waiter.id);
waiter.reject(new errors.ActorAborted());
continue;
}
const messages = await this.#drainMessages(
waiter.nameSet,
waiter.count,
);
if (messages.length === 0) {
continue;
}
this.#waiters.delete(waiter.id);
if (waiter.timeoutHandle) {
clearTimeout(waiter.timeoutHandle);
}
waiter.resolve(messages);
}
}Issue: This processes waiters sequentially. If waiter A and waiter B both want message "foo", and one message arrives, waiter A will drain it while waiter B continues waiting. This is correct, but there's a potential issue: if the queue has messages when Recommendation: Consider checking if any messages are available in the queue before iterating through waiters, or batch-fetch messages once and distribute them to waiters. 2. Inconsistent Error Handling Between Abort Scenarios// rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts:190-217
const onAbort = () => {
this.#waiters.delete(waiterId);
if (waiter.timeoutHandle) {
clearTimeout(waiter.timeoutHandle);
}
reject(new errors.ActorAborted());
};
const onStop = () => {
this.#waiters.delete(waiterId);
if (waiter.timeoutHandle) {
clearTimeout(waiter.timeoutHandle);
}
reject(new errors.ActorAborted());
};Issue: Recommendation: Use a single function: const cleanup = (reason: 'abort' | 'stop') => {
this.#waiters.delete(waiterId);
if (waiter.timeoutHandle) {
clearTimeout(waiter.timeoutHandle);
}
reject(new errors.ActorAborted());
};3. Missing Cleanup in Error Path// rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts:190-220
if (abortSignal) {
if (abortSignal.aborted) {
onAbort();
return promise;
}
abortSignal.addEventListener('abort', onAbort, { once: true });
}
this.#waiters.set(waiterId, waiter);
return promise;Issue: If Recommendation: Ensure both event listeners are cleaned up in all exit paths, or better yet, use a single cleanup function that removes both listeners. 4. Timeout Resolution Could Be More Explicit// rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts:184-188
if (timeout !== undefined) {
waiter.timeoutHandle = setTimeout(() => {
this.#waiters.delete(waiterId);
resolve([]);
}, timeout);
}Issue: When timeout fires, it resolves with an empty array but doesn't clean up the abort signal listeners. If the abort signal fires after timeout, it will try to reject an already-resolved promise (which is safe but unnecessary). Recommendation: Remove event listeners in the timeout handler as well. 🐛 Potential Bugs5. Metadata Size Drift Risk// rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts:273-278
decoded.sort((a, b) => (a.id < b.id ? -1 : a.id > b.id ? 1 : 0));
if (this.#metadata.size !== decoded.length) {
this.#metadata.size = decoded.length;
this.#actor.inspector.updateQueueSize(this.#metadata.size);
}Issue: This silent correction is good for resilience, but if metadata size drifts from actual message count, it's only corrected on reads, not writes. This could lead to incorrect Recommendation: Add a warning log when drift is detected: if (this.#metadata.size !== decoded.length) {
this.#actor.rLog.warn({
msg: 'queue metadata size drift detected',
expected: this.#metadata.size,
actual: decoded.length
});
this.#metadata.size = decoded.length;
this.#actor.inspector.updateQueueSize(this.#metadata.size);
}6. Potential Issue with CBOR Serialization Error Path// rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts:98-105
let invalidPath = '';
if (
!isCborSerializable(body, (path) => {
invalidPath = path;
})
) {
throw new errors.QueueMessageInvalid(invalidPath);
}Issue: The callback might be called multiple times if there are multiple invalid paths, but only the last one is saved. This could be confusing for debugging. Recommendation: Either capture the first invalid path or capture all of them. 🔒 Security Considerations7. Connection Cleanup in Queue Send// rivetkit-typescript/packages/rivetkit/src/actor/router-endpoints.ts:194-206
const conn = await actor.connectionManager.prepareAndConnectConn(
createHttpDriver(),
params,
c.req.raw,
c.req.path,
c.req.header(),
);
try {
await actor.queueManager.enqueue(request.name, request.body);
} finally {
conn.disconnect();
}Good: Proper cleanup with try-finally ensures connections are closed even on errors. ⚡ Performance Considerations8. Sequential Message Loading// rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts:246-279
async #loadQueueMessages(): Promise<QueueMessage[]> {
const entries = await this.#driver.kvListPrefix(
this.#actor.id,
KEYS.QUEUE_PREFIX,
);
const decoded: QueueMessage[] = [];
for (const [key, value] of entries) {
try {
const messageId = decodeQueueMessageKey(key);
const decodedPayload =
QUEUE_MESSAGE_VERSIONED.deserializeWithEmbeddedVersion(
value,
);
const body = cbor.decode(new Uint8Array(decodedPayload.body));
decoded.push({
id: messageId,
name: decodedPayload.name,
body,
createdAt: Number(decodedPayload.createdAt),
});
} catch (error) {
this.#actor.rLog.error({
msg: 'failed to decode queue message',
error,
});
}
}
// ...
}Concern: This loads ALL queue messages every time. For large queues, this could be expensive. Consider adding pagination or limiting how many messages are loaded at once. Recommendation: Add a limit parameter or use cursor-based pagination for queue scanning. 📝 Documentation & Testing9. Test Coverage GapsThe test suite is comprehensive, but consider adding:
10. Type Overloading Complexity// rivetkit-typescript/packages/rivetkit/src/actor/instance/queue.ts:31-46
next(name: string, opts?: QueueReceiveOptions): Promise<QueueMessage | undefined>;
next(name: string[], opts?: QueueReceiveOptions): Promise<QueueMessage[] | undefined>;
next(request: QueueReceiveRequest): Promise<QueueMessage[] | undefined>;Good: The overloads provide a great DX, but the implementation logic could be clearer with better variable names. 🎯 Minor Issues
✨ Suggestions
📊 SummaryRisk Level: 🟡 Medium This is a solid implementation with good architecture. The main concerns are:
Recommendation: ✅ Approve with minor revisions The issues identified are not blocking but should be addressed to improve robustness and performance. The test coverage is good, but consider adding the suggested test cases before merging. Review generated by Claude Code CLI |
9fc452e to
1e5f0e0
Compare
9fc452e to
1e5f0e0
Compare
1e5f0e0 to
facd2f1
Compare
facd2f1 to
bde0b0f
Compare
Merge activity
|

No description provided.