diff --git a/README.md b/README.md
index 5ab4c06..ce5a51d 100644
--- a/README.md
+++ b/README.md
@@ -62,7 +62,20 @@ You can also install it with [Homebrew](#homebrew) or [Docker](#docker).
 pgslice analyze
 ```
 
-7. Swap the intermediate table with the original table
+7. Sync/validate the tables
+
+This verifies that the two tables are in sync. It should be a no-op, but it will generate
+INSERT, UPDATE, and DELETE statements for any discrepancies it discovers. On a production
+system, make sure you understand the `--window-size`, `--delay`, and `--delay-multiplier` options.
+
+```sh
+pgslice synchronize <table> [options]
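+# for example (hypothetical values): compare 5,000 rows per batch, sleeping for
+# 1s plus twice each batch's duration between batches:
+#   pgslice synchronize <table> --window-size 5000 --delay 1 --delay-multiplier 2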
+```
+
+8. Swap the intermediate table with the original table
 
 ```sh
 pgslice swap <table>
@@ -70,13 +83,15 @@ You can also install it with [Homebrew](#homebrew) or [Docker](#docker).
 
 The original table is renamed `<table>_retired` and the intermediate table is renamed `<table>`.
 
-8. Fill the rest (rows inserted between the first fill and the swap)
+9. Fill the rest (rows inserted between the first fill and the swap)
+
+This step should not be needed if you ran `pgslice synchronize` in step 7.
 
 ```sh
 pgslice fill <table> --swapped
 ```
 
-9. Back up the retired table with a tool like [pg_dump](https://www.postgresql.org/docs/current/static/app-pgdump.html) and drop it
+10. Back up the retired table with a tool like [pg_dump](https://www.postgresql.org/docs/current/static/app-pgdump.html) and drop it
 
 ```sh
 pg_dump -c -Fc -t <table>_retired $PGSLICE_URL > <table>_retired.dump
diff --git a/lib/pgslice.rb b/lib/pgslice.rb
index a8bf0cf..4732176 100644
--- a/lib/pgslice.rb
+++ b/lib/pgslice.rb
@@ -21,3 +21,4 @@
 require_relative "pgslice/cli/swap"
 require_relative "pgslice/cli/unprep"
 require_relative "pgslice/cli/unswap"
+require_relative "pgslice/cli/synchronize"
diff --git a/lib/pgslice/cli/synchronize.rb b/lib/pgslice/cli/synchronize.rb
new file mode 100644
index 0000000..d650ef9
--- /dev/null
+++ b/lib/pgslice/cli/synchronize.rb
@@ -0,0 +1,307 @@
+module PgSlice
+  class CLI
+    desc "synchronize TABLE", "Synchronize data between two tables"
+    option :source_table, type: :string, desc: "Source table to compare (default: TABLE)"
+    option :target_table, type: :string, desc: "Target table to compare (default: TABLE_intermediate)"
+    option :primary_key, type: :string, desc: "Primary key column name"
+    option :start, type: :string, desc: "Primary key value to start synchronization at"
+    option :window_size, type: :numeric, default: 1000, desc: "Number of rows to synchronize per batch"
+    option :delay, type: :numeric, default: 0, desc: "Base delay in seconds between batches (M)"
+    option :delay_multiplier, type: :numeric, default: 0, desc: "Delay multiplier for batch time (P)"
+    def synchronize(table_name)
+      table = create_table(table_name)
+
+      # Determine source and target tables
+      source_table = options[:source_table] ? create_table(options[:source_table]) : table
+      target_table = options[:target_table] ? create_table(options[:target_table]) : table.intermediate_table
+
+      # Verify both tables exist
+      assert_table(source_table)
+      assert_table(target_table)
+
+      # Get and verify schemas match
+      source_schema = get_table_schema(source_table)
+      target_schema = get_table_schema(target_table)
+      verify_schemas_match(source_table, target_table, source_schema, target_schema)
+
+      # Get primary key
+      primary_key = options[:primary_key] || source_table.primary_key&.first
+      abort "Primary key not found. Specify with --primary-key" unless primary_key
+      abort "Primary key '#{primary_key}' not found in source table" unless source_schema[primary_key]
+
+      # Determine starting value
+      starting_id = options[:start]
+      unless starting_id
+        starting_id = get_min_id(source_table, primary_key)
+        abort "No rows found in source table" unless starting_id
+      end
+
+      # Get parameters
+      window_size = options[:window_size]
+      base_delay = options[:delay]
+      delay_multiplier = options[:delay_multiplier]
+      dry_run = options[:dry_run]
+
+      log "Synchronizing #{source_table} to #{target_table}"
+      log "Mode: #{dry_run ? 'DRY RUN (logging only)' : 'WRITE (executing changes)'}"
+      log "Primary key: #{primary_key}"
+      log "Starting at: #{starting_id}"
+      log "Window size: #{window_size}"
+      log "Base delay: #{base_delay}s"
+      log "Delay multiplier: #{delay_multiplier}"
+      log
+
+      # Statistics
+      stats = {
+        total_rows: 0,
+        matching_rows: 0,
+        rows_with_differences: 0,
+        missing_rows: 0,
+        extra_rows: 0,
+        batches: 0
+      }
+
+      columns = source_schema.keys
+
+      # Main synchronization loop
+      first_batch = true
+      loop do
+        batch_start_time = Time.now
+
+        # Fetch batch from source
+        source_rows = fetch_batch(source_table, primary_key, starting_id, window_size, columns, first_batch)
+        break if source_rows.empty?
+
+        stats[:batches] += 1
+        first_batch = false
+        stats[:total_rows] += source_rows.size
+
+        # Get primary keys and range from source batch
+        source_pks = source_rows.map { |row| row[primary_key] }
+        first_source_pk = source_rows.first[primary_key]
+        last_source_pk = source_rows.last[primary_key]
+
+        # Fetch corresponding rows from target using range query to catch deletions
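+        # For example (hypothetical data): if the source batch holds pks [5, 7, 9]
+        # but the target still holds pk 6, scanning the target for the range 5..9
+        # surfaces row 6 for deletion below; an IN (5, 7, 9) lookup would miss it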
+        target_rows = fetch_rows_by_range(target_table, primary_key, first_source_pk, last_source_pk, columns)
+        target_rows_by_pk = target_rows.each_with_object({}) { |row, hash| hash[row[primary_key]] = row }
+
+        # Compare and generate fix queries
+        fix_queries = []
+
+        source_rows.each do |source_row|
+          pk_value = source_row[primary_key]
+          target_row = target_rows_by_pk[pk_value]
+
+          if target_row.nil?
+            # Missing row in target
+            stats[:missing_rows] += 1
+            fix_queries << generate_insert(target_table, source_row, columns)
+          elsif rows_differ?(source_row, target_row, columns)
+            # Rows differ
+            stats[:rows_with_differences] += 1
+            fix_queries << generate_update(target_table, primary_key, source_row, columns)
+          else
+            # Rows match
+            stats[:matching_rows] += 1
+          end
+        end
+
+        # Check for extra rows in target (rows in target but not in source batch)
+        # Note: This only checks within the current batch window
+        extra_pks = target_rows_by_pk.keys - source_pks
+        extra_pks.each do |pk_value|
+          stats[:extra_rows] += 1
+          fix_queries << generate_delete(target_table, primary_key, pk_value)
+        end
+
+        # Get first and last primary key for logging
+        first_pk = source_rows.first[primary_key]
+        last_pk = source_rows.last[primary_key]
+        pk_range = first_pk == last_pk ? "#{first_pk}" : "#{first_pk}...#{last_pk}"
+
+        # Execute or log fix queries
+        if fix_queries.any?
+          log_with_timestamp "Batch #{stats[:batches]}: Found #{fix_queries.size} differences (keys in range #{pk_range})"
+          if dry_run
+            log_sql "-- Dry run mode: logging statements (not executing)"
+            fix_queries.each { |query| log_sql query }
+            log_sql
+          else
+            # In write mode, log truncated SQL and execute without auto-logging
+            fix_queries.each { |query| log_sql truncate_sql_for_log(query) }
+            run_queries(fix_queries, silent: true)
+          end
+        else
+          log_with_timestamp "Batch #{stats[:batches]}: All #{source_rows.size} rows match (keys in range #{pk_range})"
+        end
+
+        # Update starting_id for next batch (use > not >=)
+        starting_id = source_rows.last[primary_key]
+
+        # Calculate adaptive delay: M + N*P
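+        # e.g. (hypothetical numbers) --delay 1 --delay-multiplier 2 on a batch
+        # that took 0.5s sleeps for 1 + (0.5 * 2) = 2 seconds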
+        batch_duration = Time.now - batch_start_time
+        sleep_time = base_delay + (batch_duration * delay_multiplier)
+        if sleep_time > 0
+          log_with_timestamp "Sleeping #{sleep_time.round(2)}s (#{base_delay}s base + #{batch_duration.round(2)}s batch time * #{delay_multiplier} multiplier)"
+          sleep(sleep_time)
+        end
+
+        # Break if we processed fewer rows than window size (last batch)
+        break if source_rows.size < window_size
+      end
+
+      # Print summary
+      log
+      log "Synchronization complete"
+      log "=" * 50
+      log "Total batches: #{stats[:batches]}"
+      log "Total rows compared: #{stats[:total_rows]}"
+      log "Matching rows: #{stats[:matching_rows]}"
+      log "Rows with differences: #{stats[:rows_with_differences]}"
+      log "Missing rows: #{stats[:missing_rows]}"
+      log "Extra rows: #{stats[:extra_rows]}"
+    end
+
+    private
+
+    def log_with_timestamp(message)
+      timestamp = Time.now.strftime("%Y-%m-%d %H:%M:%S")
+      log "[#{timestamp}] #{message}"
+    end
+
+    def get_table_schema(table)
+      query = <<~SQL
+        SELECT column_name, data_type, character_maximum_length, numeric_precision, numeric_scale
+        FROM information_schema.columns
+        WHERE table_schema = $1 AND table_name = $2 AND is_generated = 'NEVER'
+        ORDER BY ordinal_position
+      SQL
+      rows = execute(query, [table.schema, table.name])
+      rows.each_with_object({}) do |row, hash|
+        hash[row["column_name"]] = {
+          data_type: row["data_type"],
+          character_maximum_length: row["character_maximum_length"],
+          numeric_precision: row["numeric_precision"],
+          numeric_scale: row["numeric_scale"]
+        }
+      end
+    end
+
+    def verify_schemas_match(source_table, target_table, source_schema, target_schema)
+      source_schema.each do |col_name, col_spec|
+        target_spec = target_schema[col_name]
+        abort "Column '#{col_name}' exists in #{source_table} but not in #{target_table}" unless target_spec
+
+        if col_spec[:data_type] != target_spec[:data_type]
+          abort "Column '#{col_name}' type mismatch: #{source_table} has #{col_spec[:data_type]}, #{target_table} has #{target_spec[:data_type]}"
+        end
+      end
+
+      target_schema.each do |col_name, _|
+        abort "Column '#{col_name}' exists in #{target_table} but not in #{source_table}" unless source_schema[col_name]
+      end
+    end
+
+    def get_min_id(table, primary_key)
+      query = "SELECT #{quote_ident(primary_key)} FROM #{quote_table(table)} ORDER BY #{quote_ident(primary_key)} LIMIT 1"
+      result = execute(query)
+      result.first&.values&.first
+    end
+
+    def fetch_batch(table, primary_key, starting_id, limit, columns, first_batch = false)
+      column_list = columns.map { |c| quote_ident(c) }.join(", ")
+      # Use >= for first batch to include starting_id, > for subsequent batches
+      operator = first_batch ? ">=" : ">"
+      query = <<~SQL
+        SELECT #{column_list}
+        FROM #{quote_table(table)}
+        WHERE #{quote_ident(primary_key)} #{operator} #{quote(starting_id)}
+        ORDER BY #{quote_ident(primary_key)}
+        LIMIT #{limit.to_i}
+      SQL
+      execute(query)
+    end
+
+    def fetch_rows_by_pks(table, primary_key, pk_values, columns)
+      return [] if pk_values.empty?
+
+      column_list = columns.map { |c| quote_ident(c) }.join(", ")
+      # Build IN clause with proper quoting
+      pk_list = pk_values.map { |pk| quote(pk) }.join(", ")
+      query = <<~SQL
+        SELECT #{column_list}
+        FROM #{quote_table(table)}
+        WHERE #{quote_ident(primary_key)} IN (#{pk_list})
+      SQL
+      execute(query)
+    end
+
+    def fetch_rows_by_range(table, primary_key, first_pk, last_pk, columns)
+      column_list = columns.map { |c| quote_ident(c) }.join(", ")
+      query = <<~SQL
+        SELECT #{column_list}
+        FROM #{quote_table(table)}
+        WHERE #{quote_ident(primary_key)} >= #{quote(first_pk)}
+          AND #{quote_ident(primary_key)} <= #{quote(last_pk)}
+        ORDER BY #{quote_ident(primary_key)}
+      SQL
+      execute(query)
+    end
+
+    def rows_differ?(source_row, target_row, columns)
+      columns.any? { |col| source_row[col] != target_row[col] }
+    end
+
+    def generate_insert(table, row, columns)
+      column_list = columns.map { |c| quote_ident(c) }.join(", ")
+      value_list = columns.map { |c| quote(row[c]) }.join(", ")
+      "INSERT INTO #{quote_table(table)} (#{column_list}) VALUES (#{value_list});"
+    end
+
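+    # For example (hypothetical table and row), generate_update below yields:
+    #   generate_update(table, "id", { "id" => 5, "name" => "a" }, ["id", "name"])
+    #   #=> UPDATE "public"."items" SET "name" = 'a' WHERE "id" = 5;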
+    def generate_update(table, primary_key, row, columns)
+      set_clause = columns.reject { |c| c == primary_key }.map { |c| "#{quote_ident(c)} = #{quote(row[c])}" }.join(", ")
+      "UPDATE #{quote_table(table)} SET #{set_clause} WHERE #{quote_ident(primary_key)} = #{quote(row[primary_key])};"
+    end
+
+    def generate_delete(table, primary_key, pk_value)
+      "DELETE FROM #{quote_table(table)} WHERE #{quote_ident(primary_key)} = #{quote(pk_value)};"
+    end
+
+    def truncate_sql_for_log(sql)
+      # For INSERT statements: show "INSERT INTO table... VALUES(first 20 chars...[truncated]"
+      if sql =~ /\A(INSERT INTO [^\s]+)\s.*?\sVALUES\s*\((.*)\);?\z/i
+        table_part = $1
+        values_part = $2
+        preview = values_part[0, 20]
+        return "#{table_part}... VALUES(#{preview}...[truncated]"
+      end
+
+      # For UPDATE statements: show "UPDATE table... SET...[truncated]"
+      if sql =~ /\A(UPDATE [^\s]+)\s+SET\s+(.*?)\s+WHERE/i
+        table_part = $1
+        set_part = $2
+        preview = set_part[0, 20]
+        return "#{table_part}... SET #{preview}...[truncated]"
+      end
+
+      # For DELETE statements: show "DELETE FROM table WHERE...[truncated]"
+      if sql =~ /\A(DELETE FROM [^\s]+)\s+WHERE\s+(.*);?\z/i
+        table_part = $1
+        where_part = $2
+        preview = where_part[0, 20]
+        return "#{table_part}... WHERE #{preview}...[truncated]"
+      end
+
+      # Fallback: just show first 50 chars
+      sql[0, 50] + "...[truncated]"
+    end
+  end
+end
diff --git a/lib/pgslice/helpers.rb b/lib/pgslice/helpers.rb
index ee2b1b3..0c90bdd 100644
--- a/lib/pgslice/helpers.rb
+++ b/lib/pgslice/helpers.rb
@@ -6,6 +6,9 @@ module Helpers
       year: "YYYY"
     }
 
+    # ULID used as a lower bound for scans (its timestamp falls near the start of the Unix epoch, 1970)
+    DEFAULT_ULID = "00000H5A406P0C3DQMCQ5MV6WQ"
+
     protected
 
     # output
@@ -61,18 +64,20 @@ def execute(query, params = [])
       connection.exec_params(query, params).to_a
     end
 
-    def run_queries(queries)
+    def run_queries(queries, silent: false)
       connection.transaction do
         execute("SET LOCAL client_min_messages TO warning") unless options[:dry_run]
-        log_sql "BEGIN;"
-        log_sql
-        run_queries_without_transaction(queries)
-        log_sql "COMMIT;"
+        unless silent
+          log_sql "BEGIN;"
+          log_sql
+        end
+        run_queries_without_transaction(queries, silent: silent)
+        log_sql "COMMIT;" unless silent
       end
     end
 
-    def run_query(query)
-      log_sql query
+    def run_query(query, silent: false)
+      log_sql query unless silent
       unless options[:dry_run]
         begin
           execute(query)
@@ -80,12 +85,12 @@
           abort "#{e.class.name}: #{e.message}"
         end
       end
-      log_sql
+      log_sql unless silent
     end
 
-    def run_queries_without_transaction(queries)
+    def run_queries_without_transaction(queries, silent: false)
       queries.each do |query|
-        run_query(query)
+        run_query(query, silent: silent)
       end
     end
 
@@ -167,7 +172,9 @@ def quote_ident(value)
     end
 
     def quote(value)
-      if value.is_a?(Numeric)
+      if value.nil?
+        "NULL"
+      elsif value.is_a?(Numeric)
         value
       else
         connection.escape_literal(value)
@@ -178,6 +185,109 @@ def quote_table(table)
       table.quote_table
     end
 
+    # ULID helper methods
+    def ulid?(value)
+      return false unless value.is_a?(String)
+      # Match pure ULIDs or ULIDs with prefixes
+      value.match?(/\A[0123456789ABCDEFGHJKMNPQRSTVWXYZ]{26}\z/) ||
+        value.match?(/.*[0123456789ABCDEFGHJKMNPQRSTVWXYZ]{26}\z/)
+    end
+
+    def numeric_id?(value)
+      value.is_a?(Numeric) || (value.is_a?(String) && value.match?(/\A\d+\z/))
+    end
+
+    def id_type(value)
+      return :numeric if numeric_id?(value)
+      return :ulid if ulid?(value)
+      :unknown
+    end
+
+    # Factory method to get the appropriate ID handler
+    # e.g. (hypothetical ids) id_handler(42) builds a NumericHandler, while
+    # id_handler("01ARZ3NDEKTSV4RRFFQ69G5FAV") builds a UlidHandler
+    def id_handler(sample_id, connection = nil, table = nil, primary_key = nil)
+      if ulid?(sample_id)
+        UlidHandler.new(connection, table, primary_key)
+      else
+        NumericHandler.new
+      end
+    end
+
+    class NumericHandler
+      def min_value
+        1
+      end
+
+      def predecessor(id)
+        id - 1
+      end
+
+      def should_continue?(current_id, max_id)
+        current_id < max_id
+      end
+
+      def batch_count(starting_id, max_id, batch_size)
+        ((max_id - starting_id) / batch_size.to_f).ceil
+      end
+
+      def batch_where_condition(primary_key, starting_id, batch_size, inclusive = false)
+        helpers = PgSlice::CLI.instance
+        operator = inclusive ? ">=" : ">"
+        "#{helpers.quote_ident(primary_key)} #{operator} #{helpers.quote(starting_id)} AND #{helpers.quote_ident(primary_key)} <= #{helpers.quote(starting_id + batch_size)}"
+      end
+
+      def next_starting_id(starting_id, batch_size)
+        starting_id + batch_size
+      end
+    end
+
+    class UlidHandler
+      def initialize(connection = nil, table = nil, primary_key = nil)
+        @connection = connection
+        @table = table
+        @primary_key = primary_key
+      end
+
+      def min_value
+        PgSlice::Helpers::DEFAULT_ULID
+      end
+
+      def predecessor(id)
+        # Use database lookup to find the actual predecessor
+        return PgSlice::Helpers::DEFAULT_ULID unless @connection && @table && @primary_key
+
+        query = <<~SQL
+          SELECT MAX(#{PG::Connection.quote_ident(@primary_key)})
+          FROM #{@table.quote_table}
+          WHERE #{PG::Connection.quote_ident(@primary_key)} < '#{id}'
+        SQL
+
+        result = @connection.exec(query)
+        predecessor_id = result[0]["max"]
+        predecessor_id || PgSlice::Helpers::DEFAULT_ULID
+      end
+
+      def should_continue?(current_id, max_id)
+        current_id < max_id
+      end
+
+      def batch_count(starting_id, max_id, batch_size)
+        nil # Unknown for ULIDs
+      end
+
+      def batch_where_condition(primary_key, starting_id, batch_size, inclusive = false)
+        operator = inclusive ? ">=" : ">"
+        "#{PG::Connection.quote_ident(primary_key)} #{operator} '#{starting_id}'"
+      end
+
+      def next_starting_id(starting_id, batch_size)
+        # For ULIDs, we need to get the max ID from the current batch
+        # This will be handled in the fill logic
+        nil
+      end
+    end
+
     def quote_no_schema(table)
       quote_ident(table.name)
     end
@@ -205,5 +315,88 @@ def make_stat_def(stat_def, table)
       stat_name = "#{table}_#{m[1].split(", ").map { |v| v.gsub(/\W/i, "") }.join("_")}_stat"
       stat_def.sub(/ FROM \S+/, " FROM #{quote_table(table)}").sub(/ STATISTICS .+ ON /, " STATISTICS #{quote_ident(stat_name)} ON ") + ";"
     end
+
+    # mirroring triggers
+
+    # The trigger created below replays each INSERT, UPDATE, and DELETE on the
+    # original table against its intermediate table, so writes that land while
+    # both tables exist stay in sync until the swap.
+    def enable_mirroring_triggers(table)
+      intermediate_table = table.intermediate_table
+      function_name = "#{table.name}_mirror_to_intermediate"
+      trigger_name = "#{table.name}_mirror_trigger"
+
+      queries = []
+
+      # create mirror function
+      queries << <<~SQL
+        CREATE OR REPLACE FUNCTION #{quote_ident(function_name)}()
+        RETURNS TRIGGER AS $$
+        BEGIN
+          IF TG_OP = 'DELETE' THEN
+            DELETE FROM #{quote_table(intermediate_table)} WHERE #{mirror_where_clause(table, 'OLD')};
+            RETURN OLD;
+          ELSIF TG_OP = 'UPDATE' THEN
+            UPDATE #{quote_table(intermediate_table)} SET #{mirror_set_clause(table)} WHERE #{mirror_where_clause(table, 'OLD')};
+            RETURN NEW;
+          ELSIF TG_OP = 'INSERT' THEN
+            INSERT INTO #{quote_table(intermediate_table)} (#{mirror_column_list(table)}) VALUES (#{mirror_new_tuple_list(table)});
+            RETURN NEW;
+          END IF;
+          RETURN NULL;
+        END;
+        $$ LANGUAGE plpgsql;
+      SQL
+
+      # create trigger
+      queries << <<~SQL
+        CREATE TRIGGER #{quote_ident(trigger_name)}
+        AFTER INSERT OR UPDATE OR DELETE ON #{quote_table(table)}
+        FOR EACH ROW EXECUTE FUNCTION #{quote_ident(function_name)}();
+      SQL
+
+      run_queries(queries)
+    end
+
+    def disable_mirroring_triggers(table)
+      function_name = "#{table.name}_mirror_to_intermediate"
+      trigger_name = "#{table.name}_mirror_trigger"
+
+      queries = []
+
+      # drop trigger
+      queries << <<~SQL
+        DROP TRIGGER IF EXISTS #{quote_ident(trigger_name)} ON #{quote_table(table)};
+      SQL
+
+      # drop function
+      queries << <<~SQL
+        DROP FUNCTION IF EXISTS #{quote_ident(function_name)}();
+      SQL
+
+      run_queries(queries)
+    end
+
+    def mirror_column_list(table)
+      table.columns.map { |column| quote_ident(column) }.join(", ")
+    end
+
+    def mirror_new_tuple_list(table)
+      table.columns.map { |column| "NEW.#{quote_ident(column)}" }.join(", ")
+    end
+
+    def mirror_set_clause(table)
+      table.columns.map { |column| "#{quote_ident(column)} = NEW.#{quote_ident(column)}" }.join(", ")
+    end
+
+    def mirror_where_clause(table, record)
+      primary_keys = table.primary_key
+      if primary_keys && primary_keys.any?
+        primary_keys.map { |pk| "#{quote_ident(pk)} = #{record}.#{quote_ident(pk)}" }.join(" AND ")
+      else
+        # fallback to all columns if no primary key
+        table.columns.map { |column| "#{quote_ident(column)} = #{record}.#{quote_ident(column)}" }.join(" AND ")
+      end
+    end
   end
 end