| title | id | skillLevel | applicationPatternId | summary | tags | rule | author | related | lessonOrder |
|---|---|---|---|---|---|---|---|---|---|
| Test Concurrent Code | testing-concurrent-code | advanced | testing | Test race conditions, parallelism, and concurrent behavior in Effect programs. | | | PaulJPhilp | | 3 |
Use Effect's TestClock and fiber control to make concurrent tests deterministic and repeatable.
Concurrent code is hard to test:
- Non-determinism - Different runs, different results
- Race conditions - Timing-dependent bugs
- Deadlocks - Hard to reproduce
- Flaky tests - Pass sometimes, fail others
Effect's test utilities provide control over timing and concurrency.
import { describe, it, expect } from "vitest"
import { Effect, Fiber, Ref, TestClock, Duration, Deferred, Option, TestContext } from "effect"
describe("Concurrent Code Testing", () => {
// ============================================
// 1. Test parallel execution
// ============================================
/**
 * Runs two sleeping tasks concurrently under the test clock and checks that
 * both complete once virtual time has been advanced past the longer sleep.
 *
 * The test services are supplied through `TestContext.TestContext`;
 * `TestClock.live` is a layer constructor that expects clock data, not a
 * ready-made layer, so it cannot be passed to `Effect.provide` directly.
 */
it("should run effects in parallel", async () => {
  const executionOrder: string[] = []

  const task1 = Effect.gen(function* () {
    yield* Effect.sleep("100 millis")
    executionOrder.push("task1")
    return 1
  })

  const task2 = Effect.gen(function* () {
    yield* Effect.sleep("50 millis")
    executionOrder.push("task2")
    return 2
  })

  const program = Effect.all([task1, task2], { concurrency: 2 })

  const result = await Effect.runPromise(
    Effect.gen(function* () {
      // Fork first: under the test clock the sleeps only wake up when
      // virtual time is advanced from another fiber.
      const fiber = yield* Effect.fork(program)

      // 100 ms of virtual time covers both sleeps (50 ms and 100 ms).
      yield* TestClock.adjust("100 millis")

      return yield* Fiber.join(fiber)
    }).pipe(Effect.provide(TestContext.TestContext))
  )

  // Effect.all reports results in input order, regardless of which task
  // finished first.
  expect(result).toEqual([1, 2])

  // Both tasks must have run to completion.
  expect(executionOrder).toContain("task1")
  expect(executionOrder).toContain("task2")
})
// ============================================
// 2. Test race conditions
// ============================================
/**
 * Fires 100 increments at a single Ref with unbounded concurrency and checks
 * that none of the updates is lost.
 */
it("should handle race condition correctly", async () => {
  const countAfterConcurrentBumps = Effect.gen(function* () {
    const tally = yield* Ref.make(0)

    // One reusable "add one" effect, scheduled 100 times.
    const bump = Ref.update(tally, (current) => current + 1)
    const bumps = Array.from({ length: 100 }, () => bump)

    yield* Effect.all(bumps, { concurrency: "unbounded" })

    return yield* Ref.get(tally)
  })

  const counter = await Effect.runPromise(countAfterConcurrentBumps)

  // Every increment must be reflected in the final value.
  expect(counter).toBe(100)
})
// ============================================
// 3. Test with controlled fiber execution
// ============================================
/**
 * Forks a fiber, waits until it has actually started, interrupts it, and
 * checks the recorded event order and the resulting exit.
 *
 * A forked fiber does not run until the parent yields, so interrupting it
 * straight after `fork` can stop it before its first statement. The `started`
 * latch makes the parent wait for the child to record "started" before the
 * interrupt is sent, which makes the expected event order deterministic.
 */
it("should test fiber lifecycle", async () => {
  const events: string[] = []

  const program = Effect.gen(function* () {
    const started = yield* Deferred.make<void>()

    const fiber = yield* Effect.fork(
      Effect.gen(function* () {
        events.push("started")
        // Signal the parent that the body is running.
        yield* Deferred.succeed(started, undefined)
        yield* Effect.sleep("1 second")
        events.push("completed")
        return "result"
      })
    )

    // Recorded before the child gets a chance to run.
    events.push("forked")

    // Suspend until the child has recorded "started".
    yield* Deferred.await(started)

    // Interrupt the fiber while it is parked in the sleep.
    yield* Fiber.interrupt(fiber)
    events.push("interrupted")

    return yield* Fiber.await(fiber)
  })

  const exit = await Effect.runPromise(program)

  expect(events).toEqual(["forked", "started", "interrupted"])
  expect(events).not.toContain("completed")
  // An interrupted fiber ends with a failure exit, never a success.
  expect(exit._tag).toBe("Failure")
})
// ============================================
// 4. Test timeout behavior
// ============================================
/**
 * Checks that an operation slower than its timeout yields `None`.
 *
 * `Effect.timeoutOption` is used because the assertion expects an Option;
 * in Effect 3.x, `Effect.timeout` fails with a `TimeoutException` instead.
 * Time is virtual: the test services come from `TestContext.TestContext`,
 * so the test finishes without waiting for real seconds.
 */
it("should timeout slow operations", async () => {
  const slowOperation = Effect.gen(function* () {
    yield* Effect.sleep("10 seconds")
    return "completed"
  })

  const result = await Effect.runPromise(
    Effect.gen(function* () {
      const fiber = yield* Effect.fork(
        slowOperation.pipe(Effect.timeoutOption("1 second"))
      )

      // Advance past the 1 s timeout but short of the 10 s sleep.
      yield* TestClock.adjust("2 seconds")

      return yield* Fiber.join(fiber)
    }).pipe(Effect.provide(TestContext.TestContext))
  )

  // The timeout fired first, so no value was produced.
  expect(result._tag).toBe("None")
})
// ============================================
// 5. Test with Deferred for synchronization
// ============================================
/**
 * Hands a value from a producer to a forked consumer through a Deferred.
 *
 * The consumer fiber is joined rather than waited for with a wall-clock
 * sleep, so the test cannot pass or fail depending on scheduling speed.
 */
it("should synchronize fibers correctly", async () => {
  const result = await Effect.runPromise(
    Effect.gen(function* () {
      const deferred = yield* Deferred.make<string>()
      const results: string[] = []

      // Consumer blocks on the deferred until the producer completes it.
      const consumerFiber = yield* Effect.fork(
        Effect.gen(function* () {
          const value = yield* Deferred.await(deferred)
          results.push(`consumed: ${value}`)
        })
      )

      // Producer completes the deferred.
      const producer = Effect.gen(function* () {
        results.push("producing")
        yield* Deferred.succeed(deferred, "data")
        results.push("produced")
      })

      yield* producer

      // Joining guarantees the consumer has recorded its entry.
      yield* Fiber.join(consumerFiber)

      return results
    })
  )

  expect(result).toContain("producing")
  expect(result).toContain("produced")
  expect(result).toContain("consumed: data")
})
// ============================================
// 6. Test for absence of deadlocks
// ============================================
/**
 * Runs two fibers that update the same two Refs in the same order and uses a
 * timeout as a hang detector: `Some` means both fibers finished in time.
 *
 * `Effect.timeoutOption` is used so the result is an Option, and the value is
 * read through `Option` helpers rather than an un-narrowed `.value` access.
 */
it("should not deadlock with proper resource ordering", async () => {
  const result = await Effect.runPromise(
    Effect.gen(function* () {
      const ref1 = yield* Ref.make(0)
      const ref2 = yield* Ref.make(0)

      // Both fibers touch ref1 first, then ref2.
      const fiber1 = yield* Effect.fork(
        Effect.gen(function* () {
          yield* Ref.update(ref1, (n) => n + 1)
          yield* Ref.update(ref2, (n) => n + 1)
        })
      )

      const fiber2 = yield* Effect.fork(
        Effect.gen(function* () {
          yield* Ref.update(ref1, (n) => n + 1)
          yield* Ref.update(ref2, (n) => n + 1)
        })
      )

      yield* Fiber.join(fiber1)
      yield* Fiber.join(fiber2)

      return [yield* Ref.get(ref1), yield* Ref.get(ref2)]
    }).pipe(Effect.timeoutOption("1 second"))
  )

  // Some => finished before the timeout; each Ref was bumped once per fiber.
  expect(Option.isSome(result)).toBe(true)
  expect(Option.getOrNull(result)).toEqual([2, 2])
})
})

| Scenario | Approach |
|---|---|
| Race conditions | Use Ref (atomic) and verify counts |
| Timing | Use TestClock to control time |
| Fiber lifecycle | Fork, interrupt, join |
| Synchronization | Use Deferred |
| Deadlocks | Timeout tests |

| Utility | Purpose |
|---|---|
| `TestClock` | Control time in tests |
| `Effect.fork` / `Fiber.join` | Control fiber execution |
| `Deferred` | Synchronize between fibers |
| `Ref` | Atomic state |
| `Effect.timeout` | Detect hangs |