diff --git a/README.md b/README.md
index 5ab4c06..ce5a51d 100644
--- a/README.md
+++ b/README.md
@@ -62,7 +62,17 @@ You can also install it with [Homebrew](#homebrew) or [Docker](#docker).
pgslice analyze <table>
```
-7. Swap the intermediate table with the original table
+7. Sync and validate the tables
+
+This ensures the two tables are in sync. It should be a no-op, but it will generate INSERT,
+UPDATE, and DELETE statements for any discrepancies it discovers. On a production system,
+make sure you understand the `--window-size`, `--delay`, and `--delay-multiplier` options.
+
+```sh
+pgslice synchronize <table> [options]
+```
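+
+For example (illustrative values for a hypothetical `visits` table; tune them for your workload):
+
+```sh
+pgslice synchronize visits --window-size 500 --delay 0.5 --delay-multiplier 2
+```
+
+This compares 500 rows per batch and sleeps 0.5s plus twice each batch's duration between
+batches. Combine with the global `--dry-run` option to log the generated statements without
+executing them.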
+
+8. Swap the intermediate table with the original table
```sh
pgslice swap <table>
@@ -70,13 +80,15 @@ You can also install it with [Homebrew](#homebrew) or [Docker](#docker).
The original table is renamed `<table>_retired` and the intermediate table is renamed `<table>`.
-8. Fill the rest (rows inserted between the first fill and the swap)
+9. Fill the rest (rows inserted between the first fill and the swap)
+
+This step should not be needed if you ran `pgslice synchronize` in step 7.
```sh
pgslice fill <table> --swapped
```
-9. Back up the retired table with a tool like [pg_dump](https://www.postgresql.org/docs/current/static/app-pgdump.html) and drop it
+10. Back up the retired table with a tool like [pg_dump](https://www.postgresql.org/docs/current/static/app-pgdump.html) and drop it
```sql
pg_dump -c -Fc -t <table>_retired $PGSLICE_URL > <table>_retired.dump
diff --git a/lib/pgslice.rb b/lib/pgslice.rb
index a8bf0cf..4732176 100644
--- a/lib/pgslice.rb
+++ b/lib/pgslice.rb
@@ -21,3 +21,4 @@
require_relative "pgslice/cli/swap"
require_relative "pgslice/cli/unprep"
require_relative "pgslice/cli/unswap"
+require_relative "pgslice/cli/synchronize"
diff --git a/lib/pgslice/cli/synchronize.rb b/lib/pgslice/cli/synchronize.rb
new file mode 100644
index 0000000..d650ef9
--- /dev/null
+++ b/lib/pgslice/cli/synchronize.rb
@@ -0,0 +1,299 @@
+module PgSlice
+ class CLI
+ desc "synchronize TABLE", "Synchronize data between two tables"
+ option :source_table, type: :string, desc: "Source table to compare (default: TABLE)"
+ option :target_table, type: :string, desc: "Target table to compare (default: TABLE_intermediate)"
+ option :primary_key, type: :string, desc: "Primary key column name"
+ option :start, type: :string, desc: "Primary key value to start synchronization at"
+ option :window_size, type: :numeric, default: 1000, desc: "Number of rows to synchronize per batch"
+ option :delay, type: :numeric, default: 0, desc: "Base delay in seconds between batches (M)"
+ option :delay_multiplier, type: :numeric, default: 0, desc: "Delay multiplier for batch time (P)"
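+
+ # Pacing: between batches the command sleeps delay (M) + batch duration (N) * delay_multiplier (P)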
+ def synchronize(table_name)
+ table = create_table(table_name)
+
+ # Determine source and target tables
+ source_table = options[:source_table] ? create_table(options[:source_table]) : table
+ target_table = options[:target_table] ? create_table(options[:target_table]) : table.intermediate_table
+
+ # Verify both tables exist
+ assert_table(source_table)
+ assert_table(target_table)
+
+ # Get and verify schemas match
+ source_schema = get_table_schema(source_table)
+ target_schema = get_table_schema(target_table)
+ verify_schemas_match(source_table, target_table, source_schema, target_schema)
+
+ # Get primary key
+ primary_key = options[:primary_key] || source_table.primary_key&.first
+ abort "Primary key not found. Specify with --primary-key" unless primary_key
+ abort "Primary key '#{primary_key}' not found in source table" unless source_schema[primary_key]
+
+ # Determine starting value
+ starting_id = options[:start]
+ unless starting_id
+ starting_id = get_min_id(source_table, primary_key)
+ abort "No rows found in source table" unless starting_id
+ end
+
+ # Get parameters
+ window_size = options[:window_size]
+ base_delay = options[:delay]
+ delay_multiplier = options[:delay_multiplier]
+ dry_run = options[:dry_run]
+
+ log "Synchronizing #{source_table} to #{target_table}"
+ log "Mode: #{dry_run ? 'DRY RUN (logging only)' : 'WRITE (executing changes)'}"
+ log "Primary key: #{primary_key}"
+ log "Starting at: #{starting_id}"
+ log "Window size: #{window_size}"
+ log "Base delay: #{base_delay}s"
+ log "Delay multiplier: #{delay_multiplier}"
+ log
+
+ # Statistics
+ stats = {
+ total_rows: 0,
+ matching_rows: 0,
+ rows_with_differences: 0,
+ missing_rows: 0,
+ extra_rows: 0,
+ batches: 0
+ }
+
+ columns = source_schema.keys
+
+ # Main synchronization loop
+ first_batch = true
+ loop do
+ batch_start_time = Time.now
+
+ # Fetch batch from source
+ source_rows = fetch_batch(source_table, primary_key, starting_id, window_size, columns, first_batch)
+ break if source_rows.empty?
+
+ stats[:batches] += 1
+ first_batch = false
+ stats[:total_rows] += source_rows.size
+
+ # Get primary keys and range from source batch
+ source_pks = source_rows.map { |row| row[primary_key] }
+ first_source_pk = source_rows.first[primary_key]
+ last_source_pk = source_rows.last[primary_key]
+
+ # Fetch corresponding rows from target using range query to catch deletions
+ target_rows = fetch_rows_by_range(target_table, primary_key, first_source_pk, last_source_pk, columns)
+ target_rows_by_pk = target_rows.each_with_object({}) { |row, hash| hash[row[primary_key]] = row }
+
+ # Compare and generate fix queries
+ fix_queries = []
+
+ source_rows.each do |source_row|
+ pk_value = source_row[primary_key]
+ target_row = target_rows_by_pk[pk_value]
+
+ if target_row.nil?
+ # Missing row in target
+ stats[:missing_rows] += 1
+ fix_queries << generate_insert(target_table, source_row, columns)
+ elsif rows_differ?(source_row, target_row, columns)
+ # Rows differ
+ stats[:rows_with_differences] += 1
+ fix_queries << generate_update(target_table, primary_key, source_row, columns)
+ else
+ # Rows match
+ stats[:matching_rows] += 1
+ end
+ end
+
+ # Check for extra rows in target (rows in target but not in source batch)
+ # Note: This only checks within the current batch window
+ extra_pks = target_rows_by_pk.keys - source_pks
+ extra_pks.each do |pk_value|
+ stats[:extra_rows] += 1
+ fix_queries << generate_delete(target_table, primary_key, pk_value)
+ end
+
+ # Describe the batch's key range for logging (bounds were computed above)
+ pk_range = first_source_pk == last_source_pk ? first_source_pk.to_s : "#{first_source_pk}...#{last_source_pk}"
+
+ # Execute or log fix queries
+ if fix_queries.any?
+ log_with_timestamp "Batch #{stats[:batches]}: Found #{fix_queries.size} differences (keys in range #{pk_range})"
+ if dry_run
+ log_sql "-- Dry run mode: logging statements (not executing)"
+ fix_queries.each { |query| log_sql query }
+ log_sql
+ else
+ # In write mode, log truncated SQL and execute without auto-logging
+ fix_queries.each { |query| log_sql truncate_sql_for_log(query) }
+ run_queries(fix_queries, silent: true)
+ end
+ else
+ log_with_timestamp "Batch #{stats[:batches]}: All #{source_rows.size} rows match (keys in range #{pk_range})"
+ end
+
+ # Advance the cursor for the next batch (fetch_batch uses > after the first batch)
+ starting_id = last_source_pk
+
+ # Calculate adaptive delay: M + N*P
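+ # e.g. base delay 0.5s, multiplier 2, batch took 1.2s => sleep 0.5 + 1.2 * 2 = 2.9s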
+ batch_duration = Time.now - batch_start_time
+ sleep_time = base_delay + (batch_duration * delay_multiplier)
+ # Skip the pause when this was the final batch
+ if sleep_time > 0 && source_rows.size == window_size
+ log_with_timestamp "Sleeping #{sleep_time.round(2)}s (#{base_delay}s base + #{batch_duration.round(2)}s batch time * #{delay_multiplier} multiplier)"
+ sleep(sleep_time)
+ end
+
+ # Break if we processed fewer rows than window size (last batch)
+ break if source_rows.size < window_size
+ end
+
+ # Print summary
+ log
+ log "Synchronization complete"
+ log "=" * 50
+ log "Total batches: #{stats[:batches]}"
+ log "Total rows compared: #{stats[:total_rows]}"
+ log "Matching rows: #{stats[:matching_rows]}"
+ log "Rows with differences: #{stats[:rows_with_differences]}"
+ log "Missing rows: #{stats[:missing_rows]}"
+ log "Extra rows: #{stats[:extra_rows]}"
+ end
+
+ private
+
+ def log_with_timestamp(message)
+ timestamp = Time.now.strftime("%Y-%m-%d %H:%M:%S")
+ log "[#{timestamp}] #{message}"
+ end
+
+ def get_table_schema(table)
+ query = <<~SQL
+ SELECT column_name, data_type, character_maximum_length, numeric_precision, numeric_scale
+ FROM information_schema.columns
+ WHERE table_schema = $1 AND table_name = $2 AND is_generated = 'NEVER'
+ ORDER BY ordinal_position
+ SQL
+ rows = execute(query, [table.schema, table.name])
+ rows.each_with_object({}) do |row, hash|
+ hash[row["column_name"]] = {
+ data_type: row["data_type"],
+ character_maximum_length: row["character_maximum_length"],
+ numeric_precision: row["numeric_precision"],
+ numeric_scale: row["numeric_scale"]
+ }
+ end
+ end
+
+ def verify_schemas_match(source_table, target_table, source_schema, target_schema)
+ source_schema.each do |col_name, col_spec|
+ target_spec = target_schema[col_name]
+ abort "Column '#{col_name}' exists in #{source_table} but not in #{target_table}" unless target_spec
+
+ if col_spec[:data_type] != target_spec[:data_type]
+ abort "Column '#{col_name}' type mismatch: #{source_table} has #{col_spec[:data_type]}, #{target_table} has #{target_spec[:data_type]}"
+ end
+ end
+
+ target_schema.each do |col_name, _|
+ abort "Column '#{col_name}' exists in #{target_table} but not in #{source_table}" unless source_schema[col_name]
+ end
+ end
+
+ def get_min_id(table, primary_key)
+ query = "SELECT #{quote_ident(primary_key)} FROM #{quote_table(table)} ORDER BY #{quote_ident(primary_key)} LIMIT 1"
+ result = execute(query)
+ result.first&.values&.first
+ end
+
+ def fetch_batch(table, primary_key, starting_id, limit, columns, first_batch = false)
+ column_list = columns.map { |c| quote_ident(c) }.join(", ")
+ # Use >= for first batch to include starting_id, > for subsequent batches
+ operator = first_batch ? ">=" : ">"
+ query = <<~SQL
+ SELECT #{column_list}
+ FROM #{quote_table(table)}
+ WHERE #{quote_ident(primary_key)} #{operator} #{quote(starting_id)}
+ ORDER BY #{quote_ident(primary_key)}
+ LIMIT #{limit.to_i}
+ SQL
+ execute(query)
+ end
+
+ def fetch_rows_by_pks(table, primary_key, pk_values, columns)
+ return [] if pk_values.empty?
+
+ column_list = columns.map { |c| quote_ident(c) }.join(", ")
+ # Build IN clause with proper quoting
+ pk_list = pk_values.map { |pk| quote(pk) }.join(", ")
+ query = <<~SQL
+ SELECT #{column_list}
+ FROM #{quote_table(table)}
+ WHERE #{quote_ident(primary_key)} IN (#{pk_list})
+ SQL
+ execute(query)
+ end
+
+ def fetch_rows_by_range(table, primary_key, first_pk, last_pk, columns)
+ column_list = columns.map { |c| quote_ident(c) }.join(", ")
+ query = <<~SQL
+ SELECT #{column_list}
+ FROM #{quote_table(table)}
+ WHERE #{quote_ident(primary_key)} >= #{quote(first_pk)}
+ AND #{quote_ident(primary_key)} <= #{quote(last_pk)}
+ ORDER BY #{quote_ident(primary_key)}
+ SQL
+ execute(query)
+ end
+
+ def rows_differ?(source_row, target_row, columns)
+ columns.any? { |col| source_row[col] != target_row[col] }
+ end
+
+ def generate_insert(table, row, columns)
+ column_list = columns.map { |c| quote_ident(c) }.join(", ")
+ value_list = columns.map { |c| quote(row[c]) }.join(", ")
+ "INSERT INTO #{quote_table(table)} (#{column_list}) VALUES (#{value_list});"
+ end
+
+ def generate_update(table, primary_key, row, columns)
+ set_clause = columns.reject { |c| c == primary_key }.map { |c| "#{quote_ident(c)} = #{quote(row[c])}" }.join(", ")
+ "UPDATE #{quote_table(table)} SET #{set_clause} WHERE #{quote_ident(primary_key)} = #{quote(row[primary_key])};"
+ end
+
+ def generate_delete(table, primary_key, pk_value)
+ "DELETE FROM #{quote_table(table)} WHERE #{quote_ident(primary_key)} = #{quote(pk_value)};"
+ end
+
+ def truncate_sql_for_log(sql)
+ # For INSERT statements: show "INSERT INTO table... VALUES(first 20 chars...[truncated]"
+ if sql =~ /\A(INSERT INTO [^\s]+)\s.*?\sVALUES\s*\((.*)\);?\z/i
+ table_part = $1
+ values_part = $2
+ preview = values_part[0, 20]
+ return "#{table_part}... VALUES(#{preview}...[truncated]"
+ end
+
+ # For UPDATE statements: show "UPDATE table... SET...[truncated]"
+ if sql =~ /\A(UPDATE [^\s]+)\s+SET\s+(.*?)\s+WHERE/i
+ table_part = $1
+ set_part = $2
+ preview = set_part[0, 20]
+ return "#{table_part}... SET #{preview}...[truncated]"
+ end
+
+ # For DELETE statements: show "DELETE FROM table WHERE...[truncated]"
+ if sql =~ /\A(DELETE FROM [^\s]+)\s+WHERE\s+(.*?);?\z/i
+ table_part = $1
+ where_part = $2
+ preview = where_part[0, 20]
+ return "#{table_part}... WHERE #{preview}...[truncated]"
+ end
+
+ # Fallback: just show first 50 chars
+ sql[0, 50] + "...[truncated]"
+ end
+ end
+end
diff --git a/lib/pgslice/helpers.rb b/lib/pgslice/helpers.rb
index ee2b1b3..0c90bdd 100644
--- a/lib/pgslice/helpers.rb
+++ b/lib/pgslice/helpers.rb
@@ -6,6 +6,9 @@ module Helpers
year: "YYYY"
}
+ # Sentinel ULID dated 1970-01-01, used as the minimum starting value
+ DEFAULT_ULID = "00000H5A406P0C3DQMCQ5MV6WQ"
+
protected
# output
@@ -61,18 +64,20 @@ def execute(query, params = [])
connection.exec_params(query, params).to_a
end
- def run_queries(queries)
+ def run_queries(queries, silent: false)
connection.transaction do
execute("SET LOCAL client_min_messages TO warning") unless options[:dry_run]
- log_sql "BEGIN;"
- log_sql
- run_queries_without_transaction(queries)
- log_sql "COMMIT;"
+ unless silent
+ log_sql "BEGIN;"
+ log_sql
+ end
+ run_queries_without_transaction(queries, silent: silent)
+ log_sql "COMMIT;" unless silent
end
end
- def run_query(query)
- log_sql query
+ def run_query(query, silent: false)
+ log_sql query unless silent
unless options[:dry_run]
begin
execute(query)
@@ -80,12 +85,12 @@ def run_query(query)
abort "#{e.class.name}: #{e.message}"
end
end
- log_sql
+ log_sql unless silent
end
- def run_queries_without_transaction(queries)
+ def run_queries_without_transaction(queries, silent: false)
queries.each do |query|
- run_query(query)
+ run_query(query, silent: silent)
end
end
@@ -167,7 +172,9 @@ def quote_ident(value)
end
def quote(value)
- if value.is_a?(Numeric)
+ if value.nil?
+ "NULL"
+ elsif value.is_a?(Numeric)
value
else
connection.escape_literal(value)
@@ -178,6 +185,108 @@ def quote_table(table)
table.quote_table
end
+ # ULID helper methods
+ def ulid?(value)
+ return false unless value.is_a?(String)
+ # Match bare ULIDs or prefixed values ending in a 26-character ULID
+ # (the suffix pattern subsumes the bare case)
+ value.match?(/[0123456789ABCDEFGHJKMNPQRSTVWXYZ]{26}\z/)
+ end
+
+ def numeric_id?(value)
+ value.is_a?(Numeric) || (value.is_a?(String) && value.match?(/\A\d+\z/))
+ end
+
+ def id_type(value)
+ return :numeric if numeric_id?(value)
+ return :ulid if ulid?(value)
+ :unknown
+ end
+
+ # Factory method to get the appropriate ID handler
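+ # e.g. id_handler(42) => NumericHandler, id_handler("01ARZ3NDEKTSV4RRFFQ69G5FAV") => UlidHandler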
+ def id_handler(sample_id, connection = nil, table = nil, primary_key = nil)
+ if ulid?(sample_id)
+ UlidHandler.new(connection, table, primary_key)
+ else
+ NumericHandler.new
+ end
+ end
+
+ class NumericHandler
+ def min_value
+ 1
+ end
+
+ def predecessor(id)
+ id - 1
+ end
+
+ def should_continue?(current_id, max_id)
+ current_id < max_id
+ end
+
+ def batch_count(starting_id, max_id, batch_size)
+ ((max_id - starting_id) / batch_size.to_f).ceil
+ end
+
+ def batch_where_condition(primary_key, starting_id, batch_size, inclusive = false)
+ helpers = PgSlice::CLI.instance
+ operator = inclusive ? ">=" : ">"
+ # quote_ident/quote are protected on the CLI, so call them via send
+ # e.g. primary key "id", starting_id 1000, batch_size 500 => id > 1000 AND id <= 1500
+ "#{helpers.send(:quote_ident, primary_key)} #{operator} #{helpers.send(:quote, starting_id)} AND #{helpers.send(:quote_ident, primary_key)} <= #{helpers.send(:quote, starting_id + batch_size)}"
+ end
+
+ def next_starting_id(starting_id, batch_size)
+ starting_id + batch_size
+ end
+ end
+
+ class UlidHandler
+ def initialize(connection = nil, table = nil, primary_key = nil)
+ @connection = connection
+ @table = table
+ @primary_key = primary_key
+ end
+
+ def min_value
+ PgSlice::Helpers::DEFAULT_ULID
+ end
+
+ def predecessor(id)
+ # Use database lookup to find the actual predecessor
+ return PgSlice::Helpers::DEFAULT_ULID unless @connection && @table && @primary_key
+
+ query = <<~SQL
+ SELECT MAX(#{PG::Connection.quote_ident(@primary_key)})
+ FROM #{@table.quote_table}
+ WHERE #{PG::Connection.quote_ident(@primary_key)} < #{@connection.escape_literal(id)}
+ SQL
+
+ # log_sql is a protected CLI helper, so route through the instance
+ PgSlice::CLI.instance.send(:log_sql, query)
+ result = @connection.exec(query)
+ predecessor_id = result[0]["max"]
+ predecessor_id || PgSlice::Helpers::DEFAULT_ULID
+ end
+
+ def should_continue?(current_id, max_id)
+ current_id < max_id
+ end
+
+ def batch_count(starting_id, max_id, batch_size)
+ nil # Unknown for ULIDs
+ end
+
+ def batch_where_condition(primary_key, starting_id, batch_size, inclusive = false)
+ operator = inclusive ? ">=" : ">"
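+ # ULIDs are lexicographically sortable, so text comparison pages through them in order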
+ "#{PG::Connection.quote_ident(primary_key)} #{operator} '#{starting_id}'"
+ end
+
+ def next_starting_id(starting_id, batch_size)
+ # For ULIDs, we need to get the max ID from the current batch
+ # This will be handled in the fill logic
+ nil
+ end
+ end
+
def quote_no_schema(table)
quote_ident(table.name)
end
@@ -205,5 +314,85 @@ def make_stat_def(stat_def, table)
stat_name = "#{table}_#{m[1].split(", ").map { |v| v.gsub(/\W/i, "") }.join("_")}_stat"
stat_def.sub(/ FROM \S+/, " FROM #{quote_table(table)}").sub(/ STATISTICS .+ ON /, " STATISTICS #{quote_ident(stat_name)} ON ") + ";"
end
+
+ # mirroring triggers
+
+ def enable_mirroring_triggers(table)
+ intermediate_table = table.intermediate_table
+ function_name = "#{table.name}_mirror_to_intermediate"
+ trigger_name = "#{table.name}_mirror_trigger"
+
+ queries = []
+
+ # create mirror function
+ queries << <<~SQL
+ CREATE OR REPLACE FUNCTION #{quote_ident(function_name)}()
+ RETURNS TRIGGER AS $$
+ BEGIN
+ IF TG_OP = 'DELETE' THEN
+ DELETE FROM #{quote_table(intermediate_table)} WHERE #{mirror_where_clause(table, 'OLD')};
+ RETURN OLD;
+ ELSIF TG_OP = 'UPDATE' THEN
+ UPDATE #{quote_table(intermediate_table)} SET #{mirror_set_clause(table)} WHERE #{mirror_where_clause(table, 'OLD')};
+ RETURN NEW;
+ ELSIF TG_OP = 'INSERT' THEN
+ INSERT INTO #{quote_table(intermediate_table)} (#{mirror_column_list(table)}) VALUES (#{mirror_new_tuple_list(table)});
+ RETURN NEW;
+ END IF;
+ RETURN NULL;
+ END;
+ $$ LANGUAGE plpgsql;
+ SQL
+
+ # create trigger
+ queries << <<~SQL
+ CREATE TRIGGER #{quote_ident(trigger_name)}
+ AFTER INSERT OR UPDATE OR DELETE ON #{quote_table(table)}
+ FOR EACH ROW EXECUTE FUNCTION #{quote_ident(function_name)}();
+ SQL
+
+ run_queries(queries)
+ end
+
+ def disable_mirroring_triggers(table)
+ function_name = "#{table.name}_mirror_to_intermediate"
+ trigger_name = "#{table.name}_mirror_trigger"
+
+ queries = []
+
+ # drop trigger
+ queries << <<~SQL
+ DROP TRIGGER IF EXISTS #{quote_ident(trigger_name)} ON #{quote_table(table)};
+ SQL
+
+ # drop function
+ queries << <<~SQL
+ DROP FUNCTION IF EXISTS #{quote_ident(function_name)}();
+ SQL
+
+ run_queries(queries)
+ end
+
+ def mirror_column_list(table)
+ table.columns.map { |column| quote_ident(column) }.join(", ")
+ end
+
+ def mirror_new_tuple_list(table)
+ table.columns.map { |column| "NEW.#{quote_ident(column)}" }.join(", ")
+ end
+
+ def mirror_set_clause(table)
+ table.columns.map { |column| "#{quote_ident(column)} = NEW.#{quote_ident(column)}" }.join(", ")
+ end
+
+ def mirror_where_clause(table, record)
+ primary_keys = table.primary_key
+ if primary_keys && primary_keys.any?
+ primary_keys.map { |pk| "#{quote_ident(pk)} = #{record}.#{quote_ident(pk)}" }.join(" AND ")
+ else
+ # fallback to all columns if no primary key
+ table.columns.map { |column| "#{quote_ident(column)} = #{record}.#{quote_ident(column)}" }.join(" AND ")
+ end
+ end
end
end