---
title: "Stream Pattern 8: Advanced Stream Transformations"
id: stream-pattern-advanced-transformations
skillLevel: advanced
applicationPatternId: streams
summary: "Apply complex transformations across streams including custom operators, effect-based transformations, and composition patterns."
tags:
  - streams
  - transformations
  - composition
  - effect-based
  - advanced-patterns
  - performance
rule:
  description: "Use advanced stream operators to build sophisticated data pipelines that compose elegantly and maintain performance at scale."
related:
  - stream-pattern-map-filter-transformations
  - stream-pattern-stateful-operations
  - stream-pattern-error-handling
author: effect_website
lessonOrder: 4
---

Guideline

Advanced transformations enable complex data flows:

  • Custom operators: Build reusable transformations
  • Effect-based: Transformations with side effects
  • Lazy evaluation: Compute only what's needed
  • Fusion: Optimize composed operations
  • Staging: Multiple transformation layers
  • Composition: Combine operators cleanly

Pattern: Stream.mapEffect(), Stream.map(), pipe composition


Rationale

Simple transformations don't scale:

Problem 1: Performance degradation

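  • Eager, array-style pipelines build an intermediate collection at every layer
  • 10 transformations = 10 intermediate collections
  • Pushing 1M items through 10 layers materializes roughly 10M intermediate elements
  • Result: GC pressure and, at worst, memory exhaustion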

Problem 2: Complex logic scattered

  • Validation here, enrichment there, filtering elsewhere
  • Hard to maintain
  • Changes break other parts
  • No clear data flow

Problem 3: Effect handling

  • Transformations need side effects
  • Network calls, database queries
  • Naive approach: load all, transform sequentially
  • Slow, inefficient
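One way to avoid the "load all, transform sequentially" approach is to let `Stream.mapEffect` run a bounded number of effects at once. The sketch below is illustrative only: `fetchUserName` is a hypothetical stand-in for a network call, and the concurrency limit of 4 is an arbitrary choice.

```typescript
import { Chunk, Effect, Stream } from "effect";

// Hypothetical lookup standing in for a network call or database query.
const fetchUserName = (id: number): Effect.Effect<string> =>
  Effect.sleep("10 millis").pipe(Effect.as(`user-${id}`));

// Up to 4 lookups are in flight at once. Without `unordered: true`,
// results are emitted in the same order as the input.
const userNames = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8]).pipe(
  Stream.mapEffect(fetchUserName, { concurrency: 4 }),
  Stream.runCollect,
  Effect.map((chunk) => Chunk.toReadonlyArray(chunk))
);

const userNamesResult = Effect.runPromise(userNames);
userNamesResult.then((names) => console.log(names));
```

The limit keeps memory and load on the downstream service bounded while still overlapping the waits.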

Problem 4: Reusability

  • Custom transformation used once
  • Next time, rewrite from scratch
  • Code duplication
  • Bugs replicated

Solutions:

Custom operators:

  • Encapsulate transformation logic
  • Reusable across projects
  • Testable in isolation
  • Composable

Lazy evaluation:

  • Compute as elements flow
  • No intermediate collections
  • Constant memory
  • Only compute what's used
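A minimal sketch of lazy evaluation, assuming an infinite source built with `Stream.iterate`. The `evaluations` counter is only there for illustration: it shows that the mapping function runs for the handful of elements that `Stream.take` pulls, not for the whole (unbounded) source.

```typescript
import { Chunk, Effect, Stream } from "effect";

// Counts how many times the mapping function actually runs.
let evaluations = 0;

// Stream.iterate describes an infinite stream: 1, 2, 3, ...
const firstSquares = Stream.iterate(1, (n) => n + 1).pipe(
  Stream.map((n) => {
    evaluations += 1;
    return n * n;
  }),
  Stream.take(5)
);

// Nothing has run so far; the stream is only a description until executed.
const squares = Chunk.toReadonlyArray(
  Effect.runSync(Stream.runCollect(firstSquares))
);

console.log(squares); // [1, 4, 9, 16, 25]
console.log(`map ran ${evaluations} times`);
```

Because elements are pulled on demand, the pipeline terminates even though the source never ends.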

Fusion:

  • Chain multiple maps/filters in one pipeline
  • Single pass through the data
  • No full-size intermediate collections
  • Optimization is left to the library instead of hand-written loops

Effect composition:

  • Chain effects naturally
  • Error propagation automatic
  • Resource cleanup guaranteed
  • Readable code
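The error-propagation and cleanup points can be sketched as follows. The `validate` function and the `events` log are hypothetical helpers introduced for this illustration: a failure inside `Stream.mapEffect` stops the stream, surfaces in the error channel, and the finalizer registered with `Stream.ensuring` still runs.

```typescript
import { Effect, Either, Stream } from "effect";

// Records what happened so the sequence can be inspected afterwards.
const events: Array<string> = [];

// Hypothetical validation step that fails on negative input.
const validate = (n: number): Effect.Effect<number, string> =>
  n < 0 ? Effect.fail(`negative value: ${n}`) : Effect.succeed(n);

const checked = Stream.fromIterable([1, 2, -3, 4]).pipe(
  Stream.mapEffect(validate),
  Stream.tap((n) => Effect.sync(() => events.push(`ok ${n}`))),
  // The finalizer runs whether the stream completes or fails.
  Stream.ensuring(Effect.sync(() => events.push("cleanup"))),
  Stream.runCollect,
  // Effect.either turns the failure into a value so it can be inspected.
  Effect.either
);

const outcome = Effect.runSync(checked);

if (Either.isLeft(outcome)) {
  console.log(`failed with: ${outcome.left}`);
}
console.log(events);
```

No explicit try/catch is needed between stages; the failure travels through the pipeline's error channel.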

Good Example

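This example walks through nine transformations, from an effect-based filter and record enrichment to composed custom operators and a lazily bounded pipeline.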

import { Effect, Stream, Ref, Chunk } from "effect";

interface LogEntry {
  timestamp: Date;
  level: "info" | "warn" | "error";
  message: string;
  context?: Record<string, unknown>;
}

interface Metric {
  name: string;
  value: number;
  tags: Record<string, string>;
}

const program = Effect.gen(function* () {
  console.log(`\n[ADVANCED STREAM TRANSFORMATIONS] Complex data flows\n`);

  // Example 1: Custom filter operator
  console.log(`[1] Custom filter with effect-based logic:\n`);

  const filterByEffect = <A,>(
    predicate: (a: A) => Effect.Effect<boolean>
  ) =>
    (stream: Stream.Stream<A>) =>
      stream.pipe(
        Stream.mapEffect((value) =>
          predicate(value).pipe(
            Effect.map((keep) => (keep ? value : null))
          )
        ),
        // Type guard narrows A | null back to A (assumes null is never a valid element)
        Stream.filter((value): value is A => value !== null)
      );

  const isValid = (num: number): Effect.Effect<boolean> =>
    Effect.gen(function* () {
      // Simulate validation effect (e.g., API call)
      return num > 0 && num < 100;
    });

  const numbers = [50, 150, 25, -10, 75];

  const validNumbers = yield* Stream.fromIterable(numbers).pipe(
    filterByEffect(isValid),
    Stream.runCollect
  );

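  // runCollect returns a Chunk, which has no join method; convert to an array first
  yield* Effect.log(`[VALID] ${Chunk.toReadonlyArray(validNumbers).join(", ")}\n`);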

  // Example 2: Enrichment transformation
  console.log(`[2] Enriching records with additional data:\n`);

  interface RawRecord {
    id: string;
    value: number;
  }

  interface EnrichedRecord {
    id: string;
    value: number;
    validated: boolean;
    processed: Date;
    metadata: Record<string, unknown>;
  }

  const enrich = (record: RawRecord): Effect.Effect<EnrichedRecord> =>
    Effect.gen(function* () {
      // Simulate lookup/validation
      const validated = record.value > 0;

      return {
        id: record.id,
        value: record.value,
        validated,
        processed: new Date(),
        metadata: { source: "stream" },
      };
    });

  const rawData = [
    { id: "r1", value: 10 },
    { id: "r2", value: -5 },
    { id: "r3", value: 20 },
  ];

  const enriched = yield* Stream.fromIterable(rawData).pipe(
    Stream.mapEffect((record) => enrich(record)),
    Stream.runCollect
  );

  yield* Effect.log(`[ENRICHED] ${enriched.length} records enriched\n`);

  // Example 3: Demultiplexing (split one stream into multiple)
  console.log(`[3] Demultiplexing by category:\n`);

  interface Event {
    id: string;
    type: "click" | "view" | "purchase";
    data: unknown;
  }

  const events: Event[] = [
    { id: "e1", type: "click", data: { x: 100, y: 200 } },
    { id: "e2", type: "view", data: { url: "/" } },
    { id: "e3", type: "purchase", data: { amount: 99.99 } },
    { id: "e4", type: "click", data: { x: 50, y: 100 } },
  ];

  const clicks = yield* Stream.fromIterable(events).pipe(
    Stream.filter((e) => e.type === "click"),
    Stream.runCollect
  );

  const views = yield* Stream.fromIterable(events).pipe(
    Stream.filter((e) => e.type === "view"),
    Stream.runCollect
  );

  const purchases = yield* Stream.fromIterable(events).pipe(
    Stream.filter((e) => e.type === "purchase"),
    Stream.runCollect
  );

  yield* Effect.log(
    `[DEMUX] Clicks: ${clicks.length}, Views: ${views.length}, Purchases: ${purchases.length}\n`
  );

  // Example 4: Chunked processing (batch transformation)
  console.log(`[4] Chunked processing (batches of N):\n`);

  const processChunk = (chunk: Array<{ id: string; value: number }>) =>
    Effect.gen(function* () {
      const sum = chunk.reduce((s, r) => s + r.value, 0);
      const avg = sum / chunk.length;

      yield* Effect.log(
        `[CHUNK] ${chunk.length} items, avg: ${avg.toFixed(2)}`
      );

      return { size: chunk.length, sum, avg };
    });

  const data = Array.from({ length: 10 }, (_, i) => ({
    id: `d${i}`,
    value: i + 1,
  }));

  const chunkSize = 3;

  // Stream.grouped emits Chunks of up to chunkSize elements
  const chunkResults = yield* Stream.fromIterable(data).pipe(
    Stream.grouped(chunkSize),
    Stream.mapEffect((chunk) => processChunk(Chunk.toArray(chunk))),
    Stream.runCollect
  );

  yield* Effect.log(
    `[CHUNKS] Processed ${chunkResults.length} batches\n`
  );

  // Example 5: Multi-stage transformation pipeline
  console.log(`[5] Multi-stage pipeline (parse → validate → transform):\n`);

  const rawStrings = ["10", "twenty", "30", "-5", "50"];

  // Stage 1: Parse
  const parsed = yield* Stream.fromIterable(rawStrings).pipe(
    Stream.mapEffect((s) =>
      Effect.gen(function* () {
        // parseInt never throws; it returns NaN for non-numeric input
        const n = Number.parseInt(s, 10);

        if (Number.isNaN(n)) {
          return yield* Effect.fail(new Error(`Failed to parse: ${s}`));
        }

        return n;
      }).pipe(
        Effect.catchAll((error) =>
          Effect.gen(function* () {
            yield* Effect.log(`[PARSE ERROR] ${error.message}`);

            return null;
          })
        )
      )
    ),
    // Drop the null placeholders left by failed parses
    Stream.filter((n): n is number => n !== null),
    Stream.runCollect
  );

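  // Convert the collected Chunk to an array for join/filter/map
  yield* Effect.log(`[STAGE 1] Parsed: ${Chunk.toReadonlyArray(parsed).join(", ")}`);

  // Stage 2: Validate
  const validated = Chunk.toReadonlyArray(parsed).filter((n) => n > 0);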

  yield* Effect.log(`[STAGE 2] Validated: ${validated.join(", ")}`);

  // Stage 3: Transform
  const transformed = validated.map((n) => n * 2);

  yield* Effect.log(`[STAGE 3] Transformed: ${transformed.join(", ")}\n`);

  // Example 6: Composition of custom operators
  console.log(`[6] Composable transformation pipeline:\n`);

  // Define custom operator
  const withLogging = <A,>(label: string) =>
    (stream: Stream.Stream<A>) =>
      stream.pipe(
        Stream.tap((value) =>
          Effect.log(`[${label}] Processing: ${JSON.stringify(value)}`)
        )
      );

  const filterPositive = (stream: Stream.Stream<number>) =>
    stream.pipe(
      Stream.filter((n) => n > 0),
      Stream.tap(() => Effect.log(`[FILTER] Kept positive`))
    );

  const scaleUp = (factor: number) =>
    (stream: Stream.Stream<number>) =>
      stream.pipe(
        Stream.map((n) => n * factor),
        Stream.tap((n) =>
          Effect.log(`[SCALE] Scaled to ${n}`)
        )
      );

  const testData = [10, -5, 20, -3, 30];

  const pipeline = yield* Stream.fromIterable(testData).pipe(
    withLogging("INPUT"),
    filterPositive,
    scaleUp(10),
    Stream.runCollect
  );

  yield* Effect.log(`[RESULT] Final: ${Chunk.toReadonlyArray(pipeline).join(", ")}\n`);

  // Example 7: Stateful transformation
  console.log(`[7] Stateful transformation (running total):\n`);

  const runningTotal = yield* Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
    Stream.scan(0, (acc, value) => acc + value),
    Stream.runCollect
  );

  // Stream.scan also emits the initial value, so the totals start at 0
  yield* Effect.log(`[TOTALS] ${Chunk.toReadonlyArray(runningTotal).join(", ")}\n`);

  // Example 8: Conditional transformation
  console.log(`[8] Conditional transformation (different paths):\n`);

  interface Item {
    id: string;
    priority: "high" | "normal" | "low";
  }

  const transformByPriority = (item: Item): Effect.Effect<{
    id: string;
    processed: string;
  }> =>
    Effect.gen(function* () {
      switch (item.priority) {
        case "high":
          yield* Effect.log(`[HIGH] Priority processing for ${item.id}`);

          return { id: item.id, processed: "urgent" };

        case "normal":
          yield* Effect.log(
            `[NORMAL] Standard processing for ${item.id}`
          );

          return { id: item.id, processed: "standard" };

        case "low":
          yield* Effect.log(`[LOW] Deferred processing for ${item.id}`);

          return { id: item.id, processed: "deferred" };
      }
    });

  const items: Item[] = [
    { id: "i1", priority: "normal" },
    { id: "i2", priority: "high" },
    { id: "i3", priority: "low" },
  ];

  const processed = yield* Stream.fromIterable(items).pipe(
    Stream.mapEffect((item) => transformByPriority(item)),
    Stream.runCollect
  );

  yield* Effect.log(
    `[CONDITIONAL] Processed ${processed.length} items\n`
  );

  // Example 9: Performance-optimized transformation
  console.log(`[9] Optimized for performance:\n`);

  const largeDataset = Array.from({ length: 1000 }, (_, i) => i);

  const startTime = Date.now();

  // Use efficient operators
  const result = yield* Stream.fromIterable(largeDataset).pipe(
    Stream.filter((n) => n % 2 === 0), // Keep even
    Stream.take(100), // Limit to first 100
    Stream.map((n) => n * 2), // Transform
    Stream.runCollect
  );

  const elapsed = Date.now() - startTime;

  yield* Effect.log(
    `[PERF] Processed 1000 items in ${elapsed}ms, kept ${result.length} items`
  );
});

Effect.runPromise(program);
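Example 1 builds an effectful filter by hand from `Stream.mapEffect` and `Stream.filter`. For comparison, the sketch below uses the library's `Stream.filterEffect` operator on the same input; `isInRange` is a hypothetical stand-in for a real effectful check such as an API call.

```typescript
import { Chunk, Effect, Stream } from "effect";

// Stand-in for an effectful check such as an API call.
const isInRange = (n: number): Effect.Effect<boolean> =>
  Effect.succeed(n > 0 && n < 100);

const inRange = Stream.fromIterable([50, 150, 25, -10, 75]).pipe(
  // Keeps only the elements for which the effect yields true.
  Stream.filterEffect(isInRange),
  Stream.runCollect,
  Effect.map((chunk) => Chunk.toReadonlyArray(chunk))
);

const inRangeResult = Effect.runSync(inRange);
console.log(inRangeResult); // [50, 25, 75]
```

This avoids the null sentinel, so streams whose elements may legitimately be null are handled correctly.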

Advanced: Custom Stream Operator

Build reusable operators:

const throttleMap = <A, B,>(
  f: (a: A) => Effect.Effect<B>,
  delayMs: number
) =>
  (stream: Stream.Stream<A>) =>
    stream.pipe(
      Stream.mapEffect((value) =>
        f(value).pipe(
          Effect.tap(() =>
            Effect.sleep(`${delayMs} millis`)
          )
        )
      )
    );

// Usage (assumes `stream` is an existing Stream.Stream<number>)
const throttled = stream.pipe(
  throttleMap((x) => Effect.succeed(x * 2), 10)
);
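The operator above only accepts streams that cannot fail and need no services. A possible generalization, sketched here under the hypothetical name `mapWithDelay`, adds type parameters for the error and requirement channels so the operator also fits fallible streams. The 5 ms delay and the sample data are arbitrary.

```typescript
import { Chunk, Duration, Effect, Stream } from "effect";

// Hypothetical operator: like throttleMap, but generic in the error (E, E2)
// and requirement (R, R2) channels.
const mapWithDelay =
  <A, B, E2, R2>(f: (a: A) => Effect.Effect<B, E2, R2>, delayMs: number) =>
  <E, R>(stream: Stream.Stream<A, E, R>): Stream.Stream<B, E | E2, R | R2> =>
    stream.pipe(
      Stream.mapEffect((value) =>
        // Pause after each element has been transformed.
        f(value).pipe(Effect.tap(() => Effect.sleep(Duration.millis(delayMs))))
      )
    );

const source = Stream.fromIterable([1, 2, 3]);

const doubled = source.pipe(
  mapWithDelay((x: number) => Effect.succeed(x * 2), 5),
  Stream.runCollect,
  Effect.map((chunk) => Chunk.toReadonlyArray(chunk))
);

const doubledResult = Effect.runPromise(doubled);
doubledResult.then((values) => console.log(values)); // [2, 4, 6]
```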

Advanced: Composition Patterns

Combine operators elegantly:

const compose = <A,>(
  ...operators: Array<(s: Stream.Stream<A>) => Stream.Stream<A>>
) =>
  (stream: Stream.Stream<A>) =>
    operators.reduce((s, op) => op(s), stream);

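// Usage (reuses filterPositive, scaleUp, and withLogging from the example above;
// assumes `stream` is an existing Stream.Stream<number>)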
const pipeline = compose(
  filterPositive,
  scaleUp(10),
  withLogging("OUTPUT")
)(stream);
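A self-contained variant of this composition, for experimentation. The names `NumberOp`, `keepPositive`, `scaleBy`, and `collect` are introduced here for illustration only. The sketch also shows `flow`, which Effect exports for left-to-right function composition and which may make a hand-written `compose` unnecessary.

```typescript
import { Chunk, Effect, Stream, flow } from "effect";

// An operator here is a function from a number stream to a number stream.
type NumberOp = (stream: Stream.Stream<number>) => Stream.Stream<number>;

const compose =
  (...operators: Array<NumberOp>): NumberOp =>
  (stream) =>
    operators.reduce((s, op) => op(s), stream);

const keepPositive: NumberOp = (stream) =>
  stream.pipe(Stream.filter((n) => n > 0));

const scaleBy =
  (factor: number): NumberOp =>
  (stream) =>
    stream.pipe(Stream.map((n) => n * factor));

// Runs a stream synchronously and returns its elements as an array.
const collect = (stream: Stream.Stream<number>): ReadonlyArray<number> =>
  Chunk.toReadonlyArray(Effect.runSync(Stream.runCollect(stream)));

const input = Stream.fromIterable([10, -5, 20]);

const viaCompose = collect(compose(keepPositive, scaleBy(10))(input));
const viaFlow = collect(flow(keepPositive, scaleBy(10))(input));

console.log(viaCompose); // [100, 200]
console.log(viaFlow); // [100, 200]
```

Since a stream is a description and not a running process, `input` can be reused by both pipelines.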

When to Use This Pattern

Use advanced transformations when:

  • Complex data flows
  • Performance critical
  • Reusable operators needed
  • Effect-based transformations
  • Multiple transformation layers

Use composition when:

  • Building pipelines
  • Reusing operators
  • Readable code needed
  • Testing individual steps

⚠️ Trade-offs:

  • More abstraction
  • Debugging complexity
  • Learning curve
  • Potential overhead

Transformation Patterns

| Pattern | When | Benefit |
| --- | --- | --- |
| map | Simple transform | Direct |
| mapEffect | Effects needed | Composable |
| filter | Remove items | Selective |
| scan | State + output | Stateful |
| chunk | Batch processing | Efficient |
See Also