Skip to content

Conversation

@alco
Copy link
Member

@alco alco commented Jan 20, 2026

Summary

Implements fragment-direct streaming for shape consumers, allowing transaction fragments to be written directly to storage as they arrive instead of buffering the entire transaction in memory until commit.

Closes #3415

Motivation

Previously, the consumer would buffer all changes in a transaction in memory until a Commit message arrived. For large transactions, this could cause significant memory pressure. This PR enables shapes without subquery dependencies to stream fragments directly to storage, reducing memory usage while maintaining crash-safety guarantees.

Key Changes

  • Fragment-direct mode: Shapes without subquery dependencies now write transaction fragments directly to storage via append_fragment_to_log!/2
  • Crash-safety invariant: fetch_latest_offset only returns committed transaction offsets, ensuring recovery correctness even if uncommitted fragment data is flushed to disk
  • Fallback to TransactionBuilder: Shapes with dependencies, during initial filtering, or with materializer subscriptions continue using the existing buffered approach

Limitations

Fragment-direct mode is only enabled when:

  • Shape has no subquery dependencies
  • Not buffering for initial snapshot
  • No materializer subscribed (inner shapes need full transaction handling)
  • Initial snapshot filtering is complete

@coderabbitai
Copy link

coderabbitai bot commented Jan 20, 2026

Important

Review skipped

Auto reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

  • 🔍 Trigger a full review

Comment @coderabbitai help to get the list of available commands and usage tips.

@codecov
Copy link

codecov bot commented Jan 20, 2026

❌ 36 Tests Failed:

