feat(rivetkit): workflows#3734
feat(rivetkit): workflows#3734NathanFlurry wants to merge 1 commit into01-21-feat_rivetkit_add_actor_run_handlerfrom
Conversation
|
The latest updates on your projects. Learn more about Vercel for GitHub. 4 Skipped Deployments
|
|
Warning This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
How to use the Graphite Merge QueueAdd the label merge-queue to this PR to add it to the merge queue. You must have a Graphite account in order to use the merge queue. Sign up using this link. An organization admin has enabled the Graphite Merge Queue in this repository. Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue. This stack of pull requests is managed by Graphite. Learn more about stacking. |
a1796cf to
4ca7516
Compare
4ca7516 to
cff619f
Compare
cff619f to
3700d45
Compare
3700d45 to
fbdbcd3
Compare
fbdbcd3 to
246d677
Compare
70b8762 to
2fe5b7d
Compare
246d677 to
a3fbdd7
Compare
PR Review: Workflow Engine for RivetKitOverviewThis PR introduces a comprehensive durable workflow engine for RivetKit TypeScript. This is a major addition (~9,257 lines) that implements a reentrant execution framework for building fault-tolerant, long-running workflows. Summary AssessmentOverall: Strong implementation with production-ready patterns This is high-quality code that demonstrates deep understanding of durable execution semantics. The implementation includes sophisticated features like:
Code Quality AnalysisStrengths ✅
Issues & Concerns 🔍1. Critical: Race Condition in Message Consumption (High Priority)Location: export async function consumeMessage(
storage: Storage,
driver: EngineDriver,
messageName: string,
): Promise<Message | null> {
const index = storage.messages.findIndex(
(message) => message.name === messageName,
);
if (index === -1) {
return null;
}
const message = storage.messages[index];
// Delete from driver first - if this fails, memory is unchanged
await driver.delete(buildMessageKey(message.id));
// Only remove from memory after successful driver deletion
storage.messages.splice(index, 1);
return message;
}Problem: If the workflow crashes between deleting from KV and removing from memory, the message will be lost on next load (since it's deleted from KV but storage.messages still has it in memory). Impact: Message loss in crash scenarios. Recommendation: Consider a two-phase approach:
Alternatively, accept this as "at-most-once" delivery and document it clearly. 2. Performance: Excessive Metadata Loading (Medium Priority)Location: Every retry attempt loads metadata from driver. If metadata is already loaded in 3. Timeout Behavior May Be Surprising (Medium Priority)Location: The comment correctly notes that timeouts don't cancel underlying operations. This can lead to resource leaks if step operations don't respect
4. Missing Input Validation (Low Priority)Several functions lack input validation:
5. Inconsistent Error Messages (Low Priority)Some errors are user-facing while others are internal. Consider standardizing error messages with actionable guidance. Security Considerations 🔒
Performance Considerations ⚡
Test Coverage Analysis 🧪Strengths:
Gaps:
Recommendation: Add tests for KV failures, partial batch writes, concurrent messages, and loop iteration limits. Architecture Review 🏗️Isolation ModelThe driver design assumes external isolation (Durable Objects, actors). This is well-documented and appropriate. Consideration: What happens if isolation is violated? (e.g., two processes run same workflow ID)
Serialization StrategyUses BARE schema for binary serialization. Good choice for compact storage, schema evolution, and type safety. Note: Ensure BARE schema changes are backward compatible or migrations are handled. Recommendations SummaryMust Fix (Before Merge)
Should Fix (High Priority)
Nice to Have (Medium Priority)
Documentation Improvements
ConclusionThis is excellent work and demonstrates a mature understanding of durable execution patterns. The code quality is high, error handling is sophisticated, and the API is well-designed. Recommendation: APPROVE with minor fixes The identified issues are mostly documentation gaps and potential edge cases. The core implementation is solid and ready for production use with proper driver implementations. Key Action Items:
Great job on this implementation! 🎉 Reviewed using Rivet CLAUDE.md guidelines and TypeScript best practices. |
a3fbdd7 to
b4b061f
Compare
2fe5b7d to
329640c
Compare
329640c to
4e8ca02
Compare
b4b061f to
8a67460
Compare
4e8ca02 to
f1e4e30
Compare
8a67460 to
a5391dd
Compare
Pull Request Review: Workflow Engine ImplementationOverviewThis PR introduces a comprehensive workflow engine package (@rivetkit/workflow-engine) for RivetKit. This is a substantial addition (9,257 additions across 46 files) that implements durable execution for TypeScript workflows. Architecture AssessmentStrengths1. Solid Core Design
2. Comprehensive Feature Set
3. Excellent Test Coverage
4. Good Documentation
Issues and RecommendationsCritical Issues1. Race Condition in Message Consumption (storage.ts:307-328) If multiple workflow executions run concurrently, they could both find the same message in memory, both attempt to delete it, and both think they successfully consumed it. Recommendation:
2. Non-Atomic Batch Operations (storage.ts:194-278) The EngineDriver interface says batch "Should be atomic" but it is not required. If a batch partially fails, the workflow could end up in an inconsistent state. Recommendation:
High Priority Issues3. Missing Input Validation The code does not validate critical inputs like workflow IDs, message names, or entry names. For example, context.ts:946 uses colons as delimiters in name:count. If a user passes a name containing colons, this creates ambiguous keys. Recommendation:
4. Unbounded Message Queue Messages accumulate in memory without bounds. For long-running workflows receiving many messages, this could cause OOM errors. Recommendation:
5. Timeout Does Not Cancel Underlying Work (context.ts:464-480) While documented, timeouts do not cancel the underlying async operation. If a step times out but continues running, it might hold database connections, continue making API calls, or modify shared state. Recommendation:
6. Hardcoded Console Logging (context.ts:1746-1751) Using console.warn directly makes logging not configurable. Recommendation:
Minor Issues7. Magic Numbers - Hardcoded retry delay of 100ms in index.ts:405 should be a named constant 8. Type Safety - Several type assertions that could be avoided with better type design 9. Incomplete Error Serialization - extractErrorInfo only captures primitive metadata, losing objects and arrays 10. Non-null Assertions - Multiple uses of ! operator that could fail if state is corrupted Performance Considerations11. Inefficient Loop Cleanup (context.ts:753-763) Deletes each old iteration sequentially. For loops with thousands of iterations, this could be very slow. Recommendation: Batch deletes or use range-based deletion if driver supports it. 12. Sequential Metadata Loading (index.ts:338) Could be parallelized for better performance during rollback. Security Considerations13. No Resource Limits Missing limits on workflow execution time, entry count, recursion depth, and message size. Recommendation: Add configurable limits to prevent DoS attacks or runaway workflows. 14. No Input Validation on Deserialization (index.ts:695) No validation of deserialized input. If storage is compromised, malicious input could be injected. Recommendation: Add input schema validation. Testing GapsMissing test cases for:
Recommendation: Add stress tests and chaos engineering scenarios. Documentation Issues15. Driver Interface Requirements The list() requirement "MUST be sorted" is mentioned in comments but easy to miss. Recommendation: Make this more prominent in interface documentation and add runtime validation. 16. Migration Path Unclear The removed() API is documented but there is no clear guide on versioning strategies or safe migration patterns. Positive Patterns Worth Highlighting
SummaryThis is a well-engineered workflow engine with strong fundamentals. The main concerns are around concurrent execution safety, resource limits, and edge case handling. Action Items PriorityMust Fix Before Merge:
Should Fix Soon: Nice to Have: Overall assessment: Approve with changes. This is production-quality code with some important refinements needed. Review generated by Claude Sonnet 4.5 |
f1e4e30 to
1c34e5c
Compare
a5391dd to
18bb55f
Compare
PR Review: feat(rivetkit): workflowsOverviewThis PR adds a comprehensive workflow engine to RivetKit - a durable, reentrant execution system for TypeScript workflows. The implementation is substantial (9,257 additions across 46 files) and introduces a sophisticated state management system with replay capabilities. 🎯 Architecture & DesignStrengths
Concerns1. Memory Management in Long-Running WorkflowsThe in-memory visitedKeys set and usedNamesInExecution set (context.ts:104, 109) grow unbounded during execution. For workflows with thousands of steps or long-running loops, this could cause memory issues. Recommendation: Consider clearing these sets after validateComplete() or implementing periodic cleanup. 2. Race Condition in Message HandlingThe notifyMessage function (index.ts:180-195) only notifies the first matching waiter when a message arrives. If multiple listen() calls are waiting for the same message name, only one will be woken. Recommendation: Either document this as intentional behavior or support multiple waiters per message name. 🐛 Potential Bugs1. Type Safety Issue in Step ConfigurationThe run! assertion in context.ts:309 assumes run is defined when nameOrConfig is a string, but TypeScript does not enforce this. 2. Silent Error in RollbackErrors during rollback replay are silently ignored (index.ts:322), which could hide bugs. Consider logging these errors for observability. ⚡ Performance Considerations1. Batch Write PerformanceThe flush() function does not limit batch size. For workflows with thousands of dirty entries, this could hit driver batch size limits. 2. Message List Sorting OverheadMessage sorting (storage.ts:134) may be redundant if the driver returns sorted results as documented. 3. History Entry LookupThe O(n) iteration over all history entries in validateComplete() (context.ts:246-267) could be expensive for large workflows. 🔒 Security Concerns1. Denial of Service via Large InputsNo validation exists on workflow input/output sizes. Malicious code could store massive objects exhausting memory/storage. 2. Arbitrary Code Execution in RollbackRollback handlers execute with potentially stale data. Document security requirements for rollback handlers. 📚 Code QualityPositives
Areas for Improvement
✅ Recommendations SummaryCritical (Must Fix)
High Priority
Documentation
🎉 ConclusionThis is a well-architected and thoughtfully implemented workflow engine. The deterministic replay mechanism is sophisticated and the separation of concerns is excellent. Overall Assessment: ✅ Approve with recommendations The core functionality is solid. Address the critical issues before merge, and consider the high-priority items for follow-up PRs. Great work on this complex feature! 🚀 |
1c34e5c to
539fdca
Compare
18bb55f to
177df46
Compare
177df46 to
532ab13
Compare
539fdca to
21606fd
Compare
532ab13 to
dc53f52
Compare
Merge activity
|

No description provided.