Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 97 additions & 3 deletions scripts/sync-all-parallel.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,23 @@
# --report PATH Write report to file
# --commit-all MSG Auto-commit all dirty repos with given message
# --push Auto-push after commit (with --commit-all)
# --batch-size N Repos processed per throttled batch (default: 25,
# env SYNC_BATCH_SIZE). Set 0 to disable batching.
# --batch-pause SEC Seconds to pause between batches (default: 45,
# env SYNC_BATCH_PAUSE_SEC). A small random jitter is
# added on top to de-correlate CI trigger waves.
# --no-throttle Disable inter-batch pacing entirely (one big stream,
# legacy behaviour). Implied by --dry-run (no pushes).
#
# THROTTLING / THUNDERING-HERD CONTROL
# Phase 1 pushes to every owned repo. Each push fires that repo's GitHub
# Actions workflows. Pushing the whole estate (~355 repos) in one tight
# window saturates the account-wide hosted-runner concurrency cap, leaving
# a large fraction of estate CI transiently `queued`. To avoid this we
# process repos in batches of --batch-size and pause --batch-pause seconds
# (plus jitter) between batches, so CI trigger waves are spread over time.
# Within a batch, the original --concurrency parallelism is unchanged, and
# a failure in one repo never aborts the batch or the run.

defmodule SyncAll do
@owned_prefixes [
Expand All @@ -31,6 +48,19 @@ defmodule SyncAll do

@ignored_dirs ~w[logs monitoring .git-private-farm]

# Throttling defaults. batch_size 25 + 45s pause keeps the per-minute
# workflow-trigger rate well under the account hosted-runner cap while
# still syncing the whole estate in a bounded time. Env vars let ops tune
# without editing the script; CLI flags override env.
@default_batch_size (case Integer.parse(System.get_env("SYNC_BATCH_SIZE") || "") do
{n, _} when n >= 0 -> n
_ -> 25
end)
@default_batch_pause_sec (case Integer.parse(System.get_env("SYNC_BATCH_PAUSE_SEC") || "") do
{n, _} when n >= 0 -> n
_ -> 45
end)

defstruct [
:repos_dir,
:dry_run,
Expand All @@ -40,6 +70,9 @@ defmodule SyncAll do
:report_file,
:commit_msg,
:auto_push,
:batch_size,
:batch_pause_sec,
:throttle,
max_depth: 3
]

Expand All @@ -51,6 +84,15 @@ defmodule SyncAll do
IO.puts(" Repos dir: #{config.repos_dir}")
IO.puts(" Concurrency: #{config.concurrency}")
IO.puts(" Dry run: #{config.dry_run}")

cond do
not throttling_active?(config) ->
IO.puts(" Throttle: off (no inter-batch pacing)")

true ->
IO.puts(" Throttle: batches of #{config.batch_size}, ~#{config.batch_pause_sec}s pause (+jitter)")
end

if config.commit_msg, do: IO.puts(" Commit msg: #{config.commit_msg}")

start = System.monotonic_time(:millisecond)
Expand Down Expand Up @@ -107,7 +149,10 @@ defmodule SyncAll do
do_fetch: true,
report_file: nil,
commit_msg: nil,
auto_push: false
auto_push: false,
batch_size: @default_batch_size,
batch_pause_sec: @default_batch_pause_sec,
throttle: true
})
end

Expand All @@ -121,6 +166,9 @@ defmodule SyncAll do
defp parse_args(["--report", path | rest], config), do: parse_args(rest, %{config | report_file: path})
defp parse_args(["--commit-all", msg | rest], config), do: parse_args(rest, %{config | commit_msg: msg})
defp parse_args(["--depth", n | rest], config), do: parse_args(rest, %{config | max_depth: String.to_integer(n)})
defp parse_args(["--batch-size", n | rest], config), do: parse_args(rest, %{config | batch_size: String.to_integer(n)})
defp parse_args(["--batch-pause", n | rest], config), do: parse_args(rest, %{config | batch_pause_sec: String.to_integer(n)})
defp parse_args(["--no-throttle" | rest], config), do: parse_args(rest, %{config | throttle: false})

defp parse_args([arg | rest], config) do
if File.dir?(arg), do: parse_args(rest, %{config | repos_dir: arg}), else: parse_args(rest, config)
Expand Down Expand Up @@ -188,13 +236,60 @@ defmodule SyncAll do

# --- Phase 1: Parallel Sync ---

# Throttling is only meaningful when we will actually push (pushes are
# what trigger remote CI). A dry run pushes nothing, so pacing it just
# wastes wall-clock — disable it there as well as on explicit --no-throttle
# or a non-positive batch size.
defp throttling_active?(config) do
config.throttle and not config.dry_run and config.batch_size > 0
end

defp phase1_parallel(repos, config) do
total = length(repos)
counter = :counters.new(1, [:atomics])

IO.puts("\n\e[1m=== Phase 1: Parallel Fetch/Pull/Push (#{config.concurrency} workers) ===\e[0m")

repos
batches =
if throttling_active?(config) do
Enum.chunk_every(repos, config.batch_size)
else
[repos]
end

nbatches = length(batches)

if nbatches > 1 do
IO.puts(
"\e[2mThrottled: #{nbatches} batch(es) of up to #{config.batch_size}, " <>
"~#{config.batch_pause_sec}s (+jitter) between batches\e[0m"
)
end

batches
|> Enum.with_index(1)
|> Enum.flat_map(fn {batch, idx} ->
results = run_batch(batch, config, counter, total)

# Pause AFTER every batch except the last, so the next wave of
# workflow triggers lands once the previous wave has had time to
# start draining the runner queue. Jitter de-correlates repeated runs.
if idx < nbatches do
jitter_ms = :rand.uniform(5_000)
pause_ms = config.batch_pause_sec * 1_000 + jitter_ms
IO.puts("\n\e[2m Batch #{idx}/#{nbatches} done — pausing #{div(pause_ms, 1000)}s before next batch…\e[0m")
Process.sleep(pause_ms)
end

results
end)
|> tap(fn _ -> IO.puts("") end) # Clear progress line
end

# Run one batch at the configured concurrency. A crash/timeout in one
# repo is mapped to an error result and never aborts the batch or run.
defp run_batch(batch, config, counter, total) do
batch
|> Task.async_stream(
fn repo -> sync_one_repo(repo, config, counter, total) end,
max_concurrency: config.concurrency,
Expand All @@ -205,7 +300,6 @@ defmodule SyncAll do
{:ok, result} -> result
{:exit, _reason} -> %{repo: "?", dirty: false, fetched: false, pulled: false, pushed: false, diverged: false, error: "timeout"}
end)
|> tap(fn _ -> IO.puts("") end) # Clear progress line
end

defp sync_one_repo(repo, config, counter, total) do
Expand Down
Loading