| title | Stream Pattern 8: Advanced Stream Transformations |
|---|---|
| id | stream-pattern-advanced-transformations |
| skillLevel | advanced |
| applicationPatternId | streams |
| summary | Apply complex transformations across streams including custom operators, effect-based transformations, and composition patterns. |
| tags | |
| rule | |
| related | |
| author | effect_website |
| lessonOrder | 4 |

Advanced transformations let you express complex data flows as composable stream stages:
- Custom operators: Package a transformation as a reusable function from stream to stream
- Effect-based: Run effects (I/O, validation, lookups) for each element
- Lazy evaluation: Compute elements only when a consumer pulls them
- Fusion: Run composed operations in a single pass
- Staging: Layer multiple transformation stages
- Composition: Combine operators cleanly with pipe
Pattern: Stream.mapEffect(), Stream.map(), pipe composition
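As a minimal sketch of the pattern named above, the following pipeline chains a pure Stream.map step with an effectful Stream.mapEffect step inside one pipe. The lookupLabel helper is hypothetical and only stands in for a real lookup or API call.

```typescript
import { Chunk, Effect, Stream } from "effect";

// Hypothetical effectful step: stands in for a lookup or API call.
const lookupLabel = (n: number): Effect.Effect<string> =>
  Effect.succeed(`item-${n}`);

const labels = Stream.fromIterable([1, 2, 3]).pipe(
  Stream.map((n) => n * 10), // pure transformation
  Stream.mapEffect((n) => lookupLabel(n)), // effectful transformation
  Stream.runCollect, // Effect producing a Chunk<string>
  Effect.map((chunk) => Chunk.toReadonlyArray(chunk))
);

// The pipeline is synchronous here, so runSync is enough for the sketch.
const labelsResult = Effect.runSync(labels);
console.log(labelsResult); // ["item-10", "item-20", "item-30"]
```

Nothing runs until the final Effect is executed; the pipe only describes the stages.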
Simple transformations don't scale:
Problem 1: Performance degradation
- Eager collection chaining (e.g. array.map(...).filter(...)) materializes an intermediate collection at every layer
- 10 transformations = 10 intermediate collections
- Processing 1M items = up to 10M intermediate elements allocated
- Result: GC pressure and, at worst, memory exhaustion
Problem 2: Logic scattered across the codebase
- Validation here, enrichment there, filtering elsewhere
- Hard to maintain
- A change in one place breaks another
- No clear data flow
Problem 3: Effect handling
- Transformations need side effects
- Typical cases: network calls, database queries
- Naive approach: load everything, then transform one item at a time
- Result: slow and inefficient
Problem 4: Reusability
- A custom transformation gets used once
- Next time, it is rewritten from scratch
- Code is duplicated
- Bugs are replicated along with it
Solutions:
Custom operators:
- Encapsulate transformation logic
- Reusable across projects
- Testable in isolation
- Composable
Lazy evaluation:
- Compute as elements flow
- No intermediate collections
- Memory bounded by chunk and buffer sizes, not by input size
- Only compute what's used
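To make the lazy-evaluation points concrete, here is a small sketch that maps over an infinite stream and takes only the first three results. The evaluated counter is an illustrative device, not part of any API; it shows that the map step runs only for elements that are actually pulled.

```typescript
import { Chunk, Effect, Stream } from "effect";

// Counts how many elements the map step actually touches.
let evaluated = 0;

const firstThreeSquares = Stream.iterate(1, (n) => n + 1).pipe( // infinite: 1, 2, 3, ...
  Stream.map((n) => {
    evaluated += 1;
    return n * n;
  }),
  Stream.take(3), // downstream demand stops the infinite source
  Stream.runCollect,
  Effect.map((chunk) => Chunk.toReadonlyArray(chunk))
);

const squares = Effect.runSync(firstThreeSquares);
console.log(squares); // [1, 4, 9]
// The counter stays close to the number of elements taken,
// even though the source never ends.
console.log(`map ran ${evaluated} times`);
```

An eager array version of the same code could not terminate, because it would have to build the whole source before mapping.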
Fusion:
- Combine multiple maps/filters
- Single pass through data
- No full-size intermediate collections
- The library processes elements chunk by chunk, so composed stages never materialize the whole dataset
Effect composition:
- Chain effects naturally
- Error propagation automatic
- Resource cleanup guaranteed
- Readable code
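The effect-composition points above, in particular automatic error propagation, can be sketched as follows. The NegativeInput error and the checkedSqrt function are hypothetical names introduced for illustration. A failure inside Stream.mapEffect fails the whole stream, and the typed error surfaces at the edge without any manual plumbing.

```typescript
import { Chunk, Data, Effect, Either, Stream } from "effect";

// Hypothetical domain error for this sketch.
class NegativeInput extends Data.TaggedError("NegativeInput")<{
  readonly value: number;
}> {}

// Hypothetical effectful step that can fail with a typed error.
const checkedSqrt = (n: number): Effect.Effect<number, NegativeInput> =>
  n < 0
    ? Effect.fail(new NegativeInput({ value: n }))
    : Effect.succeed(Math.sqrt(n));

const roots = (input: ReadonlyArray<number>) =>
  Stream.fromIterable(input).pipe(
    Stream.mapEffect((n) => checkedSqrt(n)), // a failure here fails the stream
    Stream.runCollect,
    Effect.map((chunk) => Chunk.toReadonlyArray(chunk)),
    Effect.either // expose success or failure as a value
  );

const ok = Effect.runSync(roots([4, 9])); // Right holding [2, 3]
const failed = Effect.runSync(roots([4, -1])); // Left holding NegativeInput with value -1
```

Effect.either is used here only so the sketch can inspect both outcomes; in application code the error would more often be handled with Effect.catchTag or left to propagate.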
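The example below walks through nine transformations, from effect-based filtering and enrichment to stateful, conditional, and performance-oriented pipelines.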
import { Chunk, Effect, Stream } from "effect";
interface LogEntry {
timestamp: Date;
level: "info" | "warn" | "error";
message: string;
context?: Record<string, unknown>;
}
interface Metric {
name: string;
value: number;
tags: Record<string, string>;
}
const program = Effect.gen(function* () {
console.log(`\n[ADVANCED STREAM TRANSFORMATIONS] Complex data flows\n`);
// Example 1: Custom filter operator
console.log(`[1] Custom filter with effect-based logic:\n`);
const filterByEffect = <A,>(
predicate: (a: A) => Effect.Effect<boolean>
) =>
(stream: Stream.Stream<A>) =>
stream.pipe(
Stream.mapEffect((value) =>
predicate(value).pipe(
Effect.map((keep) => (keep ? value : null))
)
),
// Type guard narrows A | null back to A (assumes A itself never includes null)
Stream.filter((value): value is A => value !== null)
);
const isValid = (num: number): Effect.Effect<boolean> =>
Effect.gen(function* () {
// Simulate validation effect (e.g., API call)
return num > 0 && num < 100;
});
const numbers = [50, 150, 25, -10, 75];
const validNumbers = yield* Stream.fromIterable(numbers).pipe(
filterByEffect(isValid),
Stream.runCollect
);
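// runCollect yields a Chunk, which has no join method; convert to an array first
yield* Effect.log(`[VALID] ${Chunk.toReadonlyArray(validNumbers).join(", ")}\n`);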
// Example 2: Enrichment transformation
console.log(`[2] Enriching records with additional data:\n`);
interface RawRecord {
id: string;
value: number;
}
interface EnrichedRecord {
id: string;
value: number;
validated: boolean;
processed: Date;
metadata: Record<string, unknown>;
}
const enrich = (record: RawRecord): Effect.Effect<EnrichedRecord> =>
Effect.gen(function* () {
// Simulate lookup/validation
const validated = record.value > 0;
return {
id: record.id,
value: record.value,
validated,
processed: new Date(),
metadata: { source: "stream" },
};
});
const rawData = [
{ id: "r1", value: 10 },
{ id: "r2", value: -5 },
{ id: "r3", value: 20 },
];
const enriched = yield* Stream.fromIterable(rawData).pipe(
Stream.mapEffect((record) => enrich(record)),
Stream.runCollect
);
yield* Effect.log(`[ENRICHED] ${enriched.length} records enriched\n`);
// Example 3: Demultiplexing (split one stream into multiple)
console.log(`[3] Demultiplexing by category:\n`);
interface Event {
id: string;
type: "click" | "view" | "purchase";
data: unknown;
}
const events: Event[] = [
{ id: "e1", type: "click", data: { x: 100, y: 200 } },
{ id: "e2", type: "view", data: { url: "/" } },
{ id: "e3", type: "purchase", data: { amount: 99.99 } },
{ id: "e4", type: "click", data: { x: 50, y: 100 } },
];
const clicks = yield* Stream.fromIterable(events).pipe(
Stream.filter((e) => e.type === "click"),
Stream.runCollect
);
const views = yield* Stream.fromIterable(events).pipe(
Stream.filter((e) => e.type === "view"),
Stream.runCollect
);
const purchases = yield* Stream.fromIterable(events).pipe(
Stream.filter((e) => e.type === "purchase"),
Stream.runCollect
);
yield* Effect.log(
`[DEMUX] Clicks: ${clicks.length}, Views: ${views.length}, Purchases: ${purchases.length}\n`
);
// Example 4: Chunked processing (batch transformation)
console.log(`[4] Chunked processing (batches of N):\n`);
const processChunk = (chunk: Array<{ id: string; value: number }>) =>
Effect.gen(function* () {
const sum = chunk.reduce((s, r) => s + r.value, 0);
const avg = sum / chunk.length;
yield* Effect.log(
`[CHUNK] ${chunk.length} items, avg: ${avg.toFixed(2)}`
);
return { size: chunk.length, sum, avg };
});
const data = Array.from({ length: 10 }, (_, i) => ({
id: `d${i}`,
value: i + 1,
}));
const chunkSize = 3;
const chunks = [];
for (let i = 0; i < data.length; i += chunkSize) {
const chunk = data.slice(i, i + chunkSize);
chunks.push(chunk);
}
const chunkResults = yield* Effect.all(
chunks.map((chunk) => processChunk(chunk))
);
yield* Effect.log(
`[CHUNKS] Processed ${chunkResults.length} batches\n`
);
// Example 5: Multi-stage transformation pipeline
console.log(`[5] Multi-stage pipeline (parse → validate → transform):\n`);
const rawStrings = ["10", "twenty", "30", "-5", "50"];
// Stage 1: Parse
const parsed = yield* Stream.fromIterable(rawStrings).pipe(
Stream.mapEffect((s) =>
Effect.gen(function* () {
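const n = Number.parseInt(s, 10);
// parseInt never throws; it returns NaN for non-numeric input
if (Number.isNaN(n)) {
return yield* Effect.fail(
new Error(`Failed to parse: ${s}`)
);
}
return n;
}).pipe(
Effect.catchAll((error) =>
Effect.gen(function* () {
yield* Effect.log(`[PARSE ERROR] ${error.message}`);
return null;
})
)
)
),
// Type guard so later stages see number, not number | null
Stream.filter((n): n is number => n !== null),
Stream.runCollect
);
// runCollect yields a Chunk; convert it to use array methods
const parsedValues = Chunk.toReadonlyArray(parsed);
yield* Effect.log(`[STAGE 1] Parsed: ${parsedValues.join(", ")}`);
// Stage 2: Validate
const validated = parsedValues.filter((n) => n > 0);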
yield* Effect.log(`[STAGE 2] Validated: ${validated.join(", ")}`);
// Stage 3: Transform
const transformed = validated.map((n) => n * 2);
yield* Effect.log(`[STAGE 3] Transformed: ${transformed.join(", ")}\n`);
// Example 6: Composition of custom operators
console.log(`[6] Composable transformation pipeline:\n`);
// Define custom operator
const withLogging = <A,>(label: string) =>
(stream: Stream.Stream<A>) =>
stream.pipe(
Stream.tap((value) =>
Effect.log(`[${label}] Processing: ${JSON.stringify(value)}`)
)
);
const filterPositive = (stream: Stream.Stream<number>) =>
stream.pipe(
Stream.filter((n) => n > 0),
Stream.tap(() => Effect.log(`[FILTER] Kept positive`))
);
const scaleUp = (factor: number) =>
(stream: Stream.Stream<number>) =>
stream.pipe(
Stream.map((n) => n * factor),
Stream.tap((n) =>
Effect.log(`[SCALE] Scaled to ${n}`)
)
);
const testData = [10, -5, 20, -3, 30];
const pipeline = yield* Stream.fromIterable(testData).pipe(
withLogging("INPUT"),
filterPositive,
scaleUp(10),
Stream.runCollect
);
yield* Effect.log(`[RESULT] Final: ${Chunk.toReadonlyArray(pipeline).join(", ")}\n`);
// Example 7: Stateful transformation
console.log(`[7] Stateful transformation (running total):\n`);
const runningTotal = yield* Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
Stream.scan(0, (acc, value) => acc + value),
Stream.runCollect
);
// Stream.scan also emits the initial value, so the totals start at 0
yield* Effect.log(`[TOTALS] ${Chunk.toReadonlyArray(runningTotal).join(", ")}\n`);
// Example 8: Conditional transformation
console.log(`[8] Conditional transformation (different paths):\n`);
interface Item {
id: string;
priority: "high" | "normal" | "low";
}
const transformByPriority = (item: Item): Effect.Effect<{
id: string;
processed: string;
}> =>
Effect.gen(function* () {
switch (item.priority) {
case "high":
yield* Effect.log(`[HIGH] Priority processing for ${item.id}`);
return { id: item.id, processed: "urgent" };
case "normal":
yield* Effect.log(
`[NORMAL] Standard processing for ${item.id}`
);
return { id: item.id, processed: "standard" };
case "low":
yield* Effect.log(`[LOW] Deferred processing for ${item.id}`);
return { id: item.id, processed: "deferred" };
}
});
const items: Item[] = [
{ id: "i1", priority: "normal" },
{ id: "i2", priority: "high" },
{ id: "i3", priority: "low" },
];
const processed = yield* Stream.fromIterable(items).pipe(
Stream.mapEffect((item) => transformByPriority(item)),
Stream.runCollect
);
yield* Effect.log(
`[CONDITIONAL] Processed ${processed.length} items\n`
);
// Example 9: Performance-optimized transformation
console.log(`[9] Optimized for performance:\n`);
const largeDataset = Array.from({ length: 1000 }, (_, i) => i);
const startTime = Date.now();
// Use efficient operators
const result = yield* Stream.fromIterable(largeDataset).pipe(
Stream.filter((n) => n % 2 === 0), // Keep even
Stream.take(100), // Limit to first 100
Stream.map((n) => n * 2), // Transform
Stream.runCollect
);
const elapsed = Date.now() - startTime;
yield* Effect.log(
`[PERF] Processed 1000 items in ${elapsed}ms, kept ${result.length} items`
);
});
Effect.runPromise(program);
Build reusable operators:
const throttleMap = <A, B,>(
f: (a: A) => Effect.Effect<B>,
delayMs: number
) =>
(stream: Stream.Stream<A>) =>
stream.pipe(
Stream.mapEffect((value) =>
f(value).pipe(
Effect.tap(() =>
Effect.sleep(`${delayMs} millis`)
)
)
)
);
// Usage (assumes `stream` is an existing Stream.Stream<number>)
const throttled = stream.pipe(
throttleMap((x) => Effect.succeed(x * 2), 10)
);
Combine operators elegantly:
const compose = <A,>(
...operators: Array<(s: Stream.Stream<A>) => Stream.Stream<A>>
) =>
(stream: Stream.Stream<A>) =>
operators.reduce((s, op) => op(s), stream);
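// Usage (assumes `stream` is an existing Stream.Stream<number> and that
// filterPositive, scaleUp, and withLogging are the operators defined in the example above)
const pipeline = compose(
filterPositive,
scaleUp(10),
withLogging("OUTPUT")
)(stream);
✅ Use advanced transformations when: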
- Complex data flows
- Performance critical
- Reusable operators needed
- Effect-based transformations
- Multiple transformation layers
✅ Use composition when:
- Building pipelines
- Reusing operators
- Readable code needed
- Testing individual steps
⚠️ Trade-offs:
- More abstraction
- Debugging complexity
- Learning curve
- Potential overhead
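The "testing individual steps" point can be illustrated with a short sketch. The runOperator helper is hypothetical: it feeds a fixed input through a single operator and collects the output, so each operator can be checked without the rest of the pipeline. The scaleUp operator mirrors the one from the example above, minus the logging.

```typescript
import { Effect, Stream } from "effect";

// Operator under test, mirroring scaleUp from the example (logging omitted).
const scaleUp = (factor: number) =>
  (stream: Stream.Stream<number>) =>
    stream.pipe(Stream.map((n) => n * factor));

// Hypothetical test helper: run one operator over a fixed input.
const runOperator = <A, B>(
  op: (s: Stream.Stream<A>) => Stream.Stream<B>,
  input: ReadonlyArray<A>
): Array<B> =>
  Effect.runSync(
    op(Stream.fromIterable(input)).pipe(
      Stream.runCollect,
      Effect.map((chunk) => Array.from(chunk)) // Chunk is iterable
    )
  );

const scaled = runOperator(scaleUp(10), [1, 2, 3]);
console.log(scaled); // [10, 20, 30]

const scaledEmpty = runOperator(scaleUp(10), []);
console.log(scaledEmpty); // []
```

Effect.runSync only works for operators that complete synchronously; an operator that sleeps or performs I/O would need Effect.runPromise in the helper instead.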
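| Pattern | When | Benefit |
|---|---|---|
| Stream.map | Pure, synchronous transform | Simplest option, no effect overhead |
| Stream.mapEffect | Transform needs an effect (I/O, validation) | Errors and resources compose with the stream |
| Stream.filter | Drop elements by predicate | Later stages see only relevant items |
| Stream.scan | Output depends on accumulated state | Emits every intermediate state |
| Chunking (e.g. Stream.grouped) | Batch processing | Amortizes per-item cost across a batch |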
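For the batch-processing row, a minimal sketch using Stream.grouped is shown here as one possible way to batch inside the stream itself rather than slicing an array by hand. The data and the batch size of 3 are illustrative.

```typescript
import { Chunk, Effect, Stream } from "effect";

// Illustrative input: the numbers 1 through 10.
const values = Array.from({ length: 10 }, (_, i) => i + 1);

const batchSums = Stream.fromIterable(values).pipe(
  Stream.grouped(3), // emits Chunks of up to 3 elements
  Stream.map((batch) =>
    Chunk.toReadonlyArray(batch).reduce((sum, n) => sum + n, 0)
  ),
  Stream.runCollect,
  Effect.map((chunk) => Chunk.toReadonlyArray(chunk))
);

const sums = Effect.runSync(batchSums);
console.log(sums); // [6, 15, 24, 10]
```

The last batch holds the single leftover element, so per-batch logic should not assume a full batch.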
Related patterns:
- Stream Pattern 1: Map & Filter - Basic transforms
- Stream Pattern 4: Stateful Operations - State in streams
- Stream Pattern 7: Error Handling - Error transforms
- Concurrency Pattern 5: PubSub - Event streams