Tests completed Failed Passed Skipped
2146 36 2110 0
View the top 3 failed test(s) by shortest run time
Elixir.Electric.Shapes.ConsumerTest::test fragment-direct streaming uncommitted fragments flushed to disk do not advance latest_offset
Stack Traces | 0.0793s run time
3) test fragment-direct streaming uncommitted fragments flushed to disk do not advance latest_offset (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:2111
     Assertion with == failed
     code:  assert offset_after_committed == LogOffset.new(lsn_init, 0)
     left:  LogOffset.last_before_real_offsets()
     right: LogOffset.new(50, 0)
     stacktrace:
       .../electric/shapes/consumer_test.exs:2144: (test)
Elixir.Electric.Shapes.ConsumerTest::test fragment-direct streaming crash/restart with partial fragments persisted recovers correctly
Stack Traces | 0.105s run time
22) test fragment-direct streaming crash/restart with partial fragments persisted recovers correctly (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:1703
     Assertion with == failed
     code:  assert latest_offset == initial_offset
     left:  LogOffset.last_before_real_offsets()
     right: LogOffset.new(0, 0)
     stacktrace:
       .../electric/shapes/consumer_test.exs:1757: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes DELETE forces the shape handle to be different on reconnect and new snapshot to be created
Stack Traces | 0.12s run time
52) test /v1/shapes DELETE forces the shape handle to be different on reconnect and new snapshot to be created (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:246
     match (=) failed
     code:  assert %{status: 200} = conn
     left:  %{status: 200}
     right: %Plug.Conn{adapter: {Plug.Adapters.Test.Conn, :...}, assigns: %{request: %Electric.Shapes.Api.Request{chunk_end_offset: LogOffset.new(0, 0), handle: "23292935-1769515187036828", last_offset: LogOffset.last_before_real_offsets(), global_last_seen_lsn: 89913832, new_changes_ref: nil, new_changes_pid: nil, api: %Electric.Shapes.Api{inspector: {Electric.Postgres.Inspector.EtsInspector, [stack_id: "Electric.Plug.RouterTest test /v1/shapes DELETE forces the shape handle to be different on reconnect and new snapshot to be created", server: {:via, Registry, {:"Electric.ProcessRegistry:Electric.Plug.RouterTest test /v1/shapes DELETE forces the shape handle to be different on reconnect and new snapshot to be created", {Electric.Postgres.Inspector.EtsInspector, nil}}}]}, shape: nil, stack_id: "Electric.Plug.RouterTest test /v1/shapes DELETE forces the shape handle to be different on reconnect and new snapshot to be created", feature_flags: ["allow_subqueries", "tagged_subqueries"], max_concurrent_requests: %{existing: 1000, initial: 300}, allow_shape_deletion: true, keepalive_interval: 21000, long_poll_timeout: 4000, sse_timeout: 60000, max_age: 60, stack_ready_timeout: 5000, stale_age: 300, send_cache_headers?: true, encoder: Electric.Shapes.Api.Encoder.JSON, sse_encoder: Electric.Shapes.Api.Encoder.SSE, configured: true}, params: %Electric.Shapes.Api.Params{table: "items", offset: LogOffset.before_all(), handle: nil, live: false, where: nil, columns: nil, shape_definition: Shape.new!({21924, "public.items"}), replica: :default, params: %{}, experimental_compaction: false, live_sse: false, log: :full, subset: nil}, response: %Electric.Shapes.Api.Response{handle: "23292935-1769515187036828", offset: LogOffset.new(0, 0), shape_definition: Shape.new!({21924, "public.items"}), known_error: nil, retry_after: nil, api: %Electric.Shapes.Api{inspector: {Electric.Postgres.Inspector.EtsInspector, [stack_id: "Electric.Plug.RouterTest test /v1/shapes DELETE forces the shape handle to be different on reconnect and new snapshot to be created", server: {:via, Registry, {:"Electric.ProcessRegistry:Electric.Plug.RouterTest test /v1/shapes DELETE forces the shape handle to be different on reconnect and new snapshot to be created", {Electric.Postgres.Inspector.EtsInspector, nil}}}]}, shape: nil, stack_id: "Electric.Plug.RouterTest test /v1/shapes DELETE forces the shape handle to be different on reconnect and new snapshot to be created", feature_flags: ["allow_subqueries", "tagged_subqueries"], max_concurrent_requests: %{existing: 1000, initial: 300}, allow_shape_deletion: true, keepalive_interval: 21000, long_poll_timeout: 4000, sse_timeout: 60000, max_age: 60, stack_ready_timeout: 5000, stale_age: 300, send_cache_headers?: true, encoder: Electric.Shapes.Api.Encoder.JSON, sse_encoder: Electric.Shapes.Api.Encoder.SSE, configured: true}, chunked: false, up_to_date: false, no_changes: false, response_type: :normal_log, params: %Electric.Shapes.Api.Params{...}, ...}}, ...}, ...}
     stacktrace:
       .../electric/plug/router_test.exs:278: (test)
Elixir.Electric.Shapes.ConsumerTest::test transaction handling with real storage should hibernate not suspend if has dependencies
Stack Traces | 0.165s run time
32) test transaction handling with real storage should hibernate not suspend if has dependencies (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:851
     match (=) failed
     code:  assert {:current_function, {:gen_server, :loop_hibernate, 4}} =
              Process.info(consumer_pid, :current_function)
     left:  {:current_function, {:gen_server, :loop_hibernate, 4}}
     right: {:current_function, {:gen_server, :loop, 5}}
     stacktrace:
       .../electric/shapes/consumer_test.exs:884: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes GET receives only specified columns out of wide table
Stack Traces | 0.215s run time
65) test /v1/shapes GET receives only specified columns out of wide table (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:1016
     Unexpectedly received message :got_response (which matched :got_response)
     code: refute_receive :got_response, 1000
     stacktrace:
       .../electric/plug/router_test.exs:1053: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries subqueries work with params
Stack Traces | 0.219s run time
10) test /v1/shapes - subqueries subqueries work with params (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2745
     match (=) failed
     code:  assert {req, 200, [%{"value" => %{"id" => "2", "other_value" => "20"}}, _]} = Task.await(task)
     left:  {req, 200, [%{"value" => %{"id" => "2", "other_value" => "20"}}, _]}
     right: {%{handle: "123774910-1769515198816734", offset: "0_0", table: "child", where: "value in (SELECT value FROM parent WHERE other_value >= $2) AND other_value >= $1", params: %{"1" => "10", "2" => "6"}, live: true}, 409, [%{"headers" => %{"control" => "must-refetch"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2767: (test)
Elixir.Electric.Shapes.ConsumerTest::test transaction handling with real storage transactions are buffered until snapshot xmin is known
Stack Traces | 0.234s run time
17) test transaction handling with real storage transactions are buffered until snapshot xmin is known (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:660
     ** (MatchError) no match of right hand side value:

         {:error, "Shape terminated before snapshot was ready"}

     code: :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id)
     stacktrace:
       .../electric/shapes/consumer_test.exs:703: (test)
Elixir.Electric.Integration.SubqueryMoveOutTest::test tag handling during updates deactivating old parent after child changed parents should not generate delete
Stack Traces | 0.25s run time
9) test tag handling during updates deactivating old parent after child changed parents should not generate delete (Electric.Integration.SubqueryMoveOutTest)
     test/integration/subquery_move_out_test.exs:225
     Received unexpected message: %Electric.Client.Message.ControlMessage{control: :must_refetch, global_last_seen_lsn: nil, handle: "125772287-1769515205986111-next", request_timestamp: ~U[2026-01-27 12:00:06.060571Z]}
     code: assert_update(consumer, %{"id" => "child-1"})
     stacktrace:
       test/integration/subquery_move_out_test.exs:251: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries a move-in from the inner shape causes a query and new entries in the outer shape
Stack Traces | 0.254s run time
19) test /v1/shapes - subqueries a move-in from the inner shape causes a query and new entries in the outer shape (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2294
     match (=) failed
     code:  assert {_, 200, [data, %{"headers" => %{"control" => "snapshot-end"}}, up_to_date_ctl()]} =
              Task.await(task)
     left:  {_, 200, [data, %{"headers" => %{"control" => "snapshot-end"}}, up_to_date_ctl()]}
     right: {%{handle: "59615724-1769515192779865", offset: "0_0", table: "child", where: "parent_id in (SELECT id FROM parent WHERE value = 1)", live: true}, 409, [%{"headers" => %{"control" => "must-refetch"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2319: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries allows subquery in where clause
Stack Traces | 0.259s run time
33) test /v1/shapes - subqueries allows subquery in where clause (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2198
     match (=) failed
     code:  assert %{status: 200} = conn = Task.await(task)
     left:  %{status: 200}
     right: %Plug.Conn{adapter: {Plug.Adapters.Test.Conn, :...}, assigns: %{request: %Electric.Shapes.Api.Request{chunk_end_offset: LogOffset.last_before_real_offsets(), handle: "8128451-1769515174116412", last_offset: LogOffset.last_before_real_offsets(), global_last_seen_lsn: 0, new_changes_ref: #Reference<0.3672906850.3497263106.186721>, new_changes_pid: #PID<0.26878.0>, api: %Electric.Shapes.Api{inspector: {Electric.Postgres.Inspector.EtsInspector, [stack_id: "Electric.Plug.RouterTest test /v1/shapes - subqueries allows subquery in where clause", server: {:via, Registry, {:"Electric.ProcessRegistry:Electric.Plug.RouterTest test /v1/shapes - subqueries allows subquery in where clause", {Electric.Postgres.Inspector.EtsInspector, nil}}}]}, shape: nil, stack_id: "Electric.Plug.RouterTest test /v1/shapes - subqueries allows subquery in where clause", feature_flags: ["allow_subqueries", "tagged_subqueries"], max_concurrent_requests: %{existing: 1000, initial: 300}, allow_shape_deletion: true, keepalive_interval: 21000, long_poll_timeout: 4000, sse_timeout: 60000, max_age: 60, stack_ready_timeout: 5000, stale_age: 300, send_cache_headers?: true, encoder: Electric.Shapes.Api.Encoder.JSON, sse_encoder: Electric.Shapes.Api.Encoder.SSE, configured: true}, params: %Electric.Shapes.Api.Params{table: "child", offset: LogOffset.new(0, 0), handle: "8128451-1769515174116412", live: true, where: "parent_id in (SELECT id FROM parent WHERE value = 1)", columns: nil, shape_definition: Shape.new!({21647, "public.child"}, where: "parent_id IN (SELECT id FROM public.parent WHERE value = 1)", deps: [Shape.new!({21642, "public.parent"}, where: "value = 1", columns: ["id"])]), replica: :default, params: %{}, experimental_compaction: false, live_sse: false, log: :full, subset: nil}, response: %Electric.Shapes.Api.Response{handle: "8128451-1769515174116412", offset: LogOffset.last_before_real_offsets(), shape_definition: Shape.new!({21647, "public.child"}, where: "parent_id IN (SELECT id FROM public.parent WHERE value = 1)", deps: [Shape.new!({21642, "public.parent"}, where: "value = 1", columns: ["id"])]), known_error: nil, retry_after: nil, api: %Electric.Shapes.Api{inspector: {Electric.Postgres.Inspector.EtsInspector, [stack_id: "Electric.Plug.RouterTest test /v1/shapes - subqueries allows subquery in where clause", server: {:via, Registry, {:"Electric.ProcessRegistry:Electric.Plug.RouterTest test /v1/shapes - subqueries allows subquery in where clause", {Electric.Postgres.Inspector.EtsInspector, nil}}}]}, shape: nil, stack_id: "Electric.Plug.RouterTest test /v1/shapes - subqueries allows subquery in where clause", feature_flags: ["allow_subqueries", "tagged_subqueries"], max_concurrent_requests: %{existing: 1000, initial: 300}, allow_shape_deletion: true, keepalive_interval: 21000, long_poll_timeout: 4000, sse_timeout: 60000, max_age: 60, stack_ready_timeout: 5000, stale_age: 300, send_cache_headers?: true, encoder: Electric.Shapes.Api.Encoder.JSON, sse_encoder: Electric.Shapes.Api.Encoder.SSE, configured: true}, chunked: false, up_to_date: true, no_changes: false, response_type: :normal_log, params: %Electric.Shapes.Api.Params{...}, ...}}, ...}, ...}
     stacktrace:
       .../electric/plug/router_test.exs:2238: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries allows subquery in where clauses that reference non-PK columns
Stack Traces | 0.279s run time
77) test /v1/shapes - subqueries allows subquery in where clauses that reference non-PK columns (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2373
     match (=) failed
     code:  assert {req, 200, [%{"value" => %{"id" => "1", "other_value" => "2"}}, _]} = Task.await(task)
     left:  {req, 200, [%{"value" => %{"id" => "1", "other_value" => "2"}}, _]}
     right: {%{handle: "44301228-1769515132823972", offset: "0_0", table: "child", where: "value in (SELECT value FROM parent WHERE other_value >= 10)", live: true}, 409, [%{"headers" => %{"control" => "must-refetch"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2398: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries table with a composite PK can be used in a subquery
Stack Traces | 0.287s run time
28) test /v1/shapes - subqueries table with a composite PK can be used in a subquery (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2599
     match (=) failed
     code:  assert {req, 200, [%{"value" => %{"id" => "1", "name" => "Team C"}}, _]} = Task.await(task)
     left:  {req, 200, [%{"value" => %{"id" => "1", "name" => "Team C"}}, _]}
     right: {%{handle: "124011207-1769515180711527", offset: "0_0", table: "teams", where: "id IN (SELECT team_id FROM members WHERE user_id = 1)", live: true}, 409, [%{"headers" => %{"control" => "must-refetch"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2619: (test)
Elixir.Electric.Integration.SubqueryMoveOutTest::test tag handling during updates update that changes parent reference updates tags
Stack Traces | 0.318s run time
7) test tag handling during updates update that changes parent reference updates tags (Electric.Integration.SubqueryMoveOutTest)
     test/integration/subquery_move_out_test.exs:186
     Received unexpected message: %Electric.Client.Message.ControlMessage{control: :must_refetch, global_last_seen_lsn: nil, handle: "93295670-1769515206548916-next", request_timestamp: ~U[2026-01-27 12:00:06.656240Z]}
     code: update_msg = assert_update(consumer, %{"id" => "child-1"})
     stacktrace:
       test/integration/subquery_move_out_test.exs:209: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries subqueries can reference composite PKs
Stack Traces | 0.323s run time
66) test /v1/shapes - subqueries subqueries can reference composite PKs (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2672
     match (=) failed
     code:  assert {req, 200, [%{"value" => %{"id" => "1", "role" => "Admin"}}, _]} = Task.await(task)
     left:  {req, 200, [%{"value" => %{"id" => "1", "role" => "Admin"}}, _]}
     right: {%{handle: "79670753-1769515146118117", offset: "0_0", table: "member_details", where: "(user_id, team_id) IN (SELECT user_id, team_id FROM members WHERE flag = TRUE)", live: true}, 409, [%{"headers" => %{"control" => "must-refetch"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2692: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries move-outs while processing move-ins are handled correctly
Stack Traces | 0.342s run time
30) test /v1/shapes - subqueries move-outs while processing move-ins are handled correctly (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2938
     match (=) failed
     The following variables were pinned:
       tag = "7ac963f0c1f021bd70a3c83f50744988"
     code:  assert {_req, 200,
             [
               %{"headers" => %{"event" => "move-out", "patterns" => [%{"pos" => 0, "value" => ^tag}]}},
               up_to_date_ctl()
             ]} = Task.await(task)
     left:  {_req, 200,
             [
               %{"headers" => %{"event" => "move-out", "patterns" => [%{"pos" => 0, "value" => ^tag}]}},
               up_to_date_ctl()
             ]}
     right: {%{handle: "82874018-1769515176061125", offset: "1_1", table: "child", where: "parent_id in (SELECT id FROM parent WHERE value = 1)", live: true}, 409, [%{"headers" => %{"control" => "must-refetch"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2964: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries multiple actions result in a correct event sequence
Stack Traces | 0.429s run time
64) test /v1/shapes - subqueries multiple actions result in a correct event sequence (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2823
     match (=) failed
     The following variables were pinned:
       tag = "13af60aff16d13d17b8c47731e965c13"
     code:  assert {req, 200,
             [
               %{"headers" => %{"event" => "move-out"}},
               %{
                 "headers" => %{"operation" => "insert", "is_move_in" => true, "tags" => [tag2]},
                 "value" => %{"parent_id" => "2", "value" => "12"}
               },
               %{"headers" => %{"control" => "snapshot-end"}},
               %{
                 "headers" => %{"operation" => "insert", "is_move_in" => true, "tags" => [^tag]},
                 "value" => %{"id" => "1", "parent_id" => "1", "value" => "13"}
               },
               %{"headers" => %{"control" => "snapshot-end"}},
               up_to_date_ctl()
             ]} = shape_req(req, ctx.opts)
     left:  {req, 200,
             [
               %{"headers" => %{"event" => "move-out"}},
               %{
                 "headers" => %{"operation" => "insert", "is_move_in" => true, "tags" => [tag2]},
                 "value" => %{"parent_id" => "2", "value" => "12"}
               },
               %{"headers" => %{"control" => "snapshot-end"}},
               %{
                 "headers" => %{"operation" => "insert", "is_move_in" => true, "tags" => [^tag]},
                 "value" => %{"id" => "1", "parent_id" => "1", "value" => "13"}
               },
               %{"headers" => %{"control" => "snapshot-end"}},
               up_to_date_ctl()
             ]}
     right: {%{handle: "95949410-1769515146666308", offset: "0_0", table: "child", where: "parent_id in (SELECT id FROM parent WHERE value = 1)", live: false}, 409, [%{"headers" => %{"control" => "must-refetch"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2848: (test)
Elixir.Electric.Shapes.ConsumerTest::test event handling doesn't append to log when change is irrelevant for active shapes
Stack Traces | 0.429s run time
24) test event handling doesn't append to log when change is irrelevant for active shapes (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:286
     Assertion failed, no matching message after 400ms
     Showing 10 of 14 messages in the mailbox
     code: assert_receive {Support.TestStorage, :append_to_log!, @shape_handle2, _}
     mailbox:
       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape2", _}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "Electric.Shapes.ConsumerTest-shape1"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape2", _}
       value:   {Support.TestStorage, :for_shape, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape2", _}
       value:   {Support.TestStorage, :fetch_latest_offset, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape2", _}
       value:   {Support.TestStorage, :fetch_pg_snapshot, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape2", _}
       value:   {Support.TestStorage, :snapshot_started?, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape2", _}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape2", _}
       value:   {Support.TestStorage, :set_pg_snapshot, "Electric.Shapes.ConsumerTest-shape2", %{xmin: 120, xmax: 121, xip_list: ~c"x", filter_txns?: false}}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape2", _}
       value:   {Support.TestStorage, :append_fragment_to_log!, "Electric.Shapes.ConsumerTest-shape2", [{LogOffset.new(16, 0), "\"public\".\"test_table\"/\"1\"", :insert, "{\"value\":{\"id\":\"1\"},\"key\":\"\\\"public\\\".\\\"test_table\\\"/\\\"1\\\"\",\"headers\":{\"last\":true,\"relation\":[\"public\",\"test_table\"],\"operation\":\"insert\",\"lsn\":\"16\",\"txids\":[150],\"op_position\":0}}"}]}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape2", _}
       value:   {Support.TestStorage, :signal_txn_commit!, "Electric.Shapes.ConsumerTest-shape2", 150}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape2", _}
       value:   {#Reference<0.3672906850.3497000964.59019>, :new_changes, LogOffset.new(16, 0)}
     stacktrace:
       .../electric/shapes/consumer_test.exs:305: (test)
Elixir.Electric.Shapes.ConsumerTest::test event handling correctly writes only relevant changes to multiple shape logs
Stack Traces | 0.435s run time
10) test event handling correctly writes only relevant changes to multiple shape logs (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:229
     Assertion failed, no matching message after 400ms
     Showing 10 of 16 messages in the mailbox
     code: assert_receive {Support.TestStorage, :append_to_log!, @shape_handle1,
            [{_offset, _key, _type, serialized_record}]}
     mailbox:
       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1",
                 [{_offset, _key, _type, serialized_record}]}
       value:   {Support.TestStorage, :fetch_latest_offset, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1",
                 [{_offset, _key, _type, serialized_record}]}
       value:   {Support.TestStorage, :fetch_pg_snapshot, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1",
                 [{_offset, _key, _type, serialized_record}]}
       value:   {Support.TestStorage, :snapshot_started?, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1",
                 [{_offset, _key, _type, serialized_record}]}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1",
                 [{_offset, _key, _type, serialized_record}]}
       value:   {Support.TestStorage, :set_pg_snapshot, "Electric.Shapes.ConsumerTest-shape1", %{xmin: 100, xmax: 101, xip_list: ~c"d", filter_txns?: false}}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1",
                 [{_offset, _key, _type, serialized_record}]}
       value:   {Support.TestStorage, :append_fragment_to_log!, "Electric.Shapes.ConsumerTest-shape1", [{LogOffset.new(16, 0), "\"public\".\"test_table\"/\"1\"", :insert, "{\"value\":{\"id\":\"1\"},\"key\":\"\\\"public\\\".\\\"test_table\\\"/\\\"1\\\"\",\"headers\":{\"last\":true,\"relation\":[\"public\",\"test_table\"],\"operation\":\"insert\",\"lsn\":\"16\",\"txids\":[150],\"op_position\":0}}"}]}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1",
                 [{_offset, _key, _type, serialized_record}]}
       value:   {Support.TestStorage, :signal_txn_commit!, "Electric.Shapes.ConsumerTest-shape1", 150}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1",
                 [{_offset, _key, _type, serialized_record}]}
       value:   {Support.TestStorage, :set_pg_snapshot, "Electric.Shapes.ConsumerTest-shape2", %{xmin: 120, xmax: 121, xip_list: ~c"x", filter_txns?: false}}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1",
                 [{_offset, _key, _type, serialized_record}]}
       value:   {Support.TestStorage, :append_fragment_to_log!, "Electric.Shapes.ConsumerTest-shape2", [{LogOffset.new(16, 1), "\"public\".\"other_table\"/\"2\"", :insert, "{\"value\":{\"id\":\"2\"},\"key\":\"\\\"public\\\".\\\"other_table\\\"/\\\"2\\\"\",\"headers\":{\"last\":true,\"relation\":[\"public\",\"other_table\"],\"operation\":\"insert\",\"lsn\":\"16\",\"txids\":[150],\"op_position\":1}}"}]}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1",
                 [{_offset, _key, _type, serialized_record}]}
       value:   {Support.TestStorage, :signal_txn_commit!, "Electric.Shapes.ConsumerTest-shape2", 150}
     stacktrace:
       .../electric/shapes/consumer_test.exs:269: (test)
Elixir.Electric.Shapes.ConsumerTest::test event handling appends to log when xid >= xmin
Stack Traces | 0.481s run time
9) test event handling appends to log when xid >= xmin (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:188
     Assertion failed, no matching message after 400ms
     Showing 10 of 12 messages in the mailbox
     code: assert_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _}
     mailbox:
       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :fetch_pg_snapshot, "Electric.Shapes.ConsumerTest-shape1"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :snapshot_started?, "Electric.Shapes.ConsumerTest-shape1"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "Electric.Shapes.ConsumerTest-shape1"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :for_shape, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :fetch_latest_offset, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :fetch_pg_snapshot, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :snapshot_started?, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :append_fragment_to_log!, "Electric.Shapes.ConsumerTest-shape1", [{LogOffset.new(16, 0), "\"public\".\"test_table\"/\"1\"", :insert, "{\"value\":{\"id\":\"1\"},\"key\":\"\\\"public\\\".\\\"test_table\\\"/\\\"1\\\"\",\"headers\":{\"last\":true,\"relation\":[\"public\",\"test_table\"],\"operation\":\"insert\",\"lsn\":\"16\",\"txids\":[100],\"op_position\":0}}"}]}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :signal_txn_commit!, "Electric.Shapes.ConsumerTest-shape1", 100}
     stacktrace:
       .../electric/shapes/consumer_test.exs:211: (test)
Elixir.Electric.Shapes.ConsumerTest::test event handling notifies listeners of new changes
Stack Traces | 0.507s run time
28) test event handling notifies listeners of new changes (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:394
     Assertion failed, no matching message after 400ms
     Showing 10 of 13 messages in the mailbox
     code: assert_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _}
     mailbox:
       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :snapshot_started?, "Electric.Shapes.ConsumerTest-shape1"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "Electric.Shapes.ConsumerTest-shape1"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :for_shape, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :fetch_latest_offset, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :fetch_pg_snapshot, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :snapshot_started?, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :set_pg_snapshot, "Electric.Shapes.ConsumerTest-shape1", %{xmin: 100, xmax: 101, xip_list: ~c"d", filter_txns?: false}}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :append_fragment_to_log!, "Electric.Shapes.ConsumerTest-shape1", [{LogOffset.new(16, 0), "\"public\".\"test_table\"/\"1\"", :insert, "{\"value\":{\"id\":\"1\"},\"key\":\"\\\"public\\\".\\\"test_table\\\"/\\\"1\\\"\",\"headers\":{\"last\":true,\"relation\":[\"public\",\"test_table\"],\"operation\":\"insert\",\"lsn\":\"16\",\"txids\":[150],\"op_position\":0}}"}]}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :signal_txn_commit!, "Electric.Shapes.ConsumerTest-shape1", 150}
     stacktrace:
       .../electric/shapes/consumer_test.exs:413: (test)
Elixir.Electric.Shapes.ConsumerTest::test transaction handling with real storage should terminate after :hibernate_after ms
Stack Traces | 1.02s run time
7) test transaction handling with real storage should terminate after :hibernate_after ms (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:816
     Assertion failed, no matching message after 1000ms
     Showing 1 of 1 message in the mailbox
     code: assert_receive {:flush_boundary_updated, 300}
     mailbox:
       pattern: {:flush_boundary_updated, 300}
       value:   {:snapshot, "27335419-1769515115703161"}
     stacktrace:
       .../electric/shapes/consumer_test.exs:840: (test)
Elixir.Electric.Shapes.ConsumerTest::test transaction handling with real storage should notify txns skipped because of xmin/xip as flushed
Stack Traces | 1.03s run time
6) test transaction handling with real storage should notify txns skipped because of xmin/xip as flushed (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:779
     Assertion failed, no matching message after 1000ms
     Showing 1 of 1 message in the mailbox
     code: assert_receive {:flush_boundary_updated, 300}
     mailbox:
       pattern: {:flush_boundary_updated, 300}
       value:   {:snapshot, "27335419-1769515116847856"}
     stacktrace:
       .../electric/shapes/consumer_test.exs:810: (test)
Elixir.Electric.Shapes.ConsumerTest::test fragment-direct streaming interleaved begin fragments raise an error
Stack Traces | 1.05s run time
27) test fragment-direct streaming interleaved begin fragments raise an error (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:1619
     match (=) failed
     code:  assert {%RuntimeError{message: message}, _stacktrace} = reason
     left:  {%RuntimeError{message: message}, _stacktrace}
     right: {:function_clause, [{Electric.ShapeCache.PureFileStorage, :"REPATCH-perform_scheduled_flush", [{#PID<0.15740.0>, "27335419-1769515104290909", {[], {Electric.ShapeCache.PureFileStorage, %{tmp_dir: ".../sync-service/tmp/Electric.Shapes.ConsumerTest/test-fragment-direct-streaming-interleaved-begin-fragments-raise-an-error-267273f7/.tmp/Electric.Shapes.ConsumerTest test fragment-direct streaming interleaved begin fragments raise an error", base_path: ".../sync-service/tmp/Electric.Shapes.ConsumerTest/test-fragment-direct-streaming-interleaved-begin-fragments-raise-an-error-267273f7/Electric.Shapes.ConsumerTest test fragment-direct streaming interleaved begin fragments raise an error", stack_id: "Electric.Shapes.ConsumerTest test fragment-direct streaming interleaved begin fragments raise an error", chunk_bytes_threshold: 10485760, flush_period: 1000, compaction_config: %{period: 600000, keep_complete_chunks: 2}}}}, {Electric.ShapeCache.PureFileStorage, {:writer_state, {:writer_acc, [{{200, 0}, "{\"value\":{\"id\":\"2\"},\"key\":\"\\\"public\\\".\\\"test_table\\\"/\\\"2\\\"\",\"headers\":{\"relation\":[\"public\",\"test_table\"],\"operation\":\"insert\",\"lsn\":\"200\",\"txids\":[60],\"op_position\":0}}"}, {{100, 0}, "{\"value\":{\"id\":\"1\"},\"key\":\"\\\"public\\\".\\\"test_table\\\"/\\\"1\\\"\",\"headers\":{\"relation\":[\"public\",\"test_table\"],\"operation\":\"insert\",\"lsn\":\"100\",\"txids\":[50],\"op_position\":0}}"}, {{50, 0}, "{\"value\":{\"id\":\"init\"},\"key\":\"\\\"public\\\".\\\"test_table\\\"/\\\"init\\\"\",\"headers\":{\"last\":true,\"relation\":[\"public\",\"test_table\"],\"operation\":\"insert\",\"lsn\":\"50\",\"txids\":[100],\"op_position\":0}}"}], [[[[], <<0, 0, 0, 0, 0, 0, 0, 50, 0, 0, 0, 0, 0, 0, 0, 0>>, <<0, 0, 0, 28>>, "\"public\".\"test_table\"/\"init\"", <<105, 0, 0, 0, 0, 0, 0, 0, 0, 187>>, "{\"value\":{\"id\":\"init\"},\"key\":\"\\\"public\\\".\\\"test_table\\\"/\\\"init\\\"\",\"headers\":{\"last\":true,\"relation\":[\"public\",\"test_table\"],\"operation\":\"insert\",\"lsn\":\"50\",\"txids\":[100],\"op_position\":0}}"], <<0, 0, 0, 0, 0, 0, 0, 100, 0, 0, 0, 0, 0, 0, 0, 0>>, ...], ...], ...}, ...}}}, ...], ...}, ...]}
     stacktrace:
       .../electric/shapes/consumer_test.exs:1694: (test)
Elixir.Electric.Shapes.ConsumerTest::test fragment-direct streaming fragment-direct mode disabled during initial filtering phase
Stack Traces | 1.05s run time
18) test fragment-direct streaming fragment-direct mode disabled during initial filtering phase (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:1469
     Assertion failed, no matching message after 1000ms
     Showing 9 of 9 messages in the mailbox
     code: assert_receive {:flush_boundary_updated, 100}
     mailbox:
       pattern: {:flush_boundary_updated, 100}
       value:   {Support.TestStorage, :for_shape, "27335419-1769515109741644"}

       pattern: {:flush_boundary_updated, 100}
       value:   {Support.TestStorage, :init_writer!, "27335419-1769515109741644", Shape.new!({36849815, "public.test_table"})}

       pattern: {:flush_boundary_updated, 100}
       value:   {Support.TestStorage, :fetch_latest_offset, "27335419-1769515109741644"}

       pattern: {:flush_boundary_updated, 100}
       value:   {Support.TestStorage, :fetch_pg_snapshot, "27335419-1769515109741644"}

       pattern: {:flush_boundary_updated, 100}
       value:   {Support.TestStorage, :snapshot_started?, "27335419-1769515109741644"}

       pattern: {:flush_boundary_updated, 100}
       value:   {:snapshot, "27335419-1769515109741644"}

       pattern: {:flush_boundary_updated, 100}
       value:   {Support.TestStorage, :make_new_snapshot!, "27335419-1769515109741644", []}

       pattern: {:flush_boundary_updated, 100}
       value:   {Support.TestStorage, :set_pg_snapshot, "27335419-1769515109741644", %{xmin: 10, xmax: 15, xip_list: ~c"\f", filter_txns?: true}}

       pattern: {:flush_boundary_updated, 100}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "27335419-1769515109741644"}
     stacktrace:
       .../electric/shapes/consumer_test.exs:1504: (test)
Elixir.Electric.Shapes.ConsumerTest::test transaction handling with real storage ConsumerRegistry.enable_suspend should suspend hibernated consumers
Stack Traces | 1.06s run time
25) test transaction handling with real storage ConsumerRegistry.enable_suspend should suspend hibernated consumers (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:904
     Assertion failed, no matching message after 1000ms
     Showing 1 of 1 message in the mailbox
     code: assert_receive {:flush_boundary_updated, 300}
     mailbox:
       pattern: {:flush_boundary_updated, 300}
       value:   {:snapshot, "27335419-1769515105759375"}
     stacktrace:
       .../electric/shapes/consumer_test.exs:928: (test)
Elixir.Electric.Shapes.ConsumerTest::test transaction handling with real storage UPDATE during pending move-in is converted to INSERT and query result skips duplicate key
Stack Traces | 1.17s run time
20) test transaction handling with real storage UPDATE during pending move-in is converted to INSERT and query result skips duplicate key (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:987
     Assertion failed, no matching message after 1000ms
     The following variables were pinned:
       ref = #Reference<0.3672906850.3497000962.56677>
     Showing 4 of 4 messages in the mailbox
     code: assert_receive {^ref, :new_changes, _offset}
     mailbox:
       pattern: {^ref, :new_changes, _offset}
       value:   {:snapshot, "26585834-1769515108045286"}

       pattern: {^ref, :new_changes, _offset}
       value:   {:snapshot, "51496110-1769515108048909"}

       pattern: {^ref, :new_changes, _offset}
       value:   {#Reference<0.3672906850.3497000962.56677>, :shape_rotation}

       pattern: {^ref, :new_changes, _offset}
       value:   {Electric.ShapeCache.ShapeCleaner, :cleanup, "51496110-1769515108048909"}
     stacktrace:
       .../electric/shapes/consumer_test.exs:1078: (test)
Elixir.Electric.Shapes.ConsumerTest::test fragment-direct streaming commit-only fragment with no relevant changes still signals commit
Stack Traces | 1.2s run time
35) test fragment-direct streaming commit-only fragment with no relevant changes still signals commit (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:1798
     Assertion failed, no matching message after 1000ms
     The following variables were pinned:
       ref = #Reference<0.3672906850.3497000962.35546>
     Showing 10 of 16 messages in the mailbox
     code: assert_receive {^ref, :new_changes, offset}
     mailbox:
       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :make_new_snapshot!, "44903071-1769515101384795", []}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :set_pg_snapshot, "44903071-1769515101384795", %{xmin: 10, xmax: 11, xip_list: ~c"\n", filter_txns?: true}}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "44903071-1769515101384795"}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :set_pg_snapshot, "44903071-1769515101384795", %{xmin: 10, xmax: 11, xip_list: ~c"\n", filter_txns?: false}}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :append_fragment_to_log!, "44903071-1769515101384795", [{LogOffset.new(100, 0), "\"public\".\"test_table\"/\"1\"", :insert, "{\"value\":{\"id\":\"1\"},\"key\":\"\\\"public\\\".\\\"test_table\\\"/\\\"1\\\"\",\"headers\":{\"relation\":[\"public\",\"test_table\"],\"operation\":\"insert\",\"lsn\":\"100\",\"txids\":[50],\"op_position\":0}}"}]}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :terminate, "44903071-1769515101384795"}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :cleanup!, "44903071-1769515101384795"}

       pattern: {^ref, :new_changes, offset}
       value:   {#Reference<0.3672906850.3497000962.35546>, :shape_rotation}

       pattern: {^ref, :new_changes, offset}
       value:   {Electric.ShapeCache.ShapeCleaner, :cleanup, "44903071-1769515101384795"}

       pattern: {^ref, :new_changes, offset}
       value:   {:flush_boundary_updated, 100}
     stacktrace:
       .../electric/shapes/consumer_test.exs:1851: (test)
Elixir.Electric.Shapes.ConsumerTest::test fragment-direct streaming skipped fragments during recovery still notify flush boundary
Stack Traces | 1.22s run time
13) test fragment-direct streaming skipped fragments during recovery still notify flush boundary (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:1364
     Assertion failed, no matching message after 1000ms
     The following variables were pinned:
       ref = #Reference<0.3672906850.3497000962.62895>
     Showing 10 of 17 messages in the mailbox
     code: assert_receive {^ref, :new_changes, _}
     mailbox:
       pattern: {^ref, :new_changes, _}
       value:   {Support.TestStorage, :set_pg_snapshot, "27335419-1769515112410393", %{xmin: 10, xmax: 11, xip_list: ~c"\n", filter_txns?: true}}

       pattern: {^ref, :new_changes, _}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "27335419-1769515112410393"}

       pattern: {^ref, :new_changes, _}
       value:   {Support.TestStorage, :set_pg_snapshot, "27335419-1769515112410393", %{xmin: 10, xmax: 11, xip_list: ~c"\n", filter_txns?: false}}

       pattern: {^ref, :new_changes, _}
       value:   {Support.TestStorage, :append_fragment_to_log!, "27335419-1769515112410393", [{LogOffset.new(100, 0), "\"public\".\"test_table\"/\"1\"", :insert, "{\"value\":{\"id\":\"1\"},\"key\":\"\\\"public\\\".\\\"test_table\\\"/\\\"1\\\"\",\"headers\":{\"last\":true,\"relation\":[\"public\",\"test_table\"],\"operation\":\"insert\",\"lsn\":\"100\",\"txids\":[50],\"op_position\":0}}"}]}

       pattern: {^ref, :new_changes, _}
       value:   {Support.TestStorage, :signal_txn_commit!, "27335419-1769515112410393", 50}

       pattern: {^ref, :new_changes, _}
       value:   {Support.TestStorage, :terminate, "27335419-1769515112410393"}

       pattern: {^ref, :new_changes, _}
       value:   {Support.TestStorage, :cleanup!, "27335419-1769515112410393"}

       pattern: {^ref, :new_changes, _}
       value:   {#Reference<0.3672906850.3497000962.62895>, :shape_rotation}

       pattern: {^ref, :new_changes, _}
       value:   {Electric.ShapeCache.ShapeCleaner, :cleanup, "27335419-1769515112410393"}

       pattern: {^ref, :new_changes, _}
       value:   {:flush_boundary_updated, 200}
     stacktrace:
       .../electric/shapes/consumer_test.exs:1403: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries move-in into move-out into move-in of the same parent results in a 
Stack Traces | 1.3s run time
52) test /v1/shapes - subqueries move-in into move-out into move-in of the same parent results in a  (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2890
     match (=) failed
     code:  assert {_req, 200,
             [
               %{"headers" => %{"event" => "move-out", "patterns" => p1}},
               %{"headers" => %{"event" => "move-out", "patterns" => p1}},
               %{"headers" => %{"control" => "snapshot-end"}},
               %{
                 "headers" => %{"operation" => "insert", "is_move_in" => true},
                 "value" => %{"id" => "3", "parent_id" => "3", "value" => "30"}
               },
               %{"headers" => %{"control" => "snapshot-end"}},
               up_to_date_ctl()
             ]} = shape_req(req, ctx.opts)
     left:  {_req, 200,
             [
               %{"headers" => %{"event" => "move-out", "patterns" => p1}},
               %{"headers" => %{"event" => "move-out", "patterns" => p1}},
               %{"headers" => %{"control" => "snapshot-end"}},
               %{
                 "headers" => %{"operation" => "insert", "is_move_in" => true},
                 "value" => %{"id" => "3", "parent_id" => "3", "value" => "30"}
               },
               %{"headers" => %{"control" => "snapshot-end"}},
               up_to_date_ctl()
             ]}
     right: {%{handle: "24658651-1769515158664930", offset: "0_0", table: "child", where: "parent_id in (SELECT id FROM parent WHERE value = 1)", live: false}, 409, [%{"headers" => %{"control" => "must-refetch"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2920: (test)
Elixir.Electric.Integration.SubqueryMoveOutTest::test subquery move-out with parent/child tables receives move-out control message when parent is deactivated
Stack Traces | 4.21s run time
3) test subquery move-out with parent/child tables receives move-out control message when parent is deactivated (Electric.Integration.SubqueryMoveOutTest)
     test/integration/subquery_move_out_test.exs:55
     Received unexpected message: %Electric.Client.Message.ControlMessage{control: :up_to_date, global_last_seen_lsn: 823507584, handle: "37387940-1769515219669580", request_timestamp: ~U[2026-01-27 12:00:19.719365Z]}
     code: assert_delete(consumer, %{"id" => "child-1"})
     stacktrace:
       test/integration/subquery_move_out_test.exs:71: (test)
Elixir.Electric.Integration.SubqueryMoveOutTest::test subquery move-out with parent/child tables move-in after row becomes visible through different parent
Stack Traces | 4.25s run time
4) test subquery move-out with parent/child tables move-in after row becomes visible through different parent (Electric.Integration.SubqueryMoveOutTest)
     test/integration/subquery_move_out_test.exs:138
     Received unexpected message: %Electric.Client.Message.ControlMessage{control: :up_to_date, global_last_seen_lsn: 819024800, handle: "109545532-1769515215421029", request_timestamp: ~U[2026-01-27 12:00:15.503140Z]}
     code: assert_delete(consumer, %{"id" => "child-1"})
     stacktrace:
       test/integration/subquery_move_out_test.exs:154: (test)
Elixir.Electric.Integration.SubqueryMoveOutTest::test resume preserves move-out state move-out after resume generates synthetic delete
Stack Traces | 4.25s run time
1) test resume preserves move-out state move-out after resume generates synthetic delete (Electric.Integration.SubqueryMoveOutTest)
     test/integration/subquery_move_out_test.exs:330
     Received unexpected message: %Electric.Client.Message.ControlMessage{control: :up_to_date, global_last_seen_lsn: 832473000, handle: "123864803-1769515228082067", request_timestamp: ~U[2026-01-27 12:00:28.182975Z]}
     code: assert_delete(consumer, %{"id" => "child-1"})
     stacktrace:
       test/integration/subquery_move_out_test.exs:355: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries a move-out from the inner shape is propagated to the outer shape
Stack Traces | 4.27s run time
12) test /v1/shapes - subqueries a move-out from the inner shape is propagated to the outer shape (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2259
     match (=) failed
     code:  assert {_req, 200, [data, %{"headers" => %{"control" => "up-to-date"}}]} = Task.await(task)
     left:  {_req, 200, [data, %{"headers" => %{"control" => "up-to-date"}}]}
     right: {%{handle: "76433792-1769515194355809", offset: "0_inf", table: "child", where: "parent_id in (SELECT id FROM parent WHERE value = 1)", live: true}, 200, [%{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => "742819208"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2278: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries subquery combined with OR should return a 409 on move-out
Stack Traces | 4.31s run time
29) test /v1/shapes - subqueries subquery combined with OR should return a 409 on move-out (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2495
     match (=) failed
     code:  assert {_req, 409, _response} = Task.await(task)
     left:  {_req, 409, _response}
     right: {%{handle: "76786994-1769515176399844", offset: "0_inf", table: "child", where: "parent_id in (SELECT id FROM parent WHERE include_parent = true) OR include_child = true", live: true}, 200, [%{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => "647816544"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2518: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries allows 3 level subquery in where clauses
Stack Traces | 4.33s run time
27) test /v1/shapes - subqueries allows 3 level subquery in where clauses (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2431
     match (=) failed
     code:  assert {req, 200, [%{"value" => %{"id" => "1", "value" => "2"}}, _]} = Task.await(task)
     left:  {req, 200, [%{"value" => %{"id" => "1", "value" => "2"}}, _]}
     right: {%{handle: "34174327-1769515180985577", offset: "656864816_0", table: "child", where: "parent_id in (SELECT id FROM parent WHERE grandparent_id in (SELECT id FROM grandparent WHERE value = 10))", live: true}, 200, [%{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => "656864816"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2455: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries supports two subqueries at the same level but returns 409 on move-in
Stack Traces | 4.36s run time
55) test /v1/shapes - subqueries supports two subqueries at the same level but returns 409 on move-in (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2985
     match (=) failed
     code:  assert [
              %{
                "headers" => %{"operation" => "insert"},
                "value" => %{"id" => "3", "workspace_id" => "1", "name" => "project 3"}
              },
              _
            ] = Jason.decode!(conn.resp_body)
     left:  [
              %{
                "headers" => %{"operation" => "insert"},
                "value" => %{"id" => "3", "workspace_id" => "1", "name" => "project 3"}
              },
              _
            ]
     right: [%{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => "530929944"}}]
     stacktrace:
       .../electric/plug/router_test.exs:3026: (test)
Elixir.Electric.Integration.SubqueryMoveOutTest::test subquery move-out with parent/child tables move-out generates synthetic deletes for all affected child rows
Stack Traces | 5.26s run time
6) test subquery move-out with parent/child tables move-out generates synthetic deletes for all affected child rows (Electric.Integration.SubqueryMoveOutTest)
     test/integration/subquery_move_out_test.exs:80
     ** (MatchError) no match of right hand side value:

         {:error, :timeout}

     code: {:ok, deletes} =
     stacktrace:
       test/integration/subquery_move_out_test.exs:97: (test)

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@blacksmith-sh

This comment has been minimized.

@alco alco force-pushed the alco/consumer-write-txn-fragment-to-storage branch from 48b22be to e6b7397 Compare January 20, 2026 14:40
@alco alco changed the title Alco/consumer write txn fragment to storage feat(sync-service): Write transaction fragments directly to storage Jan 20, 2026
@alco alco force-pushed the alco/consumer-write-txn-fragment-to-storage branch from e6b7397 to 9a65547 Compare January 22, 2026 13:48
@netlify
Copy link

netlify bot commented Jan 22, 2026

Deploy Preview for electric-next ready!

Name Link
🔨 Latest commit b5e942a
🔍 Latest deploy log https://app.netlify.com/projects/electric-next/deploys/6978a8103d44fa0008786a37
😎 Deploy Preview https://deploy-preview-3740--electric-next.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@blacksmith-sh
Copy link
Contributor

blacksmith-sh bot commented Jan 22, 2026

Found 109 test failures on Blacksmith runners:

Failures

Test View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/
test resume preserves move-out state move-out after resume generates synthetic delete
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/
test resume preserves move-out state move-out after resume generates synthetic delete
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/
test resume preserves move-out state move-out after resume generates synthetic delete
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/test subquery move-out with parent/
child tables move-in after row becomes visible through different parent
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/test subquery move-out with parent/
child tables move-in after row becomes visible through different parent
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/test subquery move-out with parent/
child tables move-in after row becomes visible through different parent
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/test subquery move-out with parent/
child tables move-out generates synthetic deletes for all affected child rows
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/test subquery move-out with parent/
child tables move-out generates synthetic deletes for all affected child rows
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/test subquery move-out with parent/
child tables move-out generates synthetic deletes for all affected child rows
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/test subquery move-out with parent/
child tables receives move-out control message when parent is deactivated
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/test subquery move-out with parent/
child tables receives move-out control message when parent is deactivated
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/test subquery move-out with parent/
child tables receives move-out control message when parent is deactivated
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/
test tag handling during updates deactivating old parent after child changed parents sh
ould not generate delete
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/
test tag handling during updates deactivating old parent after child changed parents sh
ould not generate delete
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/
test tag handling during updates deactivating old parent after child changed parents sh
ould not generate delete
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/
test tag handling during updates update that changes parent reference updates tags
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/
test tag handling during updates update that changes parent reference updates tags
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/
test tag handling during updates update that changes parent reference updates tags
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries a move-in from the inner shape causes a query and new entries in th
e outer shape
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries a move-in from the inner shape causes a query and new entries in th
e outer shape
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries a move-in from the inner shape causes a query and new entries in th
e outer shape
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries a move-out from the inner shape is propagated to the outer shape
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries a move-out from the inner shape is propagated to the outer shape
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries a move-out from the inner shape is propagated to the outer shape
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries allows 3 level subquery in where clauses
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries allows 3 level subquery in where clauses
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries allows 3 level subquery in where clauses
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries allows subquery in where clause
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries allows subquery in where clause
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries allows subquery in where clause
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries allows subquery in where clauses that reference non-PK columns
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries allows subquery in where clauses that reference non-PK columns
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries allows subquery in where clauses that reference non-PK columns
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries move-in into move-out into move-in of the same parent results in a
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries move-in into move-out into move-in of the same parent results in a
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries move-in into move-out into move-in of the same parent results in a
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries move-outs while processing move-ins are handled correctly
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries move-outs while processing move-ins are handled correctly
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries move-outs while processing move-ins are handled correctly
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries multiple actions result in a correct event sequence
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries multiple actions result in a correct event sequence
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries multiple actions result in a correct event sequence
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries subqueries can reference composite PKs
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries subqueries can reference composite PKs
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries subqueries can reference composite PKs
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries subqueries work with params
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries subqueries work with params
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries subqueries work with params
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries subquery combined with OR should return a 409 on move-out
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries subquery combined with OR should return a 409 on move-out
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries subquery combined with OR should return a 409 on move-out
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries supports two subqueries at the same level but returns 409 on move-i
n
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries supports two subqueries at the same level but returns 409 on move-i
n
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries supports two subqueries at the same level but returns 409 on move-i
n
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries table with a composite PK can be used in a subquery
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries table with a composite PK can be used in a subquery
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries table with a composite PK can be used in a subquery
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes DELETE forces the shape handle to be different on reconnect and new snapshot to
be created
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes GET receives only specified columns out of wide table
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes GET receives only specified columns out of wide table
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes GET receives only specified columns out of wide table
View Logs
Elixir.Electric.Shapes.ConsumerTest/test event handling appends to log when xid >= xmin View Logs
Elixir.Electric.Shapes.ConsumerTest/test event handling appends to log when xid >= xmin View Logs
Elixir.Electric.Shapes.ConsumerTest/test event handling appends to log when xid >= xmin View Logs
Elixir.Electric.Shapes.ConsumerTest/
test event handling correctly writes only relevant changes to multiple shape logs
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test event handling correctly writes only relevant changes to multiple shape logs
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test event handling correctly writes only relevant changes to multiple shape logs
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test event handling doesn't append to log when change is irrelevant for active shapes
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test event handling doesn't append to log when change is irrelevant for active shapes
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test event handling doesn't append to log when change is irrelevant for active shapes
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test event handling notifies listeners of new changes
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test event handling notifies listeners of new changes
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test event handling notifies listeners of new changes
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming commit-only fragment with no relevant changes still sign
als commit
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming commit-only fragment with no relevant changes still sign
als commit
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming commit-only fragment with no relevant changes still sign
als commit
View Logs
Elixir.Electric.Shapes.ConsumerTest/test fragment-direct streaming crash/
restart with partial fragments persisted recovers correctly
View Logs
Elixir.Electric.Shapes.ConsumerTest/test fragment-direct streaming crash/
restart with partial fragments persisted recovers correctly
View Logs
Elixir.Electric.Shapes.ConsumerTest/test fragment-direct streaming crash/
restart with partial fragments persisted recovers correctly
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming fragment-direct mode disabled during initial filtering p
hase
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming fragment-direct mode disabled during initial filtering p
hase
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming fragment-direct mode disabled during initial filtering p
hase
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming interleaved begin fragments raise an error
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming interleaved begin fragments raise an error
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming interleaved begin fragments raise an error
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming skipped fragments during recovery still notify flush bou
ndary
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming skipped fragments during recovery still notify flush bou
ndary
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming skipped fragments during recovery still notify flush bou
ndary
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming uncommitted fragments flushed to disk do not advance lat
est_offset
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming uncommitted fragments flushed to disk do not advance lat
est_offset
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming uncommitted fragments flushed to disk do not advance lat
est_offset
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage ConsumerRegistry.enable_suspend should susp
end hibernated consumers
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage ConsumerRegistry.enable_suspend should susp
end hibernated consumers
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage ConsumerRegistry.enable_suspend should susp
end hibernated consumers
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage should hibernate not suspend if has depende
ncies
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage should hibernate not suspend if has depende
ncies
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage should hibernate not suspend if has depende
ncies
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage should notify txns skipped because of xmin/
xip as flushed
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage should notify txns skipped because of xmin/
xip as flushed
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage should notify txns skipped because of xmin/
xip as flushed
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage should terminate after :hibernate_after ms
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage should terminate after :hibernate_after ms
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage should terminate after :hibernate_after ms
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage transactions are buffered until snapshot xm
in is known
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage transactions are buffered until snapshot xm
in is known
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage transactions are buffered until snapshot xm
in is known
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage UPDATE during pending move-in is converted
to INSERT and query result skips duplicate key
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage UPDATE during pending move-in is converted
to INSERT and query result skips duplicate key
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage UPDATE during pending move-in is converted
to INSERT and query result skips duplicate key
View Logs

Fix in Cursor

alco added 22 commits January 27, 2026 00:30
Add append_fragment_to_log!/2 and signal_txn_commit!/2 callbacks to
Storage behaviour for streaming transaction fragments directly to
storage without waiting for commit.

- Storage.ex: Define new callbacks in behaviour
- PureFileStorage: Implement callbacks (delegate to append_to_log! for now)
- InMemoryStorage: Implement callbacks
- CrashingFileStorage: Delegate to PureFileStorage
- TestStorage: Add wrapper implementations for testing

This lays groundwork for consumers without materializers to stream
fragments directly to storage, reducing memory usage for large
transactions.
Add state support for streaming transaction fragments directly to
storage for shapes without subquery dependencies.

- Create PendingTxn module to track in-progress transaction metadata
- Add fragment_direct? flag to State (true when no shape_dependencies)
- Add pending_txn field to track current incomplete transaction

The fragment_direct? flag is automatically set based on whether the
shape has subquery dependencies. Shapes without dependencies can
stream directly to storage.
…t dependencies

Add fragment-direct streaming mode that writes transaction fragments directly
to storage without buffering the complete transaction in memory. This reduces
memory pressure for shapes that don't have subquery dependencies.

Key changes:
- Add handle_fragment_direct/2 to process fragments directly to storage
- Add can_use_fragment_direct?/1 guard that checks 4 conditions:
  1. Shape has no subquery dependencies (fragment_direct? flag)
  2. Not buffering for initial snapshot
  3. No materializer subscribed (inner shapes need full txn handling)
  4. Initial snapshot filtering is complete
- Add helper functions for fragment processing:
  - skip_fragment?/2 - skip already processed fragments
  - maybe_start_pending_txn/2 - initialize PendingTxn on begin
  - write_fragment_to_storage/2 - write changes via append_fragment_to_log!
  - maybe_complete_pending_txn/2 - finalize on commit, notify clients
  - notify_clients_of_new_changes/2 - notify without materializer
- Update tests with assert_storage_append/refute_storage_append macros
  to handle both append_to_log! and append_fragment_to_log! calls
- Add consider_flushed_fragment/2 to properly notify flush boundaries when
  a transaction has no relevant changes (empty transaction handling)
- Handle nil pending_txn in write_fragment_to_storage/2 for recovery scenarios
  where a middle fragment arrives without the begin fragment
- Handle nil pending_txn in maybe_complete_pending_txn/2 for commit-only
  fragments or other recovery edge cases
Add new test suite for fragment-direct streaming edge cases:

- Multi-fragment transaction handling: Verifies that large transactions
  spanning multiple fragments are correctly accumulated and written
- Empty transaction flush boundary: Tests that transactions with no
  relevant changes still notify the flush boundary
- Truncate operation: Verifies truncate triggers shape removal
- Skipped fragments during recovery: Tests idempotency when fragments
  are replayed
- Shapes with subquery dependencies: Verifies inner shapes use
  TransactionBuilder (not fragment-direct mode)
- Initial filtering phase: Tests that fragment-direct mode is disabled
  during initial snapshot filtering
- Different table filtering: Verifies changes for other tables are
  filtered out
- Mixed fragments: Tests fragments with both relevant and irrelevant
  changes
Fragment-direct streaming was bypassing Shape.convert_change which handles:
1. Filtering changes for the shape's root table
2. Applying WHERE clause filtering
3. Converting UPDATEs to INSERTs/DELETEs for move-in/move-out scenarios

Also add maybe_mark_last_change/2 to set last?: true on the final change
in commit fragments, which is needed for clients to know when a transaction
is complete.

This fixes the router test 'GET returns correct INSERT and DELETE operations
that have been converted from UPDATEs' which was failing because UPDATEs
that moved rows in/out of a shape's filter were not being converted.
Remove repeated in-function alias calls for PendingTxn and add a single
alias at the top of the module with the other Consumer.* aliases.
Also alphabetize the Consumer.* alias block.
Update append_fragment_to_log! and signal_txn_commit! in both
PureFileStorage and InMemoryStorage to ensure that:

- append_fragment_to_log! writes log lines but does NOT advance
  last_seen_txn_offset or update @latest_offset_key
- signal_txn_commit! completes transaction tracking and updates
  the latest offset

This ensures that on crash/recovery, fetch_latest_offset returns
the last committed transaction offset, not a mid-transaction offset.
This is critical for correct recovery behavior when using fragment-
direct streaming mode.
Add explicit check in maybe_start_pending_txn/2 to raise an error when
receiving a begin fragment for a new transaction while another transaction
is still pending. This is a defensive measure to catch unexpected
replication behavior early rather than silently corrupting state.
Add tests to verify:
- Interleaved begin fragments raise an error
- Crash/restart with partial fragments persisted recovers correctly
- Commit-only fragment with no relevant changes still signals commit
- Flush-before-commit does not advance flush boundary beyond last
  committed offset

These tests ensure the commit-gated storage semantics work correctly
in the fragment-direct streaming mode.
…eaming

Update the expected log message from 'Txn received in Shapes.Consumer'
to 'Completed fragment-direct transaction' since the consumer now uses
fragment-direct streaming which emits different log messages.
It doesn't matter whether the xid of the pending txn and the newly
arrived one is. Seeing a Begin before a Commit is an error regardless.
- Test mid-fragment without prior begin creates pending_txn on-the-fly
- Test commit-only fragment with no prior fragments processes correctly
- Test uncommitted fragments flushed to disk do not advance latest_offset

These tests verify recovery scenarios and the core crash-safety invariant
that fetch_latest_offset only returns committed transaction offsets.
…assert_receive

Replace assert_storage_append and refute_storage_append helper macros with
direct assert_receive/refute_receive calls using :append_to_log!.

The tests in the 'event handling' describe block use :append_to_log! because
the Consumer does not use fragment-direct streaming during the initial
filtering phase (filtering? flag is true until a transaction with xid > xmax
is processed).
Multiple calls to Storage.for_stack(...) inside PureFileStorage weren't
prepared to deal with a possible return value of {TestStorage, ....}.

This change provides a more direct function for fetching the wrapped
storage options inside that storage's implementation module.
@alco alco force-pushed the alco/consumer-write-txn-fragment-to-storage branch from bdbdda8 to b5e942a Compare January 27, 2026 11:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Avoid holding whole transaction in consumer memory

2 participants