Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-stale-consumer-registry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/sync-service': patch
---

Fix race condition in `ConsumerRegistry.unregister_name/1` that left stale PIDs in the ETS table. Uses atomic `:ets.match_delete/2` to remove the entry only if it still belongs to the dying process, preventing accidental deletion of a replacement consumer's entry.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,12 @@ defmodule Electric.Shapes.ConsumerRegistry do
if register_consumer!(pid, shape_handle, ets_name(stack_id)), do: :yes, else: :no
end

# don't unregister when the pid exits -- we have mechanisms to ensure that happens cleanly
def unregister_name({_stack_id, _shape_handle}) do
# Atomically remove the ETS entry only if it still belongs to the calling
# process (the dying consumer). If a replacement consumer has already
# registered under the same shape_handle, match_delete is a no-op because
# the pid won't match.
def unregister_name({stack_id, shape_handle}) do
:ets.match_delete(ets_name(stack_id), {shape_handle, self()})
:ok
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,75 @@ defmodule Electric.Shapes.ConsumerRegistryTest do
end
end

describe "unregister_name/1" do
test "removes entry when pid matches the calling process", ctx do
handle = "handle-1"
table = ctx.registry_state.table

# Register the current process
:ets.insert(table, {handle, self()})
assert [{^handle, pid}] = :ets.lookup(table, handle)
assert pid == self()

ConsumerRegistry.unregister_name({ctx.stack_id, handle})

assert :ets.lookup(table, handle) == []
end

test "does not remove entry belonging to a different process", ctx do
handle = "handle-1"
table = ctx.registry_state.table

other_pid = spawn(fn -> Process.sleep(:infinity) end)
:ets.insert(table, {handle, other_pid})

ConsumerRegistry.unregister_name({ctx.stack_id, handle})

assert [{^handle, ^other_pid}] = :ets.lookup(table, handle)
end

test "does not remove a replacement consumer's entry (race scenario)", ctx do
handle = "handle-1"
table = ctx.registry_state.table
parent = self()

# Simulate: old consumer (a spawned process) calls unregister_name,
# but a replacement has already registered under the same handle.
replacement_pid = spawn(fn -> Process.sleep(:infinity) end)

# First, register the old process
old_task =
Task.async(fn ->
:ets.insert(table, {handle, self()})
send(parent, :old_registered)

# Wait for the replacement to overwrite
receive do
:proceed_to_unregister -> :ok
end

# Now unregister_name should NOT delete the replacement's entry
ConsumerRegistry.unregister_name({ctx.stack_id, handle})
send(parent, :old_unregistered)
end)

assert_receive :old_registered

# Simulate the replacement consumer registering (delete old + insert new)
:ets.delete(table, handle)
:ets.insert(table, {handle, replacement_pid})

# Let the old process proceed with unregister
send(old_task.pid, :proceed_to_unregister)
assert_receive :old_unregistered

# The replacement's entry must still be there
assert [{^handle, ^replacement_pid}] = :ets.lookup(table, handle)

Task.await(old_task)
end
end

describe "broadcast/1" do
test "sends message to all subscribers" do
pid = self()
Expand Down
Loading