From dbddb6549af56209a2f1e245d39fe84f14bf6998 Mon Sep 17 00:00:00 2001 From: timeless Date: Mon, 1 Dec 2025 15:52:23 -0600 Subject: [PATCH 1/4] get all commits from our ext repo --- lib/pgslice.rb | 1 + lib/pgslice/cli.rb | 22 +++ lib/pgslice/cli/fill.rb | 96 ++++++++--- lib/pgslice/cli/synchronize.rb | 300 +++++++++++++++++++++++++++++++++ lib/pgslice/helpers.rb | 211 +++++++++++++++++++++-- lib/pgslice/table.rb | 32 +++- 6 files changed, 630 insertions(+), 32 deletions(-) create mode 100644 lib/pgslice/cli/synchronize.rb diff --git a/lib/pgslice.rb b/lib/pgslice.rb index a8bf0cf..4732176 100644 --- a/lib/pgslice.rb +++ b/lib/pgslice.rb @@ -21,3 +21,4 @@ require_relative "pgslice/cli/swap" require_relative "pgslice/cli/unprep" require_relative "pgslice/cli/unswap" +require_relative "pgslice/cli/synchronize" diff --git a/lib/pgslice/cli.rb b/lib/pgslice/cli.rb index ff22dc0..29b708a 100644 --- a/lib/pgslice/cli.rb +++ b/lib/pgslice/cli.rb @@ -26,5 +26,27 @@ def initialize(*args) def version log("pgslice #{PgSlice::VERSION}") end + + desc "enable_mirroring TABLE", "Enable mirroring triggers for live data changes during partitioning" + def enable_mirroring(table_name) + table = create_table(table_name) + intermediate_table = table.intermediate_table + + assert_table(table) + assert_table(intermediate_table) + + enable_mirroring_triggers(table) + log("Mirroring triggers enabled for #{table_name}") + end + + desc "disable_mirroring TABLE", "Disable mirroring triggers after partitioning is complete" + def disable_mirroring(table_name) + table = create_table(table_name) + + assert_table(table) + + disable_mirroring_triggers(table) + log("Mirroring triggers disabled for #{table_name}") + end end end diff --git a/lib/pgslice/cli/fill.rb b/lib/pgslice/cli/fill.rb index 64a3f53..a4a85c0 100644 --- a/lib/pgslice/cli/fill.rb +++ b/lib/pgslice/cli/fill.rb @@ -5,7 +5,7 @@ class CLI option :swapped, type: :boolean, default: false, desc: "Use swapped table" option :source_table, desc: "Source table" option :dest_table, desc: "Destination table" - option :start, type: :numeric, desc: "Primary key to start" + option :start, type: :string, desc: "Primary key to start (numeric or ULID)" option :where, desc: "Conditions to filter" option :sleep, type: :numeric, desc: "Seconds to sleep between batches" def fill(table) @@ -45,21 +45,44 @@ def fill(table) begin max_source_id = source_table.max_id(primary_key) rescue PG::UndefinedFunction - abort "Only numeric primary keys are supported" + abort "Only numeric and ULID primary keys are supported" end max_dest_id = if options[:start] - options[:start] + # Convert to appropriate type + start_val = options[:start] + numeric_id?(start_val) ? 
start_val.to_i : start_val elsif options[:swapped] dest_table.max_id(primary_key, where: options[:where], below: max_source_id) else dest_table.max_id(primary_key, where: options[:where]) end - if max_dest_id == 0 && !options[:swapped] + # Get the appropriate handler for the ID type + # Prefer --start option, then max_source_id, then sample from table + handler = if options[:start] + id_handler(options[:start], connection, source_table, primary_key) + elsif max_source_id + id_handler(max_source_id, connection, source_table, primary_key) + else + # Sample a row to determine ID type + sample_query = "SELECT #{quote_ident(primary_key)} FROM #{quote_table(source_table)} LIMIT 1" + log_sql sample_query + sample_result = execute(sample_query)[0] + if sample_result && sample_result[primary_key] + id_handler(sample_result[primary_key], connection, source_table, primary_key) + else + # Default to numeric if we can't determine + Helpers::NumericHandler.new + end + end + + if (max_dest_id == 0 || max_dest_id == handler.min_value) && !options[:swapped] min_source_id = source_table.min_id(primary_key, field, cast, starting_time, options[:where]) - max_dest_id = min_source_id - 1 if min_source_id + if min_source_id + max_dest_id = handler.predecessor(min_source_id) + end end starting_id = max_dest_id @@ -67,14 +90,15 @@ def fill(table) batch_size = options[:batch_size] i = 1 - batch_count = ((max_source_id - starting_id) / batch_size.to_f).ceil + batch_count = handler.batch_count(starting_id, max_source_id, batch_size) + first_batch = true if batch_count == 0 log_sql "/* nothing to fill */" end - while starting_id < max_source_id - where = "#{quote_ident(primary_key)} > #{quote(starting_id)} AND #{quote_ident(primary_key)} <= #{quote(starting_id + batch_size)}" + while handler.should_continue?(starting_id, max_source_id) + where = handler.batch_where_condition(primary_key, starting_id, batch_size, first_batch && options[:start]) if starting_time where << " AND #{quote_ident(field)} >= #{sql_date(starting_time, cast)} AND #{quote_ident(field)} < #{sql_date(ending_time, cast)}" end @@ -82,19 +106,53 @@ def fill(table) where << " AND #{options[:where]}" end - query = <<~SQL - /* #{i} of #{batch_count} */ - INSERT INTO #{quote_table(dest_table)} (#{fields}) - SELECT #{fields} FROM #{quote_table(source_table)} - WHERE #{where} - SQL - - run_query(query) - - starting_id += batch_size + batch_label = batch_count ? "#{i} of #{batch_count}" : "batch #{i}" + + if handler.is_a?(UlidHandler) + # For ULIDs, use CTE with RETURNING to get max ID inserted + query = <<~SQL + /* #{batch_label} */ + WITH inserted_batch AS ( + INSERT INTO #{quote_table(dest_table)} (#{fields}) + SELECT #{fields} FROM #{quote_table(source_table)} + WHERE #{where} + ORDER BY #{quote_ident(primary_key)} + LIMIT #{batch_size} + ON CONFLICT DO NOTHING + RETURNING #{quote_ident(primary_key)} + ) + SELECT MAX(#{quote_ident(primary_key)}) as max_inserted_id FROM inserted_batch + SQL + + log_sql query + result = execute(query) + max_inserted_id = result[0]["max_inserted_id"] + puts "starting_id: #{starting_id}" + puts "max_inserted_id: #{max_inserted_id}" + + # If no records were inserted, break the loop + if max_inserted_id.nil? 
+ break + end + + starting_id = max_inserted_id + else + query = <<~SQL + /* #{batch_label} */ + INSERT INTO #{quote_table(dest_table)} (#{fields}) + SELECT #{fields} FROM #{quote_table(source_table)} + WHERE #{where} + ON CONFLICT DO NOTHING + SQL + + run_query(query) + starting_id = handler.next_starting_id(starting_id, batch_size) + end + i += 1 + first_batch = false - if options[:sleep] && starting_id <= max_source_id + if options[:sleep] && handler.should_continue?(starting_id, max_source_id) sleep(options[:sleep]) end end diff --git a/lib/pgslice/cli/synchronize.rb b/lib/pgslice/cli/synchronize.rb new file mode 100644 index 0000000..a99db60 --- /dev/null +++ b/lib/pgslice/cli/synchronize.rb @@ -0,0 +1,300 @@ +module PgSlice + class CLI + desc "synchronize TABLE", "Synchronize data between two tables" + option :source_table, type: :string, desc: "Source table to compare (default: TABLE)" + option :target_table, type: :string, desc: "Target table to compare (default: TABLE_intermediate)" + option :primary_key, type: :string, desc: "Primary key column name" + option :start, type: :string, desc: "Primary key value to start synchronization at" + option :window_size, type: :numeric, default: 1000, desc: "Number of rows to synchronize per batch" + option :delay, type: :numeric, default: 0, desc: "Base delay in seconds between batches (M)" + option :delay_multiplier, type: :numeric, default: 0, desc: "Delay multiplier for batch time (P)" + option :read_only, type: :boolean, default: false, desc: "Log SQL statements instead of executing them" + def synchronize(table_name) + table = create_table(table_name) + + # Determine source and target tables + source_table = options[:source_table] ? create_table(options[:source_table]) : table + target_table = options[:target_table] ? create_table(options[:target_table]) : table.intermediate_table + + # Verify both tables exist + assert_table(source_table) + assert_table(target_table) + + # Get and verify schemas match + source_schema = get_table_schema(source_table) + target_schema = get_table_schema(target_table) + verify_schemas_match(source_table, target_table, source_schema, target_schema) + + # Get primary key + primary_key = options[:primary_key] || source_table.primary_key&.first + abort "Primary key not found. Specify with --primary-key" unless primary_key + abort "Primary key '#{primary_key}' not found in source table" unless source_schema[primary_key] + + # Determine starting value + starting_id = options[:start] + unless starting_id + starting_id = get_min_id(source_table, primary_key) + abort "No rows found in source table" unless starting_id + end + + # Get parameters + window_size = options[:window_size] + base_delay = options[:delay] + delay_multiplier = options[:delay_multiplier] + read_only = options[:read_only] + + log "Synchronizing #{source_table} to #{target_table}" + log "Mode: #{read_only ? 
'READ-ONLY (logging only)' : 'WRITE (executing changes)'}" + log "Primary key: #{primary_key}" + log "Starting at: #{starting_id}" + log "Window size: #{window_size}" + log "Base delay: #{base_delay}s" + log "Delay multiplier: #{delay_multiplier}" + log + + # Statistics + stats = { + total_rows: 0, + matching_rows: 0, + rows_with_differences: 0, + missing_rows: 0, + extra_rows: 0, + batches: 0 + } + + columns = source_schema.keys + + # Main synchronization loop + first_batch = true + loop do + batch_start_time = Time.now + + # Fetch batch from source + source_rows = fetch_batch(source_table, primary_key, starting_id, window_size, columns, first_batch) + break if source_rows.empty? + + stats[:batches] += 1 + first_batch = false + stats[:total_rows] += source_rows.size + + # Get primary keys and range from source batch + source_pks = source_rows.map { |row| row[primary_key] } + first_source_pk = source_rows.first[primary_key] + last_source_pk = source_rows.last[primary_key] + + # Fetch corresponding rows from target using range query to catch deletions + target_rows = fetch_rows_by_range(target_table, primary_key, first_source_pk, last_source_pk, columns) + target_rows_by_pk = target_rows.each_with_object({}) { |row, hash| hash[row[primary_key]] = row } + + # Compare and generate fix queries + fix_queries = [] + + source_rows.each do |source_row| + pk_value = source_row[primary_key] + target_row = target_rows_by_pk[pk_value] + + if target_row.nil? + # Missing row in target + stats[:missing_rows] += 1 + fix_queries << generate_insert(target_table, source_row, columns) + elsif rows_differ?(source_row, target_row, columns) + # Rows differ + stats[:rows_with_differences] += 1 + fix_queries << generate_update(target_table, primary_key, source_row, columns) + else + # Rows match + stats[:matching_rows] += 1 + end + end + + # Check for extra rows in target (rows in target but not in source batch) + # Note: This only checks within the current batch window + extra_pks = target_rows_by_pk.keys - source_pks + extra_pks.each do |pk_value| + stats[:extra_rows] += 1 + fix_queries << generate_delete(target_table, primary_key, pk_value) + end + + # Get first and last primary key for logging + first_pk = source_rows.first[primary_key] + last_pk = source_rows.last[primary_key] + pk_range = first_pk == last_pk ? "#{first_pk}" : "#{first_pk}...#{last_pk}" + + # Execute or log fix queries + if fix_queries.any? 
+ log_with_timestamp "Batch #{stats[:batches]}: Found #{fix_queries.size} differences (keys in range #{pk_range})" + if read_only + log_sql "-- Read-only mode: logging statements (not executing)" + fix_queries.each { |query| log_sql query } + log_sql + else + # In write mode, log truncated SQL and execute without auto-logging + fix_queries.each { |query| log_sql truncate_sql_for_log(query) } + run_queries(fix_queries, silent: true) + end + else + log_with_timestamp "Batch #{stats[:batches]}: All #{source_rows.size} rows match (keys in range #{pk_range})" + end + + # Update starting_id for next batch (use > not >=) + starting_id = source_rows.last[primary_key] + + # Calculate adaptive delay: M + N*P + batch_duration = Time.now - batch_start_time + sleep_time = base_delay + (batch_duration * delay_multiplier) + if sleep_time > 0 + log_with_timestamp "Sleeping #{sleep_time.round(2)}s (#{base_delay}s base + #{batch_duration.round(2)}s batch time * #{delay_multiplier} multiplier)" + sleep(sleep_time) + end + + # Break if we processed fewer rows than window size (last batch) + break if source_rows.size < window_size + end + + # Print summary + log + log "Synchronization complete" + log "=" * 50 + log "Total batches: #{stats[:batches]}" + log "Total rows compared: #{stats[:total_rows]}" + log "Matching rows: #{stats[:matching_rows]}" + log "Rows with differences: #{stats[:rows_with_differences]}" + log "Missing rows: #{stats[:missing_rows]}" + log "Extra rows: #{stats[:extra_rows]}" + end + + private + + def log_with_timestamp(message) + timestamp = Time.now.strftime("%Y-%m-%d %H:%M:%S") + log "[#{timestamp}] #{message}" + end + + def get_table_schema(table) + query = <<~SQL + SELECT column_name, data_type, character_maximum_length, numeric_precision, numeric_scale + FROM information_schema.columns + WHERE table_schema = $1 AND table_name = $2 AND is_generated = 'NEVER' + ORDER BY ordinal_position + SQL + rows = execute(query, [table.schema, table.name]) + rows.each_with_object({}) do |row, hash| + hash[row["column_name"]] = { + data_type: row["data_type"], + character_maximum_length: row["character_maximum_length"], + numeric_precision: row["numeric_precision"], + numeric_scale: row["numeric_scale"] + } + end + end + + def verify_schemas_match(source_table, target_table, source_schema, target_schema) + source_schema.each do |col_name, col_spec| + target_spec = target_schema[col_name] + abort "Column '#{col_name}' exists in #{source_table} but not in #{target_table}" unless target_spec + + if col_spec[:data_type] != target_spec[:data_type] + abort "Column '#{col_name}' type mismatch: #{source_table} has #{col_spec[:data_type]}, #{target_table} has #{target_spec[:data_type]}" + end + end + + target_schema.each do |col_name, _| + abort "Column '#{col_name}' exists in #{target_table} but not in #{source_table}" unless source_schema[col_name] + end + end + + def get_min_id(table, primary_key) + query = "SELECT #{quote_ident(primary_key)} FROM #{quote_table(table)} ORDER BY #{quote_ident(primary_key)} LIMIT 1" + result = execute(query) + result.first&.values&.first + end + + def fetch_batch(table, primary_key, starting_id, limit, columns, first_batch = false) + column_list = columns.map { |c| quote_ident(c) }.join(", ") + # Use >= for first batch to include starting_id, > for subsequent batches + operator = first_batch ? 
">=" : ">" + query = <<~SQL + SELECT #{column_list} + FROM #{quote_table(table)} + WHERE #{quote_ident(primary_key)} #{operator} #{quote(starting_id)} + ORDER BY #{quote_ident(primary_key)} + LIMIT #{limit.to_i} + SQL + execute(query) + end + + def fetch_rows_by_pks(table, primary_key, pk_values, columns) + return [] if pk_values.empty? + + column_list = columns.map { |c| quote_ident(c) }.join(", ") + # Build IN clause with proper quoting + pk_list = pk_values.map { |pk| quote(pk) }.join(", ") + query = <<~SQL + SELECT #{column_list} + FROM #{quote_table(table)} + WHERE #{quote_ident(primary_key)} IN (#{pk_list}) + SQL + execute(query) + end + + def fetch_rows_by_range(table, primary_key, first_pk, last_pk, columns) + column_list = columns.map { |c| quote_ident(c) }.join(", ") + query = <<~SQL + SELECT #{column_list} + FROM #{quote_table(table)} + WHERE #{quote_ident(primary_key)} >= #{quote(first_pk)} + AND #{quote_ident(primary_key)} <= #{quote(last_pk)} + ORDER BY #{quote_ident(primary_key)} + SQL + execute(query) + end + + def rows_differ?(source_row, target_row, columns) + columns.any? { |col| source_row[col] != target_row[col] } + end + + def generate_insert(table, row, columns) + column_list = columns.map { |c| quote_ident(c) }.join(", ") + value_list = columns.map { |c| quote(row[c]) }.join(", ") + "INSERT INTO #{quote_table(table)} (#{column_list}) VALUES (#{value_list});" + end + + def generate_update(table, primary_key, row, columns) + set_clause = columns.reject { |c| c == primary_key }.map { |c| "#{quote_ident(c)} = #{quote(row[c])}" }.join(", ") + "UPDATE #{quote_table(table)} SET #{set_clause} WHERE #{quote_ident(primary_key)} = #{quote(row[primary_key])};" + end + + def generate_delete(table, primary_key, pk_value) + "DELETE FROM #{quote_table(table)} WHERE #{quote_ident(primary_key)} = #{quote(pk_value)};" + end + + def truncate_sql_for_log(sql) + # For INSERT statements: show "INSERT INTO table... VALUES(first 20 chars...[truncated]" + if sql =~ /\A(INSERT INTO [^\s]+)\s.*?\sVALUES\s*\((.*)\);?\z/i + table_part = $1 + values_part = $2 + preview = values_part[0, 20] + return "#{table_part}... VALUES(#{preview}...[truncated]" + end + + # For UPDATE statements: show "UPDATE table... SET...[truncated]" + if sql =~ /\A(UPDATE [^\s]+)\s+SET\s+(.*?)\s+WHERE/i + table_part = $1 + set_part = $2 + preview = set_part[0, 20] + return "#{table_part}... SET #{preview}...[truncated]" + end + + # For DELETE statements: show "DELETE FROM table WHERE...[truncated]" + if sql =~ /\A(DELETE FROM [^\s]+)\s+WHERE\s+(.*);?\z/i + table_part = $1 + where_part = $2 + preview = where_part[0, 20] + return "#{table_part}... 
WHERE #{preview}...[truncated]" + end + + # Fallback: just show first 50 chars + sql[0, 50] + "...[truncated]" + end + end +end diff --git a/lib/pgslice/helpers.rb b/lib/pgslice/helpers.rb index ee2b1b3..cae872a 100644 --- a/lib/pgslice/helpers.rb +++ b/lib/pgslice/helpers.rb @@ -6,6 +6,9 @@ module Helpers year: "YYYY" } + # ULID epoch start corresponding to 01/01/1970 + DEFAULT_ULID = "00000H5A406P0C3DQMCQ5MV6WQ" + protected # output @@ -61,18 +64,20 @@ def execute(query, params = []) connection.exec_params(query, params).to_a end - def run_queries(queries) + def run_queries(queries, silent: false) connection.transaction do execute("SET LOCAL client_min_messages TO warning") unless options[:dry_run] - log_sql "BEGIN;" - log_sql - run_queries_without_transaction(queries) - log_sql "COMMIT;" + unless silent + log_sql "BEGIN;" + log_sql + end + run_queries_without_transaction(queries, silent: silent) + log_sql "COMMIT;" unless silent end end - def run_query(query) - log_sql query + def run_query(query, silent: false) + log_sql query unless silent unless options[:dry_run] begin execute(query) @@ -80,12 +85,12 @@ def run_query(query) abort "#{e.class.name}: #{e.message}" end end - log_sql + log_sql unless silent end - def run_queries_without_transaction(queries) + def run_queries_without_transaction(queries, silent: false) queries.each do |query| - run_query(query) + run_query(query, silent: silent) end end @@ -167,7 +172,9 @@ def quote_ident(value) end def quote(value) - if value.is_a?(Numeric) + if value.nil? + "NULL" + elsif value.is_a?(Numeric) value else connection.escape_literal(value) @@ -178,6 +185,108 @@ def quote_table(table) table.quote_table end + # ULID helper methods + def ulid?(value) + return false unless value.is_a?(String) + # Match pure ULIDs or ULIDs with prefixes + value.match?(/\A[0123456789ABCDEFGHJKMNPQRSTVWXYZ]{26}\z/) || + value.match?(/.*[0123456789ABCDEFGHJKMNPQRSTVWXYZ]{26}\z/) + end + + def numeric_id?(value) + value.is_a?(Numeric) || (value.is_a?(String) && value.match?(/\A\d+\z/)) + end + + def id_type(value) + return :numeric if numeric_id?(value) + return :ulid if ulid?(value) + :unknown + end + + # Factory method to get the appropriate ID handler + def id_handler(sample_id, connection = nil, table = nil, primary_key = nil) + if ulid?(sample_id) + UlidHandler.new(connection, table, primary_key) + else + NumericHandler.new + end + end + + class NumericHandler + def min_value + 1 + end + + def predecessor(id) + id - 1 + end + + def should_continue?(current_id, max_id) + current_id < max_id + end + + def batch_count(starting_id, max_id, batch_size) + ((max_id - starting_id) / batch_size.to_f).ceil + end + + def batch_where_condition(primary_key, starting_id, batch_size, inclusive = false) + helpers = PgSlice::CLI.instance + operator = inclusive ? 
">=" : ">" + "#{helpers.quote_ident(primary_key)} #{operator} #{helpers.quote(starting_id)} AND #{helpers.quote_ident(primary_key)} <= #{helpers.quote(starting_id + batch_size)}" + end + + def next_starting_id(starting_id, batch_size) + starting_id + batch_size + end + end + + class UlidHandler + def initialize(connection = nil, table = nil, primary_key = nil) + @connection = connection + @table = table + @primary_key = primary_key + end + + def min_value + PgSlice::Helpers::DEFAULT_ULID + end + + def predecessor(id) + # Use database lookup to find the actual predecessor + return PgSlice::Helpers::DEFAULT_ULID unless @connection && @table && @primary_key + + query = <<~SQL + SELECT MAX(#{PG::Connection.quote_ident(@primary_key)}) + FROM #{@table.quote_table} + WHERE #{PG::Connection.quote_ident(@primary_key)} < '#{id}' + SQL + + log_sql query + result = @connection.exec(query) + predecessor_id = result[0]["max"] + predecessor_id || PgSlice::Helpers::DEFAULT_ULID + end + + def should_continue?(current_id, max_id) + current_id < max_id + end + + def batch_count(starting_id, max_id, batch_size) + nil # Unknown for ULIDs + end + + def batch_where_condition(primary_key, starting_id, batch_size, inclusive = false) + operator = inclusive ? ">=" : ">" + "#{PG::Connection.quote_ident(primary_key)} #{operator} '#{starting_id}'" + end + + def next_starting_id(starting_id, batch_size) + # For ULIDs, we need to get the max ID from the current batch + # This will be handled in the fill logic + nil + end + end + def quote_no_schema(table) quote_ident(table.name) end @@ -205,5 +314,85 @@ def make_stat_def(stat_def, table) stat_name = "#{table}_#{m[1].split(", ").map { |v| v.gsub(/\W/i, "") }.join("_")}_stat" stat_def.sub(/ FROM \S+/, " FROM #{quote_table(table)}").sub(/ STATISTICS .+ ON /, " STATISTICS #{quote_ident(stat_name)} ON ") + ";" end + + # mirroring triggers + + def enable_mirroring_triggers(table) + intermediate_table = table.intermediate_table + function_name = "#{table.name}_mirror_to_intermediate" + trigger_name = "#{table.name}_mirror_trigger" + + queries = [] + + # create mirror function + queries << <<~SQL + CREATE OR REPLACE FUNCTION #{quote_ident(function_name)}() + RETURNS TRIGGER AS $$ + BEGIN + IF TG_OP = 'DELETE' THEN + DELETE FROM #{quote_table(intermediate_table)} WHERE #{mirror_where_clause(table, 'OLD')}; + RETURN OLD; + ELSIF TG_OP = 'UPDATE' THEN + UPDATE #{quote_table(intermediate_table)} SET #{mirror_set_clause(table)} WHERE #{mirror_where_clause(table, 'OLD')}; + RETURN NEW; + ELSIF TG_OP = 'INSERT' THEN + INSERT INTO #{quote_table(intermediate_table)} (#{mirror_column_list(table)}) VALUES (#{mirror_new_tuple_list(table)}); + RETURN NEW; + END IF; + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + SQL + + # create trigger + queries << <<~SQL + CREATE TRIGGER #{quote_ident(trigger_name)} + AFTER INSERT OR UPDATE OR DELETE ON #{quote_table(table)} + FOR EACH ROW EXECUTE FUNCTION #{quote_ident(function_name)}(); + SQL + + run_queries(queries) + end + + def disable_mirroring_triggers(table) + function_name = "#{table.name}_mirror_to_intermediate" + trigger_name = "#{table.name}_mirror_trigger" + + queries = [] + + # drop trigger + queries << <<~SQL + DROP TRIGGER IF EXISTS #{quote_ident(trigger_name)} ON #{quote_table(table)}; + SQL + + # drop function + queries << <<~SQL + DROP FUNCTION IF EXISTS #{quote_ident(function_name)}(); + SQL + + run_queries(queries) + end + + def mirror_column_list(table) + table.columns.map { |column| quote_ident(column) }.join(", ") + end + + def 
mirror_new_tuple_list(table) + table.columns.map { |column| "NEW.#{quote_ident(column)}" }.join(", ") + end + + def mirror_set_clause(table) + table.columns.map { |column| "#{quote_ident(column)} = NEW.#{quote_ident(column)}" }.join(", ") + end + + def mirror_where_clause(table, record) + primary_keys = table.primary_key + if primary_keys && primary_keys.any? + primary_keys.map { |pk| "#{quote_ident(pk)} = #{record}.#{quote_ident(pk)}" }.join(" AND ") + else + # fallback to all columns if no primary key + table.columns.map { |column| "#{quote_ident(column)} = #{record}.#{quote_ident(column)}" }.join(" AND ") + end + end end end diff --git a/lib/pgslice/table.rb b/lib/pgslice/table.rb index 4e2284e..b3f7e7d 100644 --- a/lib/pgslice/table.rb +++ b/lib/pgslice/table.rb @@ -123,7 +123,11 @@ def max_id(primary_key, below: nil, where: nil) end conditions << where if where query << " WHERE #{conditions.join(" AND ")}" if conditions.any? - execute(query, params)[0]["max"].to_i + result = execute(query, params)[0]["max"] + return result if result.nil? + + # For ULIDs, return as string; for numeric, convert to int + numeric_id?(result) ? result.to_i : result end def min_id(primary_key, column, cast, starting_time, where) @@ -132,7 +136,23 @@ def min_id(primary_key, column, cast, starting_time, where) conditions << "#{quote_ident(column)} >= #{sql_date(starting_time, cast)}" if starting_time conditions << where if where query << " WHERE #{conditions.join(" AND ")}" if conditions.any? - (execute(query)[0]["min"] || 1).to_i + result = execute(query)[0]["min"] + + # Return appropriate default and type based on primary key type + if result.nil? + # Check if we're dealing with ULIDs by sampling a row + sample_query = "SELECT #{quote_ident(primary_key)} FROM #{quote_table} LIMIT 1" + sample_result = execute(sample_query)[0] + if sample_result + handler = id_handler(sample_result[primary_key]) + return handler.min_value + else + return 1 # Default numeric when no sample available + end + end + + # Return the actual result with proper type + numeric_id?(result) ? result.to_i : result end # ensure this returns partitions in the correct order @@ -224,5 +244,13 @@ def quote_ident(value) def sql_date(*args) PgSlice::CLI.instance.send(:sql_date, *args) end + + def numeric_id?(value) + PgSlice::CLI.instance.send(:numeric_id?, value) + end + + def id_handler(sample_id) + PgSlice::CLI.instance.send(:id_handler, sample_id) + end end end From a0b027efcb33f6af17f7c708fa226a1f18b8ffda Mon Sep 17 00:00:00 2001 From: timeless Date: Wed, 3 Dec 2025 10:01:24 -0600 Subject: [PATCH 2/4] Revert "get all commits from our ext repo" This reverts commit dbddb6549af56209a2f1e245d39fe84f14bf6998. 
--- lib/pgslice.rb | 1 - lib/pgslice/cli.rb | 22 --- lib/pgslice/cli/fill.rb | 96 +++-------- lib/pgslice/cli/synchronize.rb | 300 --------------------------------- lib/pgslice/helpers.rb | 211 ++--------------------- lib/pgslice/table.rb | 32 +--- 6 files changed, 32 insertions(+), 630 deletions(-) delete mode 100644 lib/pgslice/cli/synchronize.rb diff --git a/lib/pgslice.rb b/lib/pgslice.rb index 4732176..a8bf0cf 100644 --- a/lib/pgslice.rb +++ b/lib/pgslice.rb @@ -21,4 +21,3 @@ require_relative "pgslice/cli/swap" require_relative "pgslice/cli/unprep" require_relative "pgslice/cli/unswap" -require_relative "pgslice/cli/synchronize" diff --git a/lib/pgslice/cli.rb b/lib/pgslice/cli.rb index 29b708a..ff22dc0 100644 --- a/lib/pgslice/cli.rb +++ b/lib/pgslice/cli.rb @@ -26,27 +26,5 @@ def initialize(*args) def version log("pgslice #{PgSlice::VERSION}") end - - desc "enable_mirroring TABLE", "Enable mirroring triggers for live data changes during partitioning" - def enable_mirroring(table_name) - table = create_table(table_name) - intermediate_table = table.intermediate_table - - assert_table(table) - assert_table(intermediate_table) - - enable_mirroring_triggers(table) - log("Mirroring triggers enabled for #{table_name}") - end - - desc "disable_mirroring TABLE", "Disable mirroring triggers after partitioning is complete" - def disable_mirroring(table_name) - table = create_table(table_name) - - assert_table(table) - - disable_mirroring_triggers(table) - log("Mirroring triggers disabled for #{table_name}") - end end end diff --git a/lib/pgslice/cli/fill.rb b/lib/pgslice/cli/fill.rb index a4a85c0..64a3f53 100644 --- a/lib/pgslice/cli/fill.rb +++ b/lib/pgslice/cli/fill.rb @@ -5,7 +5,7 @@ class CLI option :swapped, type: :boolean, default: false, desc: "Use swapped table" option :source_table, desc: "Source table" option :dest_table, desc: "Destination table" - option :start, type: :string, desc: "Primary key to start (numeric or ULID)" + option :start, type: :numeric, desc: "Primary key to start" option :where, desc: "Conditions to filter" option :sleep, type: :numeric, desc: "Seconds to sleep between batches" def fill(table) @@ -45,44 +45,21 @@ def fill(table) begin max_source_id = source_table.max_id(primary_key) rescue PG::UndefinedFunction - abort "Only numeric and ULID primary keys are supported" + abort "Only numeric primary keys are supported" end max_dest_id = if options[:start] - # Convert to appropriate type - start_val = options[:start] - numeric_id?(start_val) ? 
start_val.to_i : start_val + options[:start] elsif options[:swapped] dest_table.max_id(primary_key, where: options[:where], below: max_source_id) else dest_table.max_id(primary_key, where: options[:where]) end - # Get the appropriate handler for the ID type - # Prefer --start option, then max_source_id, then sample from table - handler = if options[:start] - id_handler(options[:start], connection, source_table, primary_key) - elsif max_source_id - id_handler(max_source_id, connection, source_table, primary_key) - else - # Sample a row to determine ID type - sample_query = "SELECT #{quote_ident(primary_key)} FROM #{quote_table(source_table)} LIMIT 1" - log_sql sample_query - sample_result = execute(sample_query)[0] - if sample_result && sample_result[primary_key] - id_handler(sample_result[primary_key], connection, source_table, primary_key) - else - # Default to numeric if we can't determine - Helpers::NumericHandler.new - end - end - - if (max_dest_id == 0 || max_dest_id == handler.min_value) && !options[:swapped] + if max_dest_id == 0 && !options[:swapped] min_source_id = source_table.min_id(primary_key, field, cast, starting_time, options[:where]) - if min_source_id - max_dest_id = handler.predecessor(min_source_id) - end + max_dest_id = min_source_id - 1 if min_source_id end starting_id = max_dest_id @@ -90,15 +67,14 @@ def fill(table) batch_size = options[:batch_size] i = 1 - batch_count = handler.batch_count(starting_id, max_source_id, batch_size) - first_batch = true + batch_count = ((max_source_id - starting_id) / batch_size.to_f).ceil if batch_count == 0 log_sql "/* nothing to fill */" end - while handler.should_continue?(starting_id, max_source_id) - where = handler.batch_where_condition(primary_key, starting_id, batch_size, first_batch && options[:start]) + while starting_id < max_source_id + where = "#{quote_ident(primary_key)} > #{quote(starting_id)} AND #{quote_ident(primary_key)} <= #{quote(starting_id + batch_size)}" if starting_time where << " AND #{quote_ident(field)} >= #{sql_date(starting_time, cast)} AND #{quote_ident(field)} < #{sql_date(ending_time, cast)}" end @@ -106,53 +82,19 @@ def fill(table) where << " AND #{options[:where]}" end - batch_label = batch_count ? "#{i} of #{batch_count}" : "batch #{i}" - - if handler.is_a?(UlidHandler) - # For ULIDs, use CTE with RETURNING to get max ID inserted - query = <<~SQL - /* #{batch_label} */ - WITH inserted_batch AS ( - INSERT INTO #{quote_table(dest_table)} (#{fields}) - SELECT #{fields} FROM #{quote_table(source_table)} - WHERE #{where} - ORDER BY #{quote_ident(primary_key)} - LIMIT #{batch_size} - ON CONFLICT DO NOTHING - RETURNING #{quote_ident(primary_key)} - ) - SELECT MAX(#{quote_ident(primary_key)}) as max_inserted_id FROM inserted_batch - SQL - - log_sql query - result = execute(query) - max_inserted_id = result[0]["max_inserted_id"] - puts "starting_id: #{starting_id}" - puts "max_inserted_id: #{max_inserted_id}" - - # If no records were inserted, break the loop - if max_inserted_id.nil? 
- break - end - - starting_id = max_inserted_id - else - query = <<~SQL - /* #{batch_label} */ - INSERT INTO #{quote_table(dest_table)} (#{fields}) - SELECT #{fields} FROM #{quote_table(source_table)} - WHERE #{where} - ON CONFLICT DO NOTHING - SQL - - run_query(query) - starting_id = handler.next_starting_id(starting_id, batch_size) - end - + query = <<~SQL + /* #{i} of #{batch_count} */ + INSERT INTO #{quote_table(dest_table)} (#{fields}) + SELECT #{fields} FROM #{quote_table(source_table)} + WHERE #{where} + SQL + + run_query(query) + + starting_id += batch_size i += 1 - first_batch = false - if options[:sleep] && handler.should_continue?(starting_id, max_source_id) + if options[:sleep] && starting_id <= max_source_id sleep(options[:sleep]) end end diff --git a/lib/pgslice/cli/synchronize.rb b/lib/pgslice/cli/synchronize.rb deleted file mode 100644 index a99db60..0000000 --- a/lib/pgslice/cli/synchronize.rb +++ /dev/null @@ -1,300 +0,0 @@ -module PgSlice - class CLI - desc "synchronize TABLE", "Synchronize data between two tables" - option :source_table, type: :string, desc: "Source table to compare (default: TABLE)" - option :target_table, type: :string, desc: "Target table to compare (default: TABLE_intermediate)" - option :primary_key, type: :string, desc: "Primary key column name" - option :start, type: :string, desc: "Primary key value to start synchronization at" - option :window_size, type: :numeric, default: 1000, desc: "Number of rows to synchronize per batch" - option :delay, type: :numeric, default: 0, desc: "Base delay in seconds between batches (M)" - option :delay_multiplier, type: :numeric, default: 0, desc: "Delay multiplier for batch time (P)" - option :read_only, type: :boolean, default: false, desc: "Log SQL statements instead of executing them" - def synchronize(table_name) - table = create_table(table_name) - - # Determine source and target tables - source_table = options[:source_table] ? create_table(options[:source_table]) : table - target_table = options[:target_table] ? create_table(options[:target_table]) : table.intermediate_table - - # Verify both tables exist - assert_table(source_table) - assert_table(target_table) - - # Get and verify schemas match - source_schema = get_table_schema(source_table) - target_schema = get_table_schema(target_table) - verify_schemas_match(source_table, target_table, source_schema, target_schema) - - # Get primary key - primary_key = options[:primary_key] || source_table.primary_key&.first - abort "Primary key not found. Specify with --primary-key" unless primary_key - abort "Primary key '#{primary_key}' not found in source table" unless source_schema[primary_key] - - # Determine starting value - starting_id = options[:start] - unless starting_id - starting_id = get_min_id(source_table, primary_key) - abort "No rows found in source table" unless starting_id - end - - # Get parameters - window_size = options[:window_size] - base_delay = options[:delay] - delay_multiplier = options[:delay_multiplier] - read_only = options[:read_only] - - log "Synchronizing #{source_table} to #{target_table}" - log "Mode: #{read_only ? 
'READ-ONLY (logging only)' : 'WRITE (executing changes)'}" - log "Primary key: #{primary_key}" - log "Starting at: #{starting_id}" - log "Window size: #{window_size}" - log "Base delay: #{base_delay}s" - log "Delay multiplier: #{delay_multiplier}" - log - - # Statistics - stats = { - total_rows: 0, - matching_rows: 0, - rows_with_differences: 0, - missing_rows: 0, - extra_rows: 0, - batches: 0 - } - - columns = source_schema.keys - - # Main synchronization loop - first_batch = true - loop do - batch_start_time = Time.now - - # Fetch batch from source - source_rows = fetch_batch(source_table, primary_key, starting_id, window_size, columns, first_batch) - break if source_rows.empty? - - stats[:batches] += 1 - first_batch = false - stats[:total_rows] += source_rows.size - - # Get primary keys and range from source batch - source_pks = source_rows.map { |row| row[primary_key] } - first_source_pk = source_rows.first[primary_key] - last_source_pk = source_rows.last[primary_key] - - # Fetch corresponding rows from target using range query to catch deletions - target_rows = fetch_rows_by_range(target_table, primary_key, first_source_pk, last_source_pk, columns) - target_rows_by_pk = target_rows.each_with_object({}) { |row, hash| hash[row[primary_key]] = row } - - # Compare and generate fix queries - fix_queries = [] - - source_rows.each do |source_row| - pk_value = source_row[primary_key] - target_row = target_rows_by_pk[pk_value] - - if target_row.nil? - # Missing row in target - stats[:missing_rows] += 1 - fix_queries << generate_insert(target_table, source_row, columns) - elsif rows_differ?(source_row, target_row, columns) - # Rows differ - stats[:rows_with_differences] += 1 - fix_queries << generate_update(target_table, primary_key, source_row, columns) - else - # Rows match - stats[:matching_rows] += 1 - end - end - - # Check for extra rows in target (rows in target but not in source batch) - # Note: This only checks within the current batch window - extra_pks = target_rows_by_pk.keys - source_pks - extra_pks.each do |pk_value| - stats[:extra_rows] += 1 - fix_queries << generate_delete(target_table, primary_key, pk_value) - end - - # Get first and last primary key for logging - first_pk = source_rows.first[primary_key] - last_pk = source_rows.last[primary_key] - pk_range = first_pk == last_pk ? "#{first_pk}" : "#{first_pk}...#{last_pk}" - - # Execute or log fix queries - if fix_queries.any? 
- log_with_timestamp "Batch #{stats[:batches]}: Found #{fix_queries.size} differences (keys in range #{pk_range})" - if read_only - log_sql "-- Read-only mode: logging statements (not executing)" - fix_queries.each { |query| log_sql query } - log_sql - else - # In write mode, log truncated SQL and execute without auto-logging - fix_queries.each { |query| log_sql truncate_sql_for_log(query) } - run_queries(fix_queries, silent: true) - end - else - log_with_timestamp "Batch #{stats[:batches]}: All #{source_rows.size} rows match (keys in range #{pk_range})" - end - - # Update starting_id for next batch (use > not >=) - starting_id = source_rows.last[primary_key] - - # Calculate adaptive delay: M + N*P - batch_duration = Time.now - batch_start_time - sleep_time = base_delay + (batch_duration * delay_multiplier) - if sleep_time > 0 - log_with_timestamp "Sleeping #{sleep_time.round(2)}s (#{base_delay}s base + #{batch_duration.round(2)}s batch time * #{delay_multiplier} multiplier)" - sleep(sleep_time) - end - - # Break if we processed fewer rows than window size (last batch) - break if source_rows.size < window_size - end - - # Print summary - log - log "Synchronization complete" - log "=" * 50 - log "Total batches: #{stats[:batches]}" - log "Total rows compared: #{stats[:total_rows]}" - log "Matching rows: #{stats[:matching_rows]}" - log "Rows with differences: #{stats[:rows_with_differences]}" - log "Missing rows: #{stats[:missing_rows]}" - log "Extra rows: #{stats[:extra_rows]}" - end - - private - - def log_with_timestamp(message) - timestamp = Time.now.strftime("%Y-%m-%d %H:%M:%S") - log "[#{timestamp}] #{message}" - end - - def get_table_schema(table) - query = <<~SQL - SELECT column_name, data_type, character_maximum_length, numeric_precision, numeric_scale - FROM information_schema.columns - WHERE table_schema = $1 AND table_name = $2 AND is_generated = 'NEVER' - ORDER BY ordinal_position - SQL - rows = execute(query, [table.schema, table.name]) - rows.each_with_object({}) do |row, hash| - hash[row["column_name"]] = { - data_type: row["data_type"], - character_maximum_length: row["character_maximum_length"], - numeric_precision: row["numeric_precision"], - numeric_scale: row["numeric_scale"] - } - end - end - - def verify_schemas_match(source_table, target_table, source_schema, target_schema) - source_schema.each do |col_name, col_spec| - target_spec = target_schema[col_name] - abort "Column '#{col_name}' exists in #{source_table} but not in #{target_table}" unless target_spec - - if col_spec[:data_type] != target_spec[:data_type] - abort "Column '#{col_name}' type mismatch: #{source_table} has #{col_spec[:data_type]}, #{target_table} has #{target_spec[:data_type]}" - end - end - - target_schema.each do |col_name, _| - abort "Column '#{col_name}' exists in #{target_table} but not in #{source_table}" unless source_schema[col_name] - end - end - - def get_min_id(table, primary_key) - query = "SELECT #{quote_ident(primary_key)} FROM #{quote_table(table)} ORDER BY #{quote_ident(primary_key)} LIMIT 1" - result = execute(query) - result.first&.values&.first - end - - def fetch_batch(table, primary_key, starting_id, limit, columns, first_batch = false) - column_list = columns.map { |c| quote_ident(c) }.join(", ") - # Use >= for first batch to include starting_id, > for subsequent batches - operator = first_batch ? 
">=" : ">" - query = <<~SQL - SELECT #{column_list} - FROM #{quote_table(table)} - WHERE #{quote_ident(primary_key)} #{operator} #{quote(starting_id)} - ORDER BY #{quote_ident(primary_key)} - LIMIT #{limit.to_i} - SQL - execute(query) - end - - def fetch_rows_by_pks(table, primary_key, pk_values, columns) - return [] if pk_values.empty? - - column_list = columns.map { |c| quote_ident(c) }.join(", ") - # Build IN clause with proper quoting - pk_list = pk_values.map { |pk| quote(pk) }.join(", ") - query = <<~SQL - SELECT #{column_list} - FROM #{quote_table(table)} - WHERE #{quote_ident(primary_key)} IN (#{pk_list}) - SQL - execute(query) - end - - def fetch_rows_by_range(table, primary_key, first_pk, last_pk, columns) - column_list = columns.map { |c| quote_ident(c) }.join(", ") - query = <<~SQL - SELECT #{column_list} - FROM #{quote_table(table)} - WHERE #{quote_ident(primary_key)} >= #{quote(first_pk)} - AND #{quote_ident(primary_key)} <= #{quote(last_pk)} - ORDER BY #{quote_ident(primary_key)} - SQL - execute(query) - end - - def rows_differ?(source_row, target_row, columns) - columns.any? { |col| source_row[col] != target_row[col] } - end - - def generate_insert(table, row, columns) - column_list = columns.map { |c| quote_ident(c) }.join(", ") - value_list = columns.map { |c| quote(row[c]) }.join(", ") - "INSERT INTO #{quote_table(table)} (#{column_list}) VALUES (#{value_list});" - end - - def generate_update(table, primary_key, row, columns) - set_clause = columns.reject { |c| c == primary_key }.map { |c| "#{quote_ident(c)} = #{quote(row[c])}" }.join(", ") - "UPDATE #{quote_table(table)} SET #{set_clause} WHERE #{quote_ident(primary_key)} = #{quote(row[primary_key])};" - end - - def generate_delete(table, primary_key, pk_value) - "DELETE FROM #{quote_table(table)} WHERE #{quote_ident(primary_key)} = #{quote(pk_value)};" - end - - def truncate_sql_for_log(sql) - # For INSERT statements: show "INSERT INTO table... VALUES(first 20 chars...[truncated]" - if sql =~ /\A(INSERT INTO [^\s]+)\s.*?\sVALUES\s*\((.*)\);?\z/i - table_part = $1 - values_part = $2 - preview = values_part[0, 20] - return "#{table_part}... VALUES(#{preview}...[truncated]" - end - - # For UPDATE statements: show "UPDATE table... SET...[truncated]" - if sql =~ /\A(UPDATE [^\s]+)\s+SET\s+(.*?)\s+WHERE/i - table_part = $1 - set_part = $2 - preview = set_part[0, 20] - return "#{table_part}... SET #{preview}...[truncated]" - end - - # For DELETE statements: show "DELETE FROM table WHERE...[truncated]" - if sql =~ /\A(DELETE FROM [^\s]+)\s+WHERE\s+(.*);?\z/i - table_part = $1 - where_part = $2 - preview = where_part[0, 20] - return "#{table_part}... 
WHERE #{preview}...[truncated]" - end - - # Fallback: just show first 50 chars - sql[0, 50] + "...[truncated]" - end - end -end diff --git a/lib/pgslice/helpers.rb b/lib/pgslice/helpers.rb index cae872a..ee2b1b3 100644 --- a/lib/pgslice/helpers.rb +++ b/lib/pgslice/helpers.rb @@ -6,9 +6,6 @@ module Helpers year: "YYYY" } - # ULID epoch start corresponding to 01/01/1970 - DEFAULT_ULID = "00000H5A406P0C3DQMCQ5MV6WQ" - protected # output @@ -64,20 +61,18 @@ def execute(query, params = []) connection.exec_params(query, params).to_a end - def run_queries(queries, silent: false) + def run_queries(queries) connection.transaction do execute("SET LOCAL client_min_messages TO warning") unless options[:dry_run] - unless silent - log_sql "BEGIN;" - log_sql - end - run_queries_without_transaction(queries, silent: silent) - log_sql "COMMIT;" unless silent + log_sql "BEGIN;" + log_sql + run_queries_without_transaction(queries) + log_sql "COMMIT;" end end - def run_query(query, silent: false) - log_sql query unless silent + def run_query(query) + log_sql query unless options[:dry_run] begin execute(query) @@ -85,12 +80,12 @@ def run_query(query, silent: false) abort "#{e.class.name}: #{e.message}" end end - log_sql unless silent + log_sql end - def run_queries_without_transaction(queries, silent: false) + def run_queries_without_transaction(queries) queries.each do |query| - run_query(query, silent: silent) + run_query(query) end end @@ -172,9 +167,7 @@ def quote_ident(value) end def quote(value) - if value.nil? - "NULL" - elsif value.is_a?(Numeric) + if value.is_a?(Numeric) value else connection.escape_literal(value) @@ -185,108 +178,6 @@ def quote_table(table) table.quote_table end - # ULID helper methods - def ulid?(value) - return false unless value.is_a?(String) - # Match pure ULIDs or ULIDs with prefixes - value.match?(/\A[0123456789ABCDEFGHJKMNPQRSTVWXYZ]{26}\z/) || - value.match?(/.*[0123456789ABCDEFGHJKMNPQRSTVWXYZ]{26}\z/) - end - - def numeric_id?(value) - value.is_a?(Numeric) || (value.is_a?(String) && value.match?(/\A\d+\z/)) - end - - def id_type(value) - return :numeric if numeric_id?(value) - return :ulid if ulid?(value) - :unknown - end - - # Factory method to get the appropriate ID handler - def id_handler(sample_id, connection = nil, table = nil, primary_key = nil) - if ulid?(sample_id) - UlidHandler.new(connection, table, primary_key) - else - NumericHandler.new - end - end - - class NumericHandler - def min_value - 1 - end - - def predecessor(id) - id - 1 - end - - def should_continue?(current_id, max_id) - current_id < max_id - end - - def batch_count(starting_id, max_id, batch_size) - ((max_id - starting_id) / batch_size.to_f).ceil - end - - def batch_where_condition(primary_key, starting_id, batch_size, inclusive = false) - helpers = PgSlice::CLI.instance - operator = inclusive ? 
">=" : ">" - "#{helpers.quote_ident(primary_key)} #{operator} #{helpers.quote(starting_id)} AND #{helpers.quote_ident(primary_key)} <= #{helpers.quote(starting_id + batch_size)}" - end - - def next_starting_id(starting_id, batch_size) - starting_id + batch_size - end - end - - class UlidHandler - def initialize(connection = nil, table = nil, primary_key = nil) - @connection = connection - @table = table - @primary_key = primary_key - end - - def min_value - PgSlice::Helpers::DEFAULT_ULID - end - - def predecessor(id) - # Use database lookup to find the actual predecessor - return PgSlice::Helpers::DEFAULT_ULID unless @connection && @table && @primary_key - - query = <<~SQL - SELECT MAX(#{PG::Connection.quote_ident(@primary_key)}) - FROM #{@table.quote_table} - WHERE #{PG::Connection.quote_ident(@primary_key)} < '#{id}' - SQL - - log_sql query - result = @connection.exec(query) - predecessor_id = result[0]["max"] - predecessor_id || PgSlice::Helpers::DEFAULT_ULID - end - - def should_continue?(current_id, max_id) - current_id < max_id - end - - def batch_count(starting_id, max_id, batch_size) - nil # Unknown for ULIDs - end - - def batch_where_condition(primary_key, starting_id, batch_size, inclusive = false) - operator = inclusive ? ">=" : ">" - "#{PG::Connection.quote_ident(primary_key)} #{operator} '#{starting_id}'" - end - - def next_starting_id(starting_id, batch_size) - # For ULIDs, we need to get the max ID from the current batch - # This will be handled in the fill logic - nil - end - end - def quote_no_schema(table) quote_ident(table.name) end @@ -314,85 +205,5 @@ def make_stat_def(stat_def, table) stat_name = "#{table}_#{m[1].split(", ").map { |v| v.gsub(/\W/i, "") }.join("_")}_stat" stat_def.sub(/ FROM \S+/, " FROM #{quote_table(table)}").sub(/ STATISTICS .+ ON /, " STATISTICS #{quote_ident(stat_name)} ON ") + ";" end - - # mirroring triggers - - def enable_mirroring_triggers(table) - intermediate_table = table.intermediate_table - function_name = "#{table.name}_mirror_to_intermediate" - trigger_name = "#{table.name}_mirror_trigger" - - queries = [] - - # create mirror function - queries << <<~SQL - CREATE OR REPLACE FUNCTION #{quote_ident(function_name)}() - RETURNS TRIGGER AS $$ - BEGIN - IF TG_OP = 'DELETE' THEN - DELETE FROM #{quote_table(intermediate_table)} WHERE #{mirror_where_clause(table, 'OLD')}; - RETURN OLD; - ELSIF TG_OP = 'UPDATE' THEN - UPDATE #{quote_table(intermediate_table)} SET #{mirror_set_clause(table)} WHERE #{mirror_where_clause(table, 'OLD')}; - RETURN NEW; - ELSIF TG_OP = 'INSERT' THEN - INSERT INTO #{quote_table(intermediate_table)} (#{mirror_column_list(table)}) VALUES (#{mirror_new_tuple_list(table)}); - RETURN NEW; - END IF; - RETURN NULL; - END; - $$ LANGUAGE plpgsql; - SQL - - # create trigger - queries << <<~SQL - CREATE TRIGGER #{quote_ident(trigger_name)} - AFTER INSERT OR UPDATE OR DELETE ON #{quote_table(table)} - FOR EACH ROW EXECUTE FUNCTION #{quote_ident(function_name)}(); - SQL - - run_queries(queries) - end - - def disable_mirroring_triggers(table) - function_name = "#{table.name}_mirror_to_intermediate" - trigger_name = "#{table.name}_mirror_trigger" - - queries = [] - - # drop trigger - queries << <<~SQL - DROP TRIGGER IF EXISTS #{quote_ident(trigger_name)} ON #{quote_table(table)}; - SQL - - # drop function - queries << <<~SQL - DROP FUNCTION IF EXISTS #{quote_ident(function_name)}(); - SQL - - run_queries(queries) - end - - def mirror_column_list(table) - table.columns.map { |column| quote_ident(column) }.join(", ") - end - - def 
mirror_new_tuple_list(table) - table.columns.map { |column| "NEW.#{quote_ident(column)}" }.join(", ") - end - - def mirror_set_clause(table) - table.columns.map { |column| "#{quote_ident(column)} = NEW.#{quote_ident(column)}" }.join(", ") - end - - def mirror_where_clause(table, record) - primary_keys = table.primary_key - if primary_keys && primary_keys.any? - primary_keys.map { |pk| "#{quote_ident(pk)} = #{record}.#{quote_ident(pk)}" }.join(" AND ") - else - # fallback to all columns if no primary key - table.columns.map { |column| "#{quote_ident(column)} = #{record}.#{quote_ident(column)}" }.join(" AND ") - end - end end end diff --git a/lib/pgslice/table.rb b/lib/pgslice/table.rb index b3f7e7d..4e2284e 100644 --- a/lib/pgslice/table.rb +++ b/lib/pgslice/table.rb @@ -123,11 +123,7 @@ def max_id(primary_key, below: nil, where: nil) end conditions << where if where query << " WHERE #{conditions.join(" AND ")}" if conditions.any? - result = execute(query, params)[0]["max"] - return result if result.nil? - - # For ULIDs, return as string; for numeric, convert to int - numeric_id?(result) ? result.to_i : result + execute(query, params)[0]["max"].to_i end def min_id(primary_key, column, cast, starting_time, where) @@ -136,23 +132,7 @@ def min_id(primary_key, column, cast, starting_time, where) conditions << "#{quote_ident(column)} >= #{sql_date(starting_time, cast)}" if starting_time conditions << where if where query << " WHERE #{conditions.join(" AND ")}" if conditions.any? - result = execute(query)[0]["min"] - - # Return appropriate default and type based on primary key type - if result.nil? - # Check if we're dealing with ULIDs by sampling a row - sample_query = "SELECT #{quote_ident(primary_key)} FROM #{quote_table} LIMIT 1" - sample_result = execute(sample_query)[0] - if sample_result - handler = id_handler(sample_result[primary_key]) - return handler.min_value - else - return 1 # Default numeric when no sample available - end - end - - # Return the actual result with proper type - numeric_id?(result) ? 
result.to_i : result + (execute(query)[0]["min"] || 1).to_i end # ensure this returns partitions in the correct order @@ -244,13 +224,5 @@ def quote_ident(value) def sql_date(*args) PgSlice::CLI.instance.send(:sql_date, *args) end - - def numeric_id?(value) - PgSlice::CLI.instance.send(:numeric_id?, value) - end - - def id_handler(sample_id) - PgSlice::CLI.instance.send(:id_handler, sample_id) - end end end From 0194f40d9c94decf296b879ae42447ea206d6c64 Mon Sep 17 00:00:00 2001 From: timeless Date: Wed, 3 Dec 2025 10:22:09 -0600 Subject: [PATCH 3/4] sync command --- lib/pgslice.rb | 1 + lib/pgslice/cli/synchronize.rb | 299 +++++++++++++++++++++++++++++++++ lib/pgslice/helpers.rb | 211 +++++++++++++++++++++-- 3 files changed, 500 insertions(+), 11 deletions(-) create mode 100644 lib/pgslice/cli/synchronize.rb diff --git a/lib/pgslice.rb b/lib/pgslice.rb index a8bf0cf..4732176 100644 --- a/lib/pgslice.rb +++ b/lib/pgslice.rb @@ -21,3 +21,4 @@ require_relative "pgslice/cli/swap" require_relative "pgslice/cli/unprep" require_relative "pgslice/cli/unswap" +require_relative "pgslice/cli/synchronize" diff --git a/lib/pgslice/cli/synchronize.rb b/lib/pgslice/cli/synchronize.rb new file mode 100644 index 0000000..d650ef9 --- /dev/null +++ b/lib/pgslice/cli/synchronize.rb @@ -0,0 +1,299 @@ +module PgSlice + class CLI + desc "synchronize TABLE", "Synchronize data between two tables" + option :source_table, type: :string, desc: "Source table to compare (default: TABLE)" + option :target_table, type: :string, desc: "Target table to compare (default: TABLE_intermediate)" + option :primary_key, type: :string, desc: "Primary key column name" + option :start, type: :string, desc: "Primary key value to start synchronization at" + option :window_size, type: :numeric, default: 1000, desc: "Number of rows to synchronize per batch" + option :delay, type: :numeric, default: 0, desc: "Base delay in seconds between batches (M)" + option :delay_multiplier, type: :numeric, default: 0, desc: "Delay multiplier for batch time (P)" + def synchronize(table_name) + table = create_table(table_name) + + # Determine source and target tables + source_table = options[:source_table] ? create_table(options[:source_table]) : table + target_table = options[:target_table] ? create_table(options[:target_table]) : table.intermediate_table + + # Verify both tables exist + assert_table(source_table) + assert_table(target_table) + + # Get and verify schemas match + source_schema = get_table_schema(source_table) + target_schema = get_table_schema(target_table) + verify_schemas_match(source_table, target_table, source_schema, target_schema) + + # Get primary key + primary_key = options[:primary_key] || source_table.primary_key&.first + abort "Primary key not found. Specify with --primary-key" unless primary_key + abort "Primary key '#{primary_key}' not found in source table" unless source_schema[primary_key] + + # Determine starting value + starting_id = options[:start] + unless starting_id + starting_id = get_min_id(source_table, primary_key) + abort "No rows found in source table" unless starting_id + end + + # Get parameters + window_size = options[:window_size] + base_delay = options[:delay] + delay_multiplier = options[:delay_multiplier] + dry_run = options[:dry_run] + + log "Synchronizing #{source_table} to #{target_table}" + log "Mode: #{dry_run ? 
'DRY RUN (logging only)' : 'WRITE (executing changes)'}" + log "Primary key: #{primary_key}" + log "Starting at: #{starting_id}" + log "Window size: #{window_size}" + log "Base delay: #{base_delay}s" + log "Delay multiplier: #{delay_multiplier}" + log + + # Statistics + stats = { + total_rows: 0, + matching_rows: 0, + rows_with_differences: 0, + missing_rows: 0, + extra_rows: 0, + batches: 0 + } + + columns = source_schema.keys + + # Main synchronization loop + first_batch = true + loop do + batch_start_time = Time.now + + # Fetch batch from source + source_rows = fetch_batch(source_table, primary_key, starting_id, window_size, columns, first_batch) + break if source_rows.empty? + + stats[:batches] += 1 + first_batch = false + stats[:total_rows] += source_rows.size + + # Get primary keys and range from source batch + source_pks = source_rows.map { |row| row[primary_key] } + first_source_pk = source_rows.first[primary_key] + last_source_pk = source_rows.last[primary_key] + + # Fetch corresponding rows from target using range query to catch deletions + target_rows = fetch_rows_by_range(target_table, primary_key, first_source_pk, last_source_pk, columns) + target_rows_by_pk = target_rows.each_with_object({}) { |row, hash| hash[row[primary_key]] = row } + + # Compare and generate fix queries + fix_queries = [] + + source_rows.each do |source_row| + pk_value = source_row[primary_key] + target_row = target_rows_by_pk[pk_value] + + if target_row.nil? + # Missing row in target + stats[:missing_rows] += 1 + fix_queries << generate_insert(target_table, source_row, columns) + elsif rows_differ?(source_row, target_row, columns) + # Rows differ + stats[:rows_with_differences] += 1 + fix_queries << generate_update(target_table, primary_key, source_row, columns) + else + # Rows match + stats[:matching_rows] += 1 + end + end + + # Check for extra rows in target (rows in target but not in source batch) + # Note: This only checks within the current batch window + extra_pks = target_rows_by_pk.keys - source_pks + extra_pks.each do |pk_value| + stats[:extra_rows] += 1 + fix_queries << generate_delete(target_table, primary_key, pk_value) + end + + # Get first and last primary key for logging + first_pk = source_rows.first[primary_key] + last_pk = source_rows.last[primary_key] + pk_range = first_pk == last_pk ? "#{first_pk}" : "#{first_pk}...#{last_pk}" + + # Execute or log fix queries + if fix_queries.any? 
diff --git a/lib/pgslice/cli/synchronize.rb b/lib/pgslice/cli/synchronize.rb
new file mode 100644
index 0000000..d650ef9
--- /dev/null
+++ b/lib/pgslice/cli/synchronize.rb
@@ -0,0 +1,299 @@
+module PgSlice
+  class CLI
+    desc "synchronize TABLE", "Synchronize data between two tables"
+    option :source_table, type: :string, desc: "Source table to compare (default: TABLE)"
+    option :target_table, type: :string, desc: "Target table to compare (default: TABLE_intermediate)"
+    option :primary_key, type: :string, desc: "Primary key column name"
+    option :start, type: :string, desc: "Primary key value to start synchronization at"
+    option :window_size, type: :numeric, default: 1000, desc: "Number of rows to synchronize per batch"
+    option :delay, type: :numeric, default: 0, desc: "Base delay in seconds between batches (M)"
+    option :delay_multiplier, type: :numeric, default: 0, desc: "Delay multiplier for batch time (P)"
+    def synchronize(table_name)
+      table = create_table(table_name)
+
+      # Determine source and target tables
+      source_table = options[:source_table] ? create_table(options[:source_table]) : table
+      target_table = options[:target_table] ? create_table(options[:target_table]) : table.intermediate_table
+
+      # Verify both tables exist
+      assert_table(source_table)
+      assert_table(target_table)
+
+      # Get and verify schemas match
+      source_schema = get_table_schema(source_table)
+      target_schema = get_table_schema(target_table)
+      verify_schemas_match(source_table, target_table, source_schema, target_schema)
+
+      # Get primary key
+      primary_key = options[:primary_key] || source_table.primary_key&.first
+      abort "Primary key not found. Specify with --primary-key" unless primary_key
+      abort "Primary key '#{primary_key}' not found in source table" unless source_schema[primary_key]
+
+      # Determine starting value
+      starting_id = options[:start]
+      unless starting_id
+        starting_id = get_min_id(source_table, primary_key)
+        abort "No rows found in source table" unless starting_id
+      end
+
+      # Get parameters
+      window_size = options[:window_size]
+      base_delay = options[:delay]
+      delay_multiplier = options[:delay_multiplier]
+      dry_run = options[:dry_run]
+
+      log "Synchronizing #{source_table} to #{target_table}"
+      log "Mode: #{dry_run ? 'DRY RUN (logging only)' : 'WRITE (executing changes)'}"
+      log "Primary key: #{primary_key}"
+      log "Starting at: #{starting_id}"
+      log "Window size: #{window_size}"
+      log "Base delay: #{base_delay}s"
+      log "Delay multiplier: #{delay_multiplier}"
+      log
+
+      # Statistics
+      stats = {
+        total_rows: 0,
+        matching_rows: 0,
+        rows_with_differences: 0,
+        missing_rows: 0,
+        extra_rows: 0,
+        batches: 0
+      }
+
+      columns = source_schema.keys
+
+      # Main synchronization loop
+      first_batch = true
+      loop do
+        batch_start_time = Time.now
+
+        # Fetch batch from source
+        source_rows = fetch_batch(source_table, primary_key, starting_id, window_size, columns, first_batch)
+        break if source_rows.empty?
+
+        stats[:batches] += 1
+        first_batch = false
+        stats[:total_rows] += source_rows.size
+
+        # Get primary keys and range from source batch
+        source_pks = source_rows.map { |row| row[primary_key] }
+        first_source_pk = source_rows.first[primary_key]
+        last_source_pk = source_rows.last[primary_key]
+
+        # Fetch corresponding rows from target using range query to catch deletions
+        target_rows = fetch_rows_by_range(target_table, primary_key, first_source_pk, last_source_pk, columns)
+        target_rows_by_pk = target_rows.each_with_object({}) { |row, hash| hash[row[primary_key]] = row }
+
+        # Compare and generate fix queries
+        fix_queries = []
+
+        source_rows.each do |source_row|
+          pk_value = source_row[primary_key]
+          target_row = target_rows_by_pk[pk_value]
+
+          if target_row.nil?
+            # Missing row in target
+            stats[:missing_rows] += 1
+            fix_queries << generate_insert(target_table, source_row, columns)
+          elsif rows_differ?(source_row, target_row, columns)
+            # Rows differ
+            stats[:rows_with_differences] += 1
+            fix_queries << generate_update(target_table, primary_key, source_row, columns)
+          else
+            # Rows match
+            stats[:matching_rows] += 1
+          end
+        end
+
+        # Check for extra rows in target (rows in target but not in source batch)
+        # Note: This only checks within the current batch window
+        extra_pks = target_rows_by_pk.keys - source_pks
+        extra_pks.each do |pk_value|
+          stats[:extra_rows] += 1
+          fix_queries << generate_delete(target_table, primary_key, pk_value)
+        end
+
+        # Get first and last primary key for logging
+        first_pk = source_rows.first[primary_key]
+        last_pk = source_rows.last[primary_key]
+        pk_range = first_pk == last_pk ? "#{first_pk}" : "#{first_pk}...#{last_pk}"
+
+        # Execute or log fix queries
+        if fix_queries.any?
+          log_with_timestamp "Batch #{stats[:batches]}: Found #{fix_queries.size} differences (keys in range #{pk_range})"
+          if dry_run
+            log_sql "-- Dry run mode: logging statements (not executing)"
+            fix_queries.each { |query| log_sql query }
+            log_sql
+          else
+            # In write mode, log truncated SQL and execute without auto-logging
+            fix_queries.each { |query| log_sql truncate_sql_for_log(query) }
+            run_queries(fix_queries, silent: true)
+          end
+        else
+          log_with_timestamp "Batch #{stats[:batches]}: All #{source_rows.size} rows match (keys in range #{pk_range})"
+        end
+
+        # Update starting_id for next batch (use > not >=)
+        starting_id = source_rows.last[primary_key]
+
+        # Calculate adaptive delay: M + N*P
+        batch_duration = Time.now - batch_start_time
+        sleep_time = base_delay + (batch_duration * delay_multiplier)
+        if sleep_time > 0
+          log_with_timestamp "Sleeping #{sleep_time.round(2)}s (#{base_delay}s base + #{batch_duration.round(2)}s batch time * #{delay_multiplier} multiplier)"
+          sleep(sleep_time)
+        end
+
+        # Break if we processed fewer rows than window size (last batch)
+        break if source_rows.size < window_size
+      end
+
+      # Print summary
+      log
+      log "Synchronization complete"
+      log "=" * 50
+      log "Total batches: #{stats[:batches]}"
+      log "Total rows compared: #{stats[:total_rows]}"
+      log "Matching rows: #{stats[:matching_rows]}"
+      log "Rows with differences: #{stats[:rows_with_differences]}"
+      log "Missing rows: #{stats[:missing_rows]}"
+      log "Extra rows: #{stats[:extra_rows]}"
+    end
+
+    private
+
+    def log_with_timestamp(message)
+      timestamp = Time.now.strftime("%Y-%m-%d %H:%M:%S")
+      log "[#{timestamp}] #{message}"
+    end
+
+    def get_table_schema(table)
+      query = <<~SQL
+        SELECT column_name, data_type, character_maximum_length, numeric_precision, numeric_scale
+        FROM information_schema.columns
+        WHERE table_schema = $1 AND table_name = $2 AND is_generated = 'NEVER'
+        ORDER BY ordinal_position
+      SQL
+      rows = execute(query, [table.schema, table.name])
+      rows.each_with_object({}) do |row, hash|
+        hash[row["column_name"]] = {
+          data_type: row["data_type"],
+          character_maximum_length: row["character_maximum_length"],
+          numeric_precision: row["numeric_precision"],
+          numeric_scale: row["numeric_scale"]
+        }
+      end
+    end
+
+    def verify_schemas_match(source_table, target_table, source_schema, target_schema)
+      source_schema.each do |col_name, col_spec|
+        target_spec = target_schema[col_name]
+        abort "Column '#{col_name}' exists in #{source_table} but not in #{target_table}" unless target_spec
+
+        if col_spec[:data_type] != target_spec[:data_type]
+          abort "Column '#{col_name}' type mismatch: #{source_table} has #{col_spec[:data_type]}, #{target_table} has #{target_spec[:data_type]}"
+        end
+      end
+
+      target_schema.each do |col_name, _|
+        abort "Column '#{col_name}' exists in #{target_table} but not in #{source_table}" unless source_schema[col_name]
+      end
+    end
+
+    def get_min_id(table, primary_key)
+      query = "SELECT #{quote_ident(primary_key)} FROM #{quote_table(table)} ORDER BY #{quote_ident(primary_key)} LIMIT 1"
+      result = execute(query)
+      result.first&.values&.first
+    end
+
+    def fetch_batch(table, primary_key, starting_id, limit, columns, first_batch = false)
+      column_list = columns.map { |c| quote_ident(c) }.join(", ")
+      # Use >= for first batch to include starting_id, > for subsequent batches
+      operator = first_batch ? ">=" : ">"
+      query = <<~SQL
+        SELECT #{column_list}
+        FROM #{quote_table(table)}
+        WHERE #{quote_ident(primary_key)} #{operator} #{quote(starting_id)}
+        ORDER BY #{quote_ident(primary_key)}
+        LIMIT #{limit.to_i}
+      SQL
+      execute(query)
+    end
+
+    def fetch_rows_by_pks(table, primary_key, pk_values, columns)
+      return [] if pk_values.empty?
+
+      column_list = columns.map { |c| quote_ident(c) }.join(", ")
+      # Build IN clause with proper quoting
+      pk_list = pk_values.map { |pk| quote(pk) }.join(", ")
+      query = <<~SQL
+        SELECT #{column_list}
+        FROM #{quote_table(table)}
+        WHERE #{quote_ident(primary_key)} IN (#{pk_list})
+      SQL
+      execute(query)
+    end
+
+    def fetch_rows_by_range(table, primary_key, first_pk, last_pk, columns)
+      column_list = columns.map { |c| quote_ident(c) }.join(", ")
+      query = <<~SQL
+        SELECT #{column_list}
+        FROM #{quote_table(table)}
+        WHERE #{quote_ident(primary_key)} >= #{quote(first_pk)}
+          AND #{quote_ident(primary_key)} <= #{quote(last_pk)}
+        ORDER BY #{quote_ident(primary_key)}
+      SQL
+      execute(query)
+    end
+
+    def rows_differ?(source_row, target_row, columns)
+      columns.any? { |col| source_row[col] != target_row[col] }
+    end
+
+    def generate_insert(table, row, columns)
+      column_list = columns.map { |c| quote_ident(c) }.join(", ")
+      value_list = columns.map { |c| quote(row[c]) }.join(", ")
+      "INSERT INTO #{quote_table(table)} (#{column_list}) VALUES (#{value_list});"
+    end
+
+    def generate_update(table, primary_key, row, columns)
+      set_clause = columns.reject { |c| c == primary_key }.map { |c| "#{quote_ident(c)} = #{quote(row[c])}" }.join(", ")
+      "UPDATE #{quote_table(table)} SET #{set_clause} WHERE #{quote_ident(primary_key)} = #{quote(row[primary_key])};"
+    end
+
+    def generate_delete(table, primary_key, pk_value)
+      "DELETE FROM #{quote_table(table)} WHERE #{quote_ident(primary_key)} = #{quote(pk_value)};"
+    end
+
+    def truncate_sql_for_log(sql)
+      # For INSERT statements: show "INSERT INTO table... VALUES(first 20 chars...[truncated]"
+      if sql =~ /\A(INSERT INTO [^\s]+)\s.*?\sVALUES\s*\((.*)\);?\z/i
+        table_part = $1
+        values_part = $2
+        preview = values_part[0, 20]
+        return "#{table_part}... VALUES(#{preview}...[truncated]"
+      end
+
+      # For UPDATE statements: show "UPDATE table... SET...[truncated]"
+      if sql =~ /\A(UPDATE [^\s]+)\s+SET\s+(.*?)\s+WHERE/i
+        table_part = $1
+        set_part = $2
+        preview = set_part[0, 20]
+        return "#{table_part}... SET #{preview}...[truncated]"
+      end
+
+      # For DELETE statements: show "DELETE FROM table WHERE...[truncated]"
+      if sql =~ /\A(DELETE FROM [^\s]+)\s+WHERE\s+(.*);?\z/i
+        table_part = $1
+        where_part = $2
+        preview = where_part[0, 20]
+        return "#{table_part}... WHERE #{preview}...[truncated]"
+      end
+
+      # Fallback: just show first 50 chars
+      sql[0, 50] + "...[truncated]"
+    end
+  end
+end
diff --git a/lib/pgslice/helpers.rb b/lib/pgslice/helpers.rb
index ee2b1b3..0c90bdd 100644
--- a/lib/pgslice/helpers.rb
+++ b/lib/pgslice/helpers.rb
@@ -6,6 +6,9 @@ module Helpers
       year: "YYYY"
     }
 
