Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/cyan-crews-camp.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/sync-service': patch
---

Ignore errors caused by telemetry polling during initialization
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,16 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb do
end

# Counts the shapes for `stack_id`, passing the result of `count_shapes/1`
# through `raise_on_error!/2`.
#
# Returns `:error` instead of raising when the connection pool cannot be
# reached. The whole body is guarded: an unguarded call placed before the
# `try` would raise before the handlers below could apply and would run the
# count twice.
def count_shapes!(stack_id) do
  stack_id |> count_shapes() |> raise_on_error!(:count_shapes)
rescue
  # the connection pool has its own registry, so attempting to checkout a
  # connection will raise an ArgumentError if that registry isn't running
  ArgumentError -> :error
catch
  # connection pool has not started
  :exit, {:noproc, {NimblePool, :checkout, _args}} -> :error
end

@doc false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.Statistics do
end

# Fetches the current statistics from the process registered for `stack_id`.
#
# Returns `{:error, message}` when no such process is registered; otherwise
# returns whatever the process replies to the `:current` call.
#
# The lookup must happen before any `GenServer.call/2`: calling the name
# directly exits when the process is not running, which is exactly the
# situation the `nil` branch exists to handle.
def current(stack_id) do
  case GenServer.whereis(name(stack_id)) do
    nil -> {:error, "ShapeDb.Statistics for stack #{stack_id} is not running"}
    pid when is_pid(pid) -> GenServer.call(pid, :current)
  end
end

def worker_start(opts) do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ defmodule Electric.ShapeCache.ShapeStatus.ShapeDb.WriteBuffer do
@doc "Returns the number of pending operations in the buffer"
def pending_operations_count(stack_id) do
  case :ets.info(operations_table_name(stack_id), :size) do
    # `:ets.info/2` does not raise for a table that does not exist, it
    # returns `:undefined`; report an empty buffer in that case.
    :undefined -> 0
    size when is_integer(size) -> size
  end
rescue
  # `:ets.info/2` raises badarg for a table identifier of the wrong type
  ArgumentError -> 0
end

@doc "Gives the change to the total count of shapes in the database once all buffered writes are applied"
Expand Down
6 changes: 4 additions & 2 deletions packages/sync-service/lib/electric/stack_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -380,11 +380,13 @@ defmodule Electric.StackSupervisor do
registry_partitions =
Keyword.get(config.tweaks, :registry_partitions, System.schedulers_online())

telemetry_child = Electric.StackSupervisor.Telemetry.configure(config)
telemetry_spec = Electric.StackSupervisor.Telemetry.configure(config)

children =
[
telemetry_child,
# put telemetry processes first so that they are torn down last and can
# continue reporting while the stack terminates.
telemetry_spec,
{Electric.ProcessRegistry, partitions: registry_partitions, stack_id: stack_id},
{Electric.StackConfig,
stack_id: stack_id,
Expand Down
262 changes: 134 additions & 128 deletions packages/sync-service/lib/electric/stack_supervisor/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,140 @@ defmodule Electric.StackSupervisor.Telemetry do
child_spec(config)
end

@doc false
# Public so that tests can call it.
#
# Builds the list of `{module, function, args}` measurement tuples for the
# stack described by `config`. Every entry targets this module; all take
# `[stack_id]` as args except the retained-WAL one, which also receives the
# replication slot name read from `config.replication_opts`.
def default_periodic_measurements(%{stack_id: stack_id} = config) do
  slot_name = config.replication_opts[:slot_name]
  stack_only = fn fun -> {__MODULE__, fun, [stack_id]} end

  [
    stack_only.(:count_shapes),
    stack_only.(:report_write_buffer_size),
    {__MODULE__, :report_retained_wal_size, [stack_id, slot_name]},
    stack_only.(:report_disk_usage),
    stack_only.(:report_shape_db_stats)
  ]
end

# Emits the total and active shape counts for `stack_id` as telemetry events.
#
# The total-shapes event is only emitted when the shape cache answers with an
# integer; the active-shapes event is always emitted.
def count_shapes(stack_id, _telemetry_opts) do
  metadata = %{stack_id: stack_id}

  # Telemetry is started before everything else in the stack, so the shape
  # cache may not be running yet; any non-integer answer is skipped.
  case Electric.ShapeCache.count_shapes(stack_id) do
    total when is_integer(total) ->
      Electric.Telemetry.OpenTelemetry.execute(
        [:electric, :shapes, :total_shapes],
        %{count: total},
        metadata
      )

    _not_available ->
      :ok
  end

  active = Electric.Shapes.ConsumerRegistry.active_consumer_count(stack_id)

  Electric.Telemetry.OpenTelemetry.execute(
    [:electric, :shapes, :active_shapes],
    %{count: active},
    metadata
  )
end

# Emits the ShapeDb pending buffer size for `stack_id` as the `count`
# measurement of the write-buffer telemetry event.
def report_write_buffer_size(stack_id, _telemetry_opts) do
  pending = Electric.ShapeCache.ShapeStatus.ShapeDb.pending_buffer_size(stack_id)
  measurements = %{count: pending}

  Electric.Telemetry.OpenTelemetry.execute(
    [:electric, :shape_db, :write_buffer, :pending_writes],
    measurements,
    %{stack_id: stack_id}
  )
end

# Offset added to the current WAL position in the query below so that the
# value fits a signed 64-bit integer.
@min_signed_int8 -2 ** 63
@retained_wal_size_query """
SELECT
(pg_current_wal_lsn() - '0/0' + #{@min_signed_int8})::int8 AS pg_wal_offset,
pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)::int8 AS retained_wal_size,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)::int8 AS confirmed_flush_lsn_lag
FROM
pg_replication_slots
WHERE
slot_name = $1
"""

@doc false
@spec report_retained_wal_size(Electric.stack_id(), binary(), map()) :: :ok
# Queries the stack's admin pool for WAL statistics of replication slot
# `slot_name` and emits them as a telemetry event.
#
# A `:noproc` exit is ignored silently; any other failure is logged as a
# warning instead of propagating.
def report_retained_wal_size(stack_id, slot_name, _telemetry_opts) do
  admin_pool = Electric.Connection.Manager.admin_pool(stack_id)
  query_opts = [timeout: 3_000, deadline: 3_000]

  %Postgrex.Result{rows: [[wal_offset, retained_bytes, flush_lag]]} =
    Postgrex.query!(admin_pool, @retained_wal_size_query, [slot_name], query_opts)

  measurements = %{
    # The absolute value of pg_current_wal_lsn() carries no useful information
    # by itself, but plotting its rate of change shows how fast the WAL grows.
    #
    # The query shifts that value by -2**63 so that it fits inside the signed
    # 64-bit integer type expected by the OpenTelemetry Protocol.
    pg_wal_offset: wal_offset,
    slot_retained_wal_size: retained_bytes,
    # The query can return `-1` here, meaning Electric is caught up with
    # Postgres' replication stream. That is confusing for a stat measured in
    # bytes, so 0 is used as the lower bound.
    slot_confirmed_flush_lsn_lag: max(0, flush_lag)
  }

  Electric.Telemetry.OpenTelemetry.execute(
    [:electric, :postgres, :replication],
    measurements,
    %{stack_id: stack_id}
  )
catch
  :exit, {:noproc, _} ->
    :ok

  # Catch everything else so it is not logged as an error; these are reporting
  # issues at best.
  kind, payload ->
    Logger.warning(
      "Failed to query retained WAL size\nError: #{Exception.format(kind, payload)}",
      stack_id: stack_id,
      slot_name: slot_name
    )
end

# `report_disk_usage/2` is chosen at compile time: the real implementation is
# defined only when `ElectricTelemetry.DiskUsage` can be loaded, otherwise a
# no-op returning `:ok` is defined.
if Code.ensure_loaded?(ElectricTelemetry.DiskUsage) do
  # Emits the current disk usage for `stack_id`; does nothing while the
  # measurement is still `:pending`.
  def report_disk_usage(stack_id, _telemetry_opts) do
    usage = ElectricTelemetry.DiskUsage.current(stack_id)

    case usage do
      :pending ->
        :ok

      {:ok, bytes, duration} ->
        measurements = %{bytes: bytes, measurement_duration: duration}

        Electric.Telemetry.OpenTelemetry.execute(
          [:electric, :storage, :used],
          measurements,
          %{stack_id: stack_id}
        )
    end
  end
else
  def report_disk_usage(_stack_id, _telemetry_opts), do: :ok
end

# Emits the ShapeDb statistics for `stack_id` as a telemetry event when they
# are available as `{:ok, stats}`; any other result is ignored and `:ok` is
# returned.
def report_shape_db_stats(stack_id, _telemetry_opts) do
  with {:ok, stats} <- Electric.ShapeCache.ShapeStatus.ShapeDb.statistics(stack_id) do
    Electric.Telemetry.OpenTelemetry.execute(
      [:electric, :shape_db, :sqlite],
      stats,
      %{stack_id: stack_id}
    )
  else
    _other -> :ok
  end
end

# Returns the value stored under `:stack_telemetry` in the given config as-is.
def child_spec(%{stack_telemetry: preconfigured}) do
  preconfigured
end

if Code.ensure_loaded?(ElectricTelemetry.StackTelemetry) do
Expand Down Expand Up @@ -56,134 +190,6 @@ defmodule Electric.StackSupervisor.Telemetry do
Telemetry.Metrics.last_value("electric.shape_db.sqlite.connections")
]
end

# Builds the list of `{module, function, args}` measurement tuples for the
# stack described by `config`. Every entry targets this module; all take
# `[stack_id]` as args except the retained-WAL one, which also receives the
# replication slot name read from `config.replication_opts`.
defp default_periodic_measurements(%{stack_id: stack_id} = config) do
  slot_name = config.replication_opts[:slot_name]
  stack_only = fn fun -> {__MODULE__, fun, [stack_id]} end

  [
    stack_only.(:count_shapes),
    stack_only.(:report_write_buffer_size),
    {__MODULE__, :report_retained_wal_size, [stack_id, slot_name]},
    stack_only.(:report_disk_usage),
    stack_only.(:report_shape_db_stats)
  ]
end

# Emits the total and active shape counts for `stack_id` as telemetry events.
#
# The total-shapes event is only emitted when the shape cache answers with an
# integer; the active-shapes event is always emitted.
def count_shapes(stack_id, _telemetry_opts) do
  metadata = %{stack_id: stack_id}

  # Telemetry is started before everything else in the stack, so the shape
  # cache may not be running yet; any non-integer answer is skipped.
  case Electric.ShapeCache.count_shapes(stack_id) do
    total when is_integer(total) ->
      Electric.Telemetry.OpenTelemetry.execute(
        [:electric, :shapes, :total_shapes],
        %{count: total},
        metadata
      )

    _not_available ->
      :ok
  end

  active = Electric.Shapes.ConsumerRegistry.active_consumer_count(stack_id)

  Electric.Telemetry.OpenTelemetry.execute(
    [:electric, :shapes, :active_shapes],
    %{count: active},
    metadata
  )
end

# Emits the ShapeDb pending buffer size for `stack_id` as the `count`
# measurement of the write-buffer telemetry event.
#
# An `ArgumentError` raised anywhere in the body is swallowed and `:ok` is
# returned instead.
def report_write_buffer_size(stack_id, _telemetry_opts) do
  pending = Electric.ShapeCache.ShapeStatus.ShapeDb.pending_buffer_size(stack_id)
  measurements = %{count: pending}

  Electric.Telemetry.OpenTelemetry.execute(
    [:electric, :shape_db, :write_buffer, :pending_writes],
    measurements,
    %{stack_id: stack_id}
  )
rescue
  ArgumentError -> :ok
end

# Offset added to the current WAL position in the query below so that the
# value fits a signed 64-bit integer.
@min_signed_int8 -2 ** 63
@retained_wal_size_query """
SELECT
(pg_current_wal_lsn() - '0/0' + #{@min_signed_int8})::int8 AS pg_wal_offset,
pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)::int8 AS retained_wal_size,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)::int8 AS confirmed_flush_lsn_lag
FROM
pg_replication_slots
WHERE
slot_name = $1
"""

@doc false
@spec report_retained_wal_size(Electric.stack_id(), binary(), map()) :: :ok
# Queries the stack's admin pool for WAL statistics of replication slot
# `slot_name` and emits them as a telemetry event.
#
# A `:noproc` exit is ignored silently; any other failure is logged as a
# warning instead of propagating.
def report_retained_wal_size(stack_id, slot_name, _telemetry_opts) do
  admin_pool = Electric.Connection.Manager.admin_pool(stack_id)
  query_opts = [timeout: 3_000, deadline: 3_000]

  %Postgrex.Result{rows: [[wal_offset, retained_bytes, flush_lag]]} =
    Postgrex.query!(admin_pool, @retained_wal_size_query, [slot_name], query_opts)

  measurements = %{
    # The absolute value of pg_current_wal_lsn() carries no useful information
    # by itself, but plotting its rate of change shows how fast the WAL grows.
    #
    # The query shifts that value by -2**63 so that it fits inside the signed
    # 64-bit integer type expected by the OpenTelemetry Protocol.
    pg_wal_offset: wal_offset,
    slot_retained_wal_size: retained_bytes,
    # The query can return `-1` here, meaning Electric is caught up with
    # Postgres' replication stream. That is confusing for a stat measured in
    # bytes, so 0 is used as the lower bound.
    slot_confirmed_flush_lsn_lag: max(0, flush_lag)
  }

  Electric.Telemetry.OpenTelemetry.execute(
    [:electric, :postgres, :replication],
    measurements,
    %{stack_id: stack_id}
  )
catch
  :exit, {:noproc, _} ->
    :ok

  # Catch everything else so it is not logged as an error; these are reporting
  # issues at best.
  kind, payload ->
    Logger.warning(
      "Failed to query retained WAL size\nError: #{Exception.format(kind, payload)}",
      stack_id: stack_id,
      slot_name: slot_name
    )
end

# Emits the current disk usage for `stack_id` as a telemetry event; does
# nothing while the measurement is still `:pending`.
def report_disk_usage(stack_id, _telemetry_opts) do
  usage = ElectricTelemetry.DiskUsage.current(stack_id)

  case usage do
    :pending ->
      :ok

    {:ok, bytes, duration} ->
      measurements = %{bytes: bytes, measurement_duration: duration}

      Electric.Telemetry.OpenTelemetry.execute(
        [:electric, :storage, :used],
        measurements,
        %{stack_id: stack_id}
      )
  end
end

# Emits the ShapeDb statistics for `stack_id` as a telemetry event when they
# are available as `{:ok, stats}`; any other result is ignored and `:ok` is
# returned.
def report_shape_db_stats(stack_id, _telemetry_opts) do
  with {:ok, stats} <- Electric.ShapeCache.ShapeStatus.ShapeDb.statistics(stack_id) do
    Electric.Telemetry.OpenTelemetry.execute(
      [:electric, :shape_db, :sqlite],
      stats,
      %{stack_id: stack_id}
    )
  else
    _other -> :ok
  end
end
else
# Fallback clause: yields no child spec (`nil`) for any argument.
def child_spec(_config) do
  nil
end
end
Expand Down
21 changes: 21 additions & 0 deletions packages/sync-service/test/electric/stack_supervisor_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule Electric.StackSupervisorTest do
  use ExUnit.Case, async: true

  alias Electric.StackSupervisor

  import Support.ComponentSetup

  describe "Telemetry" do
    setup [:with_stack_id_from_test]

    # Invokes every default measurement with an empty telemetry-opts map while
    # nothing for the stack has been started by this test; the test passes as
    # long as none of the calls raises, exits or throws.
    test "default_periodic_measurements/1 do not raise if stack down", ctx do
      config = %{
        stack_id: ctx.stack_id,
        replication_opts: [slot_name: "no_such_slot"]
      }

      config
      |> StackSupervisor.Telemetry.default_periodic_measurements()
      |> Enum.each(fn {mod, fun, args} ->
        apply(mod, fun, args ++ [%{}])
      end)
    end
  end
end
Loading