+    # Sentinel ULID near the Unix epoch (1970-01-01); sorts before any real ULID
+    DEFAULT_ULID = "00000H5A406P0C3DQMCQ5MV6WQ"
+
     protected
 
     # output
@@ -61,18 +64,20 @@ def execute(query, params = [])
       connection.exec_params(query, params).to_a
     end
 
-    def run_queries(queries)
+    def run_queries(queries, silent: false)
       connection.transaction do
         execute("SET LOCAL client_min_messages TO warning") unless options[:dry_run]
-        log_sql "BEGIN;"
-        log_sql
-        run_queries_without_transaction(queries)
-        log_sql "COMMIT;"
+        unless silent
+          log_sql "BEGIN;"
+          log_sql
+        end
+        run_queries_without_transaction(queries, silent: silent)
+        log_sql "COMMIT;" unless silent
       end
     end
 
-    def run_query(query)
-      log_sql query
+    def run_query(query, silent: false)
+      log_sql query unless silent
       unless options[:dry_run]
         begin
           execute(query)
@@ -80,12 +85,12 @@ def run_query(query)
           abort "#{e.class.name}: #{e.message}"
         end
       end
-      log_sql
+      log_sql unless silent
     end
 
-    def run_queries_without_transaction(queries)
+    def run_queries_without_transaction(queries, silent: false)
       queries.each do |query|
-        run_query(query)
+        run_query(query, silent: silent)
       end
     end
 
@@ -167,7 +172,9 @@ def quote_ident(value)
     end
 
     def quote(value)
-      if value.is_a?(Numeric)
+      if value.nil?
+        "NULL"
+      elsif value.is_a?(Numeric)
         value
       else
         connection.escape_literal(value)
@@ -178,6 +185,108 @@ def quote_table(table)
       table.quote_table
     end
 
+    # ULID helper methods
+    def ulid?(value)
+      return false unless value.is_a?(String)
+      # Match pure ULIDs or ULIDs with prefixes (26 Crockford base32 chars at the end)
+      value.match?(/[0123456789ABCDEFGHJKMNPQRSTVWXYZ]{26}\z/)
+    end
+
+    def numeric_id?(value)
+      value.is_a?(Numeric) || (value.is_a?(String) && value.match?(/\A\d+\z/))
+    end
+
+    def id_type(value)
+      return :numeric if numeric_id?(value)
+      return :ulid if ulid?(value)
+      :unknown
+    end
+
+    # Factory method to get the appropriate ID handler
+    def id_handler(sample_id, connection = nil, table = nil, primary_key = nil)
+      if ulid?(sample_id)
+        UlidHandler.new(connection, table, primary_key)
+      else
+        NumericHandler.new
+      end
+    end
+
+    class NumericHandler
+      def min_value
+        1
+      end
+
+      def predecessor(id)
+        id - 1
+      end
+
+      def should_continue?(current_id, max_id)
+        current_id < max_id
+      end
+
+      def batch_count(starting_id, max_id, batch_size)
+        ((max_id - starting_id) / batch_size.to_f).ceil
+      end
+
+      def batch_where_condition(primary_key, starting_id, batch_size, inclusive = false)
+        # quote_ident/quote are protected on the CLI instance, so go through send
+        helpers = PgSlice::CLI.instance
+        operator = inclusive ? ">=" : ">"
+        "#{helpers.send(:quote_ident, primary_key)} #{operator} #{helpers.send(:quote, starting_id)} AND #{helpers.send(:quote_ident, primary_key)} <= #{helpers.send(:quote, starting_id + batch_size)}"
+      end
+
+      def next_starting_id(starting_id, batch_size)
+        starting_id + batch_size
+      end
+    end
+
+    class UlidHandler
+      def initialize(connection = nil, table = nil, primary_key = nil)
+        @connection = connection
+        @table = table
+        @primary_key = primary_key
+      end
+
+      def min_value
+        PgSlice::Helpers::DEFAULT_ULID
+      end
+
+      def predecessor(id)
+        # Use a database lookup to find the actual predecessor
+        return PgSlice::Helpers::DEFAULT_ULID unless @connection && @table && @primary_key
+
+        query = <<~SQL
+          SELECT MAX(#{PG::Connection.quote_ident(@primary_key)})
+          FROM #{@table.quote_table}
+          WHERE #{PG::Connection.quote_ident(@primary_key)} < #{@connection.escape_literal(id)}
+        SQL
+
+        PgSlice::CLI.instance.send(:log_sql, query)
+        result = @connection.exec(query)
+        predecessor_id = result[0]["max"]
+        predecessor_id || PgSlice::Helpers::DEFAULT_ULID
+      end
+
+      def should_continue?(current_id, max_id)
+        current_id < max_id
+      end
+
+      def batch_count(starting_id, max_id, batch_size)
+        nil # Unknown for ULIDs
+      end
+
+      def batch_where_condition(primary_key, starting_id, batch_size, inclusive = false)
+        operator = inclusive ? ">=" : ">"
+        "#{PG::Connection.quote_ident(primary_key)} #{operator} '#{starting_id}'"
+      end
+
+      def next_starting_id(starting_id, batch_size)
+        # For ULIDs, we need to get the max ID from the current batch
+        # This will be handled in the fill logic
+        nil
+      end
+    end
+
     def quote_no_schema(table)
       quote_ident(table.name)
     end
@@ -205,5 +314,85 @@ def make_stat_def(stat_def, table)
       stat_name = "#{table}_#{m[1].split(", ").map { |v| v.gsub(/\W/i, "") }.join("_")}_stat"
       stat_def.sub(/ FROM \S+/, " FROM #{quote_table(table)}").sub(/ STATISTICS .+ ON /, " STATISTICS #{quote_ident(stat_name)} ON ") + ";"
     end
+
+    # mirroring triggers
+
+    def enable_mirroring_triggers(table)
+      intermediate_table = table.intermediate_table
+      function_name = "#{table.name}_mirror_to_intermediate"
+      trigger_name = "#{table.name}_mirror_trigger"
+
+      queries = []
+
+      # create mirror function
+      queries << <<~SQL
+        CREATE OR REPLACE FUNCTION #{quote_ident(function_name)}()
+        RETURNS TRIGGER AS $$
+        BEGIN
+          IF TG_OP = 'DELETE' THEN
+            DELETE FROM #{quote_table(intermediate_table)} WHERE #{mirror_where_clause(table, 'OLD')};
+            RETURN OLD;
+          ELSIF TG_OP = 'UPDATE' THEN
+            UPDATE #{quote_table(intermediate_table)} SET #{mirror_set_clause(table)} WHERE #{mirror_where_clause(table, 'OLD')};
+            RETURN NEW;
+          ELSIF TG_OP = 'INSERT' THEN
+            INSERT INTO #{quote_table(intermediate_table)} (#{mirror_column_list(table)}) VALUES (#{mirror_new_tuple_list(table)});
+            RETURN NEW;
+          END IF;
+          RETURN NULL;
+        END;
+        $$ LANGUAGE plpgsql;
+      SQL
+
+      # create trigger
+      queries << <<~SQL
+        CREATE TRIGGER #{quote_ident(trigger_name)}
+        AFTER INSERT OR UPDATE OR DELETE ON #{quote_table(table)}
+        FOR EACH ROW EXECUTE FUNCTION #{quote_ident(function_name)}();
+      SQL
+
+      run_queries(queries)
+    end
+
+    def disable_mirroring_triggers(table)
+      function_name = "#{table.name}_mirror_to_intermediate"
+      trigger_name = "#{table.name}_mirror_trigger"
+
+      queries = []
+
+      # drop trigger
+      queries << <<~SQL
+        DROP TRIGGER IF EXISTS #{quote_ident(trigger_name)} ON #{quote_table(table)};
+      SQL
+
+      # drop function
+      queries << <<~SQL
+        DROP FUNCTION IF EXISTS #{quote_ident(function_name)}();
+      SQL
+
+      run_queries(queries)
+    end
+
+    def mirror_column_list(table)
+      table.columns.map { |column| quote_ident(column) }.join(", ")
+    end
+
+    def mirror_new_tuple_list(table)
+      table.columns.map { |column| "NEW.#{quote_ident(column)}" }.join(", ")
+    end
+
+    def mirror_set_clause(table)
+      table.columns.map { |column| "#{quote_ident(column)} = NEW.#{quote_ident(column)}" }.join(", ")
+    end
+
+    def mirror_where_clause(table, record)
+      primary_keys = table.primary_key
+      if primary_keys && primary_keys.any?
+        primary_keys.map { |pk| "#{quote_ident(pk)} = #{record}.#{quote_ident(pk)}" }.join(" AND ")
+      else
+        # fallback to all columns if no primary key
+        table.columns.map { |column| "#{quote_ident(column)} = #{record}.#{quote_ident(column)}" }.join(" AND ")
+      end
+    end
   end
 end
From ade09c5eb3155c673dc5bba5ce7f10a9117840fa Mon Sep 17 00:00:00 2001
From: timeless
Date: Wed, 3 Dec 2025 10:42:46 -0600
Subject: [PATCH 4/4] README update too

---
 README.md | 18 +++++++++++++++---
 1 file changed, 15 insertions(+), 3 deletions(-)

diff --git a/README.md b/README.md
index 5ab4c06..ce5a51d 100644
--- a/README.md
+++ b/README.md
@@ -62,7 +62,17 @@ You can also install it with [Homebrew](#homebrew) or [Docker](#docker).
    pgslice analyze
    ```
 
-7. Swap the intermediate table with the original table
+7. Sync/Validate the tables
+
+This ensures the two tables are actually in sync. It should be a no-op, but it will generate
+INSERT, UPDATE, and DELETE statements if discrepancies are discovered. On a production system,
+make sure you understand the `--window-size`, `--delay`, and `--delay-multiplier` options.
+
+```sh
+pgslice synchronize <table> [options]
+```
+
+8. Swap the intermediate table with the original table
 
 ```sh
 pgslice swap <table>
 ```
 
 The original table is renamed `<table>_retired` and the intermediate table is renamed `<table>`.
 
-8. Fill the rest (rows inserted between the first fill and the swap)
+9. Fill the rest (rows inserted between the first fill and the swap)
+
+This step should not be needed if you ran `pgslice synchronize` in step 7.
 
 ```sh
 pgslice fill <table> --swapped
 ```
 
-9. Back up the retired table with a tool like [pg_dump](https://www.postgresql.org/docs/current/static/app-pgdump.html) and drop it
+10. Back up the retired table with a tool like [pg_dump](https://www.postgresql.org/docs/current/static/app-pgdump.html) and drop it
 
 ```sql
 pg_dump -c -Fc -t <table>_retired $PGSLICE_URL > <table>_retired.dump