From 912448f67aa8ab4cdbe0187e613749ba7f1bc136 Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Fri, 7 Nov 2025 09:58:12 -0700 Subject: [PATCH 01/11] delta validation sqs --- compare_delta_folders.py | 317 +++++++++++++++++++++++++++++++++++++++ compare_deltas.sh | 122 +++++++++++++++ 2 files changed, 439 insertions(+) create mode 100755 compare_delta_folders.py create mode 100755 compare_deltas.sh diff --git a/compare_delta_folders.py b/compare_delta_folders.py new file mode 100755 index 0000000..1d2e6e7 --- /dev/null +++ b/compare_delta_folders.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python3 +""" +Compare opt-out records between regular delta and SQS delta folders for a given date. + +This script downloads all delta files from both folders and verifies that all opt-out +records in the regular delta folder are present in the SQS delta folder. + +Delta file format: Each entry is 72 bytes (32-byte hash + 32-byte ID + 8-byte timestamp) + +Usage: + python3 compare_delta_folders.py --bucket my-bucket --date 2025-11-07 + python3 compare_delta_folders.py --bucket my-bucket --date 2025-11-07 \\ + --regular-prefix optout-v2/delta --sqs-prefix sqs-delta/delta +""" + +import argparse +import struct +import sys +import traceback +from datetime import datetime +from typing import List, Set, Tuple + +try: + import boto3 + from botocore.exceptions import ClientError +except ImportError: + print("Error: boto3 not installed. 
Run: pip install boto3") + sys.exit(1) + + +class OptOutRecord: + """Represents a single opt-out record (hash + id + timestamp)""" + + ENTRY_SIZE = 72 # 32 (identity_hash) + 32 (advertising_id) + 8 (timestamp) + + def __init__(self, identity_hash: bytes, advertising_id: bytes, timestamp: int): + self.identity_hash = identity_hash + self.advertising_id = advertising_id + self.timestamp = timestamp + + def is_sentinel(self) -> bool: + """Check if this is a sentinel entry (start or end)""" + return (self.identity_hash == b'\x00' * 32 or + self.identity_hash == b'\xff' * 32) + + def __hash__(self): + """Return hash for set/dict operations""" + return hash((self.identity_hash, self.advertising_id, self.timestamp)) + + def __eq__(self, other): + """Compare two OptOutRecord instances for equality""" + if not isinstance(other, OptOutRecord): + return False + return (self.identity_hash == other.identity_hash and + self.advertising_id == other.advertising_id and + self.timestamp == other.timestamp) + + def __repr__(self): + """Return string representation of the opt-out record""" + hash_hex = self.identity_hash.hex()[:16] + id_hex = self.advertising_id.hex()[:16] + try: + dt = datetime.fromtimestamp(self.timestamp) + dt_str = dt.strftime('%Y-%m-%d %H:%M:%S') + except (ValueError, OSError, OverflowError): + dt_str = "INVALID_TS" + return f"OptOutRecord(hash={hash_hex}..., id={id_hex}..., ts={self.timestamp} [{dt_str}])" + + +def parse_records_from_file(data: bytes) -> List[OptOutRecord]: + """Parse opt-out records from a delta file, skipping sentinels""" + records = [] + offset = 0 + entry_size = OptOutRecord.ENTRY_SIZE # 72 bytes: 32 + 32 + 8 + + while offset + entry_size <= len(data): + identity_hash = data[offset:offset + 32] # 32 bytes + advertising_id = data[offset + 32:offset + 64] # 32 bytes + timestamp = struct.unpack(' bytes: + """Download file from S3""" + try: + s3 = boto3.client('s3') + response = s3.get_object(Bucket=bucket, Key=key) + return 
response['Body'].read() + except ClientError as error: + print(f"Error downloading s3://{bucket}/{key}: {error}") + raise + + +def list_files_in_folder(bucket: str, prefix: str) -> List[str]: + """List all .dat files in an S3 folder""" + try: + s3 = boto3.client('s3') + files = [] + paginator = s3.get_paginator('list_objects_v2') + + for page in paginator.paginate(Bucket=bucket, Prefix=prefix): + if 'Contents' not in page: + continue + for obj in page['Contents']: + if obj['Key'].endswith('.dat'): + files.append(obj['Key']) + + return sorted(files) + except ClientError as error: + print(f"Error listing files in s3://{bucket}/{prefix}: {error}") + raise + + +def load_records_from_folder( + bucket: str, prefix: str, date_folder: str +) -> Tuple[Set[OptOutRecord], dict]: + """Load all opt-out records from all files in a folder""" + full_prefix = f"{prefix}{date_folder}/" + + print(f"\nšŸ“‚ Loading files from s3://{bucket}/{full_prefix}") + files = list_files_in_folder(bucket, full_prefix) + + if not files: + print(" āš ļø No .dat files found") + return set(), {} + + print(f" Found {len(files)} delta files") + + all_records = set() + file_stats = {} + + for i, file_key in enumerate(files, 1): + filename = file_key.split('/')[-1] + print(f" [{i}/{len(files)}] Downloading {filename}...", end='', flush=True) + + try: + data = download_from_s3(bucket, file_key) + records = parse_records_from_file(data) + + all_records.update(records) + total_entries_in_file = len(data) // OptOutRecord.ENTRY_SIZE + file_stats[filename] = { + 'size': len(data), + 'entries': len(records), + 'total_entries': total_entries_in_file, # Includes sentinels + 'file_key': file_key + } + + print(f" {len(records)} records") + except (ClientError, struct.error, ValueError) as error: + print(f" ERROR: {error}") + continue + + return all_records, file_stats + + +def analyze_differences(regular_records: Set[OptOutRecord], + sqs_records: Set[OptOutRecord], + show_samples: int = 10) -> bool: + """Analyze 
and report differences between record sets""" + + print("\nšŸ“Š Analysis Results") + print(f" Regular delta records: {len(regular_records):,}") + print(f" SQS delta records: {len(sqs_records):,}") + + # Records in regular but not in SQS (MISSING from SQS) + missing_in_sqs = regular_records - sqs_records + + # Records in SQS but not in regular (EXTRA in SQS) + extra_in_sqs = sqs_records - regular_records + + # Common records + common = regular_records & sqs_records + + print(f" Common records: {len(common):,}") + print(f" Missing from SQS: {len(missing_in_sqs):,}") + print(f" Extra in SQS: {len(extra_in_sqs):,}") + + all_good = True + + if missing_in_sqs: + print(f"\nāŒ MISSING: {len(missing_in_sqs)} records in regular delta are NOT in SQS delta") + print(f" Sample of missing records (first {min(show_samples, len(missing_in_sqs))}):") + for i, record in enumerate(list(missing_in_sqs)[:show_samples], 1): + print(f" {i}. {record}") + if len(missing_in_sqs) > show_samples: + print(f" ... and {len(missing_in_sqs) - show_samples} more") + all_good = False + else: + print("\nāœ… All records from regular delta are present in SQS delta") + + if extra_in_sqs: + print(f"\nāš ļø EXTRA: {len(extra_in_sqs)} records in SQS delta are NOT in regular delta") + print(" (This might be okay if SQS captured additional opt-outs)") + print(f" Sample of extra records (first {min(show_samples, len(extra_in_sqs))}):") + for i, record in enumerate(list(extra_in_sqs)[:show_samples], 1): + print(f" {i}. {record}") + if len(extra_in_sqs) > show_samples: + print(f" ... 
and {len(extra_in_sqs) - show_samples} more") + + return all_good + + +def print_file_stats(regular_stats: dict, sqs_stats: dict) -> None: + """Print file statistics for both folders""" + print("\nšŸ“ˆ File Statistics") + + print(f"\n Regular Delta Files: {len(regular_stats)}") + if regular_stats: + total_size = sum(s['size'] for s in regular_stats.values()) + total_entries = sum(s['entries'] for s in regular_stats.values()) + print(f" Total size: {total_size:,} bytes") + print(f" Total entries: {total_entries:,}") + print(f" Avg entries/file: {total_entries / len(regular_stats):.1f}") + + print(f"\n SQS Delta Files: {len(sqs_stats)}") + if sqs_stats: + total_size = sum(s['size'] for s in sqs_stats.values()) + total_entries = sum(s['entries'] for s in sqs_stats.values()) + print(f" Total size: {total_size:,} bytes") + print(f" Total entries: {total_entries:,}") + print(f" Avg entries/file: {total_entries / len(sqs_stats):.1f}") + + +def main() -> None: + """Main entry point for comparing opt-out delta folders.""" + parser = argparse.ArgumentParser( + description='Compare opt-out records between regular and SQS delta folders', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Compare folders for a specific date + python3 compare_delta_folders.py --bucket my-bucket --date 2025-11-07 + + # Use custom prefixes + python3 compare_delta_folders.py --bucket my-bucket --date 2025-11-07 \\ + --regular-prefix optout-v2/delta --sqs-prefix sqs-delta/delta + """ + ) + + parser.add_argument('--bucket', required=True, + help='S3 bucket name') + parser.add_argument('--date', required=True, + help='Date folder to compare (e.g., 2025-11-07)') + parser.add_argument('--regular-prefix', default='optout/delta/', + help='S3 prefix for regular delta files (default: optout/delta/)') + parser.add_argument('--sqs-prefix', default='sqs-delta/delta/', + help='S3 prefix for SQS delta files (default: sqs-delta/delta/)') + parser.add_argument('--show-samples', 
type=int, default=10, + help='Number of sample records to show for differences (default: 10)') + + args = parser.parse_args() + + print("=" * 80) + print(f"šŸ” Comparing Opt-Out Delta Files for {args.date}") + print("=" * 80) + print(f"Bucket: {args.bucket}") + print(f"Regular prefix: {args.regular_prefix}") + print(f"SQS prefix: {args.sqs_prefix}") + + try: + # Load all records from both folders + regular_records, regular_stats = load_records_from_folder( + args.bucket, args.regular_prefix, args.date + ) + + sqs_records, sqs_stats = load_records_from_folder( + args.bucket, args.sqs_prefix, args.date + ) + + if not regular_records and not sqs_records: + print("\nāŒ No records found in either folder") + sys.exit(1) + + if not regular_records: + print("\nāš ļø No records in regular delta folder") + + if not sqs_records: + print("\nāš ļø No records in SQS delta folder") + + # Print file statistics + print_file_stats(regular_stats, sqs_stats) + + # Analyze differences + all_good = analyze_differences(regular_records, sqs_records, args.show_samples) + + print("\n" + "=" * 80) + if all_good: + print("āœ… SUCCESS: All regular delta records are present in SQS delta") + print("=" * 80) + sys.exit(0) + else: + print("āŒ FAILURE: Some regular delta records are missing from SQS delta") + print("=" * 80) + sys.exit(1) + + except (ClientError, ValueError, OSError) as error: + print(f"\nāŒ Error: {error}") + traceback.print_exc() + sys.exit(1) + except Exception as error: # pylint: disable=broad-except + print(f"\nāŒ Unexpected error: {error}") + traceback.print_exc() + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/compare_deltas.sh b/compare_deltas.sh new file mode 100755 index 0000000..e7a0515 --- /dev/null +++ b/compare_deltas.sh @@ -0,0 +1,122 @@ +#!/bin/bash +# Compare opt-out records between regular and SQS delta folders for a given date + +set -e + +BUCKET="${OPTOUT_S3_BUCKET:-}" +REGULAR_PREFIX="${REGULAR_PREFIX:-optout/delta/}" 
+SQS_PREFIX="${SQS_PREFIX:-sqs-delta/delta/}" + +show_usage() { + echo "Usage: $0 [options]" + echo " OR: $0 --date [options]" + echo "" + echo "Compare opt-out records between regular and SQS delta folders for a specific date." + echo "" + echo "Arguments:" + echo " Date folder to compare (e.g., 2025-11-07)" + echo "" + echo "Options:" + echo " --date Date folder (alternative to positional arg)" + echo " --bucket S3 bucket name (or set OPTOUT_S3_BUCKET env var)" + echo " --regular-prefix Regular delta prefix (default: optout/delta/)" + echo " --sqs-prefix SQS delta prefix (default: sqs-delta/delta/)" + echo " --show-samples Number of sample differences to show (default: 10)" + echo "" + echo "Examples:" + echo " # Positional date with env variable" + echo " export OPTOUT_S3_BUCKET=my-bucket" + echo " $0 2025-11-07" + echo "" + echo " # Using --date flag" + echo " $0 --date 2025-11-07 --bucket my-bucket" + echo "" + echo " # Mixed style" + echo " $0 --bucket my-bucket 2025-11-07" + echo "" + echo " # Custom prefixes" + echo " $0 --date 2025-11-07 --bucket my-bucket --regular-prefix optout-v2/delta --sqs-prefix sqs-delta/delta" +} + +# Parse arguments +DATE="" +EXTRA_ARGS=() + +while [[ $# -gt 0 ]]; do + case $1 in + -h|--help) + show_usage + exit 0 + ;; + --bucket) + BUCKET="$2" + shift 2 + ;; + --date) + DATE="$2" + shift 2 + ;; + --regular-prefix) + REGULAR_PREFIX="$2" + EXTRA_ARGS+=("--regular-prefix" "$2") + shift 2 + ;; + --sqs-prefix) + SQS_PREFIX="$2" + EXTRA_ARGS+=("--sqs-prefix" "$2") + shift 2 + ;; + --show-samples) + EXTRA_ARGS+=("--show-samples" "$2") + shift 2 + ;; + -*) + echo "Error: Unknown option: $1" + show_usage + exit 1 + ;; + *) + if [ -z "$DATE" ]; then + DATE="$1" + else + echo "Error: Unknown argument: $1" + show_usage + exit 1 + fi + shift + ;; + esac +done + +# Strip trailing slash from date if present +DATE="${DATE%/}" + +if [ -z "$DATE" ]; then + echo "Error: Date argument is required" + echo "" + show_usage + exit 1 +fi + +if [ -z 
"$BUCKET" ]; then + echo "Error: S3 bucket not specified" + echo "Set OPTOUT_S3_BUCKET environment variable or use --bucket option" + echo "" + show_usage + exit 1 +fi + +# Check if Python script exists +if [ ! -f "compare_delta_folders.py" ]; then + echo "Error: compare_delta_folders.py not found in current directory" + exit 1 +fi + +# Run the comparison +python3 compare_delta_folders.py \ + --bucket "$BUCKET" \ + --date "$DATE" \ + --regular-prefix "$REGULAR_PREFIX" \ + --sqs-prefix "$SQS_PREFIX" \ + "${EXTRA_ARGS[@]}" + From f13b447875150b1077e24a206b44694058cfaf19 Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Sat, 8 Nov 2025 13:50:53 -0700 Subject: [PATCH 02/11] fix timestamp validation, add --quiet flag, add all env validation --- compare_delta_folders.py | 92 ++++++++++++++++++++++-------- compare_deltas.sh | 57 ++++++++++-------- compare_deltas_all_environments.sh | 50 ++++++++++++++++ 3 files changed, 151 insertions(+), 48 deletions(-) create mode 100755 compare_deltas_all_environments.sh diff --git a/compare_delta_folders.py b/compare_delta_folders.py index 1d2e6e7..5c01f67 100755 --- a/compare_delta_folders.py +++ b/compare_delta_folders.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 """ -Compare opt-out records between regular delta and SQS delta folders for a given date. +Compare opt-out records between regular delta and SQS delta folders for given date(s). This script downloads all delta files from both folders and verifies that all opt-out records in the regular delta folder are present in the SQS delta folder. 
@@ -9,6 +9,7 @@ Usage: python3 compare_delta_folders.py --bucket my-bucket --date 2025-11-07 + python3 compare_delta_folders.py --bucket my-bucket --date 2025-11-07 --date 2025-11-08 python3 compare_delta_folders.py --bucket my-bucket --date 2025-11-07 \\ --regular-prefix optout-v2/delta --sqs-prefix sqs-delta/delta """ @@ -29,9 +30,9 @@ class OptOutRecord: - """Represents a single opt-out record (hash + id + timestamp)""" + """Represents a single opt-out record (hash + id + timestamp + metadata)""" - ENTRY_SIZE = 72 # 32 (identity_hash) + 32 (advertising_id) + 8 (timestamp) + ENTRY_SIZE = 72 # 32 (identity_hash) + 32 (advertising_id) + 7 (timestamp) + 1 (metadata) def __init__(self, identity_hash: bytes, advertising_id: bytes, timestamp: int): self.identity_hash = identity_hash @@ -44,16 +45,15 @@ def is_sentinel(self) -> bool: self.identity_hash == b'\xff' * 32) def __hash__(self): - """Return hash for set/dict operations""" - return hash((self.identity_hash, self.advertising_id, self.timestamp)) + """Return hash for set/dict operations (only hash+id, not timestamp)""" + return hash((self.identity_hash, self.advertising_id)) def __eq__(self, other): - """Compare two OptOutRecord instances for equality""" + """Compare two OptOutRecord instances for equality (only hash+id, not timestamp)""" if not isinstance(other, OptOutRecord): return False return (self.identity_hash == other.identity_hash and - self.advertising_id == other.advertising_id and - self.timestamp == other.timestamp) + self.advertising_id == other.advertising_id) def __repr__(self): """Return string representation of the opt-out record""" @@ -68,22 +68,36 @@ def __repr__(self): def parse_records_from_file(data: bytes) -> List[OptOutRecord]: - """Parse opt-out records from a delta file, skipping sentinels""" + """Parse opt-out records from a delta file, skipping sentinels and invalid records""" records = [] offset = 0 entry_size = OptOutRecord.ENTRY_SIZE # 72 bytes: 32 + 32 + 8 + # Valid timestamp 
range: Jan 1, 2020 to Jan 1, 2100 + MIN_VALID_TIMESTAMP = 1577836800 # 2020-01-01 + MAX_VALID_TIMESTAMP = 4102444800 # 2100-01-01 + while offset + entry_size <= len(data): identity_hash = data[offset:offset + 32] # 32 bytes advertising_id = data[offset + 32:offset + 64] # 32 bytes - timestamp = struct.unpack(' MAX_VALID_TIMESTAMP: + print(f"\n āš ļø Skipping record with invalid timestamp: {timestamp}") + offset += entry_size + continue + records.append(record) offset += entry_size return records @@ -121,7 +135,7 @@ def list_files_in_folder(bucket: str, prefix: str) -> List[str]: def load_records_from_folder( - bucket: str, prefix: str, date_folder: str + bucket: str, prefix: str, date_folder: str, quiet: bool = False ) -> Tuple[Set[OptOutRecord], dict]: """Load all opt-out records from all files in a folder""" full_prefix = f"{prefix}{date_folder}/" @@ -140,7 +154,8 @@ def load_records_from_folder( for i, file_key in enumerate(files, 1): filename = file_key.split('/')[-1] - print(f" [{i}/{len(files)}] Downloading {filename}...", end='', flush=True) + if not quiet: + print(f" [{i}/{len(files)}] Downloading {filename}...", end='', flush=True) try: data = download_from_s3(bucket, file_key) @@ -155,7 +170,8 @@ def load_records_from_folder( 'file_key': file_key } - print(f" {len(records)} records") + if not quiet: + print(f" {len(records)} records") except (ClientError, struct.error, ValueError) as error: print(f" ERROR: {error}") continue @@ -163,6 +179,23 @@ def load_records_from_folder( return all_records, file_stats +def load_records_from_multiple_folders( + bucket: str, prefix: str, date_folders: List[str], quiet: bool = False +) -> Tuple[Set[OptOutRecord], dict]: + """Load and aggregate records from multiple date folders""" + all_records = set() + all_stats = {} + + print(f"\nšŸ“… Loading records from {len(date_folders)} date folder(s)") + + for date_folder in date_folders: + records, stats = load_records_from_folder(bucket, prefix, date_folder, quiet) + 
all_records.update(records) + all_stats.update(stats) + + return all_records, all_stats + + def analyze_differences(regular_records: Set[OptOutRecord], sqs_records: Set[OptOutRecord], show_samples: int = 10) -> bool: @@ -241,6 +274,9 @@ def main() -> None: # Compare folders for a specific date python3 compare_delta_folders.py --bucket my-bucket --date 2025-11-07 + # Compare across multiple dates (handles rollover) + python3 compare_delta_folders.py --bucket my-bucket --date 2025-11-07 --date 2025-11-08 + # Use custom prefixes python3 compare_delta_folders.py --bucket my-bucket --date 2025-11-07 \\ --regular-prefix optout-v2/delta --sqs-prefix sqs-delta/delta @@ -249,32 +285,40 @@ def main() -> None: parser.add_argument('--bucket', required=True, help='S3 bucket name') - parser.add_argument('--date', required=True, - help='Date folder to compare (e.g., 2025-11-07)') + parser.add_argument('--date', required=True, action='append', dest='dates', + help='Date folder to compare (e.g., 2025-11-07). 
Can be specified multiple times.') parser.add_argument('--regular-prefix', default='optout/delta/', help='S3 prefix for regular delta files (default: optout/delta/)') parser.add_argument('--sqs-prefix', default='sqs-delta/delta/', help='S3 prefix for SQS delta files (default: sqs-delta/delta/)') parser.add_argument('--show-samples', type=int, default=10, help='Number of sample records to show for differences (default: 10)') + parser.add_argument('--quiet', '-q', action='store_true', + help='Suppress download progress output') args = parser.parse_args() + # Display dates being compared + date_display = ', '.join(args.dates) if len(args.dates) > 1 else args.dates[0] + print("=" * 80) - print(f"šŸ” Comparing Opt-Out Delta Files for {args.date}") + print(f"šŸ” Comparing Opt-Out Delta Files for {date_display}") print("=" * 80) print(f"Bucket: {args.bucket}") print(f"Regular prefix: {args.regular_prefix}") print(f"SQS prefix: {args.sqs_prefix}") + print(f"Date folders: {len(args.dates)}") + for date_folder in args.dates: + print(f" - {date_folder}") try: - # Load all records from both folders - regular_records, regular_stats = load_records_from_folder( - args.bucket, args.regular_prefix, args.date + # Load all records from both folders (aggregating across multiple dates) + regular_records, regular_stats = load_records_from_multiple_folders( + args.bucket, args.regular_prefix, args.dates, args.quiet ) - sqs_records, sqs_stats = load_records_from_folder( - args.bucket, args.sqs_prefix, args.date + sqs_records, sqs_stats = load_records_from_multiple_folders( + args.bucket, args.sqs_prefix, args.dates, args.quiet ) if not regular_records and not sqs_records: diff --git a/compare_deltas.sh b/compare_deltas.sh index e7a0515..3045920 100755 --- a/compare_deltas.sh +++ b/compare_deltas.sh @@ -8,38 +8,40 @@ REGULAR_PREFIX="${REGULAR_PREFIX:-optout/delta/}" SQS_PREFIX="${SQS_PREFIX:-sqs-delta/delta/}" show_usage() { - echo "Usage: $0 [options]" - echo " OR: $0 --date [options]" 
+ echo "Usage: $0 [date2] [date3] ... [options]" + echo " OR: $0 --date [--date ] ... [options]" echo "" - echo "Compare opt-out records between regular and SQS delta folders for a specific date." + echo "Compare opt-out records between regular and SQS delta folders for specific date(s)." + echo "Multiple dates can be specified to handle records that roll over midnight." echo "" echo "Arguments:" - echo " Date folder to compare (e.g., 2025-11-07)" + echo " Date folder(s) to compare (e.g., 2025-11-07)" echo "" echo "Options:" - echo " --date Date folder (alternative to positional arg)" + echo " --date Date folder (can be specified multiple times)" echo " --bucket S3 bucket name (or set OPTOUT_S3_BUCKET env var)" echo " --regular-prefix Regular delta prefix (default: optout/delta/)" echo " --sqs-prefix SQS delta prefix (default: sqs-delta/delta/)" echo " --show-samples Number of sample differences to show (default: 10)" + echo " --quiet, -q Suppress download progress output" echo "" echo "Examples:" - echo " # Positional date with env variable" + echo " # Single date with env variable" echo " export OPTOUT_S3_BUCKET=my-bucket" echo " $0 2025-11-07" echo "" - echo " # Using --date flag" - echo " $0 --date 2025-11-07 --bucket my-bucket" + echo " # Multiple dates to handle rollover (recommended)" + echo " $0 --date 2025-11-07 --date 2025-11-08 --bucket my-bucket" echo "" - echo " # Mixed style" - echo " $0 --bucket my-bucket 2025-11-07" + echo " # Positional dates" + echo " $0 2025-11-07 2025-11-08 --bucket my-bucket" echo "" echo " # Custom prefixes" echo " $0 --date 2025-11-07 --bucket my-bucket --regular-prefix optout-v2/delta --sqs-prefix sqs-delta/delta" } # Parse arguments -DATE="" +DATES=() EXTRA_ARGS=() while [[ $# -gt 0 ]]; do @@ -53,7 +55,7 @@ while [[ $# -gt 0 ]]; do shift 2 ;; --date) - DATE="$2" + DATES+=("$2") shift 2 ;; --regular-prefix) @@ -70,29 +72,30 @@ while [[ $# -gt 0 ]]; do EXTRA_ARGS+=("--show-samples" "$2") shift 2 ;; + --quiet|-q) + 
EXTRA_ARGS+=("--quiet") + shift + ;; -*) echo "Error: Unknown option: $1" show_usage exit 1 ;; *) - if [ -z "$DATE" ]; then - DATE="$1" - else - echo "Error: Unknown argument: $1" - show_usage - exit 1 - fi + # Positional date argument + DATES+=("$1") shift ;; esac done -# Strip trailing slash from date if present -DATE="${DATE%/}" +# Strip trailing slashes from dates if present +for i in "${!DATES[@]}"; do + DATES[$i]="${DATES[$i]%/}" +done -if [ -z "$DATE" ]; then - echo "Error: Date argument is required" +if [ ${#DATES[@]} -eq 0 ]; then + echo "Error: At least one date argument is required" echo "" show_usage exit 1 @@ -112,10 +115,16 @@ if [ ! -f "compare_delta_folders.py" ]; then exit 1 fi +# Build date arguments for Python script +DATE_ARGS=() +for date in "${DATES[@]}"; do + DATE_ARGS+=("--date" "$date") +done + # Run the comparison python3 compare_delta_folders.py \ --bucket "$BUCKET" \ - --date "$DATE" \ + "${DATE_ARGS[@]}" \ --regular-prefix "$REGULAR_PREFIX" \ --sqs-prefix "$SQS_PREFIX" \ "${EXTRA_ARGS[@]}" diff --git a/compare_deltas_all_environments.sh b/compare_deltas_all_environments.sh new file mode 100755 index 0000000..bafd178 --- /dev/null +++ b/compare_deltas_all_environments.sh @@ -0,0 +1,50 @@ +#!/bin/bash +# Compare deltas across all environments +# Requires aws-sso to be installed and configured + +set -e + +# Get date arguments (default to yesterday and today if not provided) +if [ $# -eq 0 ]; then + DATES="--date $(date -v-1d +%Y-%m-%d) --date $(date +%Y-%m-%d)" +else + DATES="" + for date in "$@"; do + DATES="$DATES --date $date" + done +fi + +echo "================================" +echo "Comparing Deltas - UID2 TEST" +echo "================================" +aws-sso exec --account 072245134533 --role scrum-uid2-full-access -- \ + bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=uid2-optout-test-store && ./compare_deltas.sh $DATES --regular-prefix optout/delta/ --sqs-prefix 
sqs-delta/delta/ --quiet" + +echo "================================" +echo "Comparing Deltas - EUID INTEG" +echo "================================" +aws-sso exec --account 101244608629 --role scrum-uid2-elevated -- \ + bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=euid-optout-integ-store && ./compare_deltas.sh $DATES --regular-prefix optout/delta/ --sqs-prefix sqs-delta/delta/ --quiet" + +echo "================================" +echo "Comparing Deltas - UID2 INTEG" +echo "================================" +aws-sso exec --account 150073873184 --role scrum-uid2-elevated -- \ + bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=uid2-optout-integ-store && ./compare_deltas.sh $DATES --regular-prefix uid2-optout-integ/delta/ --sqs-prefix sqs-delta/delta/ --quiet" + +echo "================================" +echo "Comparing Deltas - EUID PROD" +echo "================================" +aws-sso exec --account 248068286741 --role scrum-uid2-elevated -- \ + bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=euid-optout && ./compare_deltas.sh $DATES --regular-prefix optout/delta/ --sqs-prefix sqs-delta/delta/ --quiet" + +echo "================================" +echo "Comparing Deltas - UID2 PROD" +echo "================================" +aws-sso exec --account 475720075663 --role scrum-uid2-elevated -- \ + bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=uid2-optout && ./compare_deltas.sh $DATES --regular-prefix optout-v2/delta/ --sqs-prefix sqs-delta/delta/ --quiet" + +echo "" +echo "================================" +echo "All environments compared!" 
+echo "================================" From 88ed3390a009ea3970f255037cf00299cbf58518 Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Sat, 8 Nov 2025 16:13:04 -0700 Subject: [PATCH 03/11] add efficiency calculation --- compare_deltas_all_environments.sh | 85 ++++++++++++++++++++++++++---- 1 file changed, 75 insertions(+), 10 deletions(-) diff --git a/compare_deltas_all_environments.sh b/compare_deltas_all_environments.sh index bafd178..6425b07 100755 --- a/compare_deltas_all_environments.sh +++ b/compare_deltas_all_environments.sh @@ -4,6 +4,12 @@ set -e +# Initialize aggregate counters +TOTAL_REGULAR_FILES=0 +TOTAL_REGULAR_ENTRIES=0 +TOTAL_SQS_FILES=0 +TOTAL_SQS_ENTRIES=0 + # Get date arguments (default to yesterday and today if not provided) if [ $# -eq 0 ]; then DATES="--date $(date -v-1d +%Y-%m-%d) --date $(date +%Y-%m-%d)" @@ -14,35 +20,94 @@ else done fi +# Function to extract and sum statistics +extract_stats() { + local output="$1" + local env_name="$2" + + # Extract file counts + local regular_files=$(echo "$output" | grep "Regular Delta Files:" | awk '{print $4}') + local sqs_files=$(echo "$output" | grep "SQS Delta Files:" | awk '{print $4}') + + # Extract entry counts (from "Total entries:" line) + local regular_entries=$(echo "$output" | grep -A3 "Regular Delta Files:" | grep "Total entries:" | awk '{print $3}') + local sqs_entries=$(echo "$output" | grep -A3 "SQS Delta Files:" | grep "Total entries:" | awk '{print $3}') + + if [ -n "$regular_files" ] && [ -n "$sqs_files" ]; then + echo " $env_name: Regular $regular_files files/$regular_entries entries, SQS $sqs_files files/$sqs_entries entries" + TOTAL_REGULAR_FILES=$((TOTAL_REGULAR_FILES + regular_files)) + TOTAL_REGULAR_ENTRIES=$((TOTAL_REGULAR_ENTRIES + regular_entries)) + TOTAL_SQS_FILES=$((TOTAL_SQS_FILES + sqs_files)) + TOTAL_SQS_ENTRIES=$((TOTAL_SQS_ENTRIES + sqs_entries)) + fi +} + echo "================================" echo "Comparing Deltas - UID2 TEST" echo 
"================================" -aws-sso exec --account 072245134533 --role scrum-uid2-full-access -- \ - bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=uid2-optout-test-store && ./compare_deltas.sh $DATES --regular-prefix optout/delta/ --sqs-prefix sqs-delta/delta/ --quiet" +OUTPUT=$(aws-sso exec --account 072245134533 --role scrum-uid2-full-access -- \ + bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=uid2-optout-test-store && ./compare_deltas.sh $DATES --regular-prefix optout/delta/ --sqs-prefix sqs-delta/delta/ --quiet" 2>&1) +echo "$OUTPUT" +extract_stats "$OUTPUT" "UID2-TEST" echo "================================" echo "Comparing Deltas - EUID INTEG" echo "================================" -aws-sso exec --account 101244608629 --role scrum-uid2-elevated -- \ - bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=euid-optout-integ-store && ./compare_deltas.sh $DATES --regular-prefix optout/delta/ --sqs-prefix sqs-delta/delta/ --quiet" +OUTPUT=$(aws-sso exec --account 101244608629 --role scrum-uid2-elevated -- \ + bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=euid-optout-integ-store && ./compare_deltas.sh $DATES --regular-prefix optout/delta/ --sqs-prefix sqs-delta/delta/ --quiet" 2>&1) +echo "$OUTPUT" +extract_stats "$OUTPUT" "EUID-INTEG" echo "================================" echo "Comparing Deltas - UID2 INTEG" echo "================================" -aws-sso exec --account 150073873184 --role scrum-uid2-elevated -- \ - bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=uid2-optout-integ-store && ./compare_deltas.sh $DATES --regular-prefix uid2-optout-integ/delta/ --sqs-prefix sqs-delta/delta/ --quiet" +OUTPUT=$(aws-sso exec --account 150073873184 --role scrum-uid2-elevated -- \ + 
bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=uid2-optout-integ-store && ./compare_deltas.sh $DATES --regular-prefix uid2-optout-integ/delta/ --sqs-prefix sqs-delta/delta/ --quiet" 2>&1) +echo "$OUTPUT" +extract_stats "$OUTPUT" "UID2-INTEG" echo "================================" echo "Comparing Deltas - EUID PROD" echo "================================" -aws-sso exec --account 248068286741 --role scrum-uid2-elevated -- \ - bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=euid-optout && ./compare_deltas.sh $DATES --regular-prefix optout/delta/ --sqs-prefix sqs-delta/delta/ --quiet" +OUTPUT=$(aws-sso exec --account 248068286741 --role scrum-uid2-elevated -- \ + bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=euid-optout && ./compare_deltas.sh $DATES --regular-prefix optout/delta/ --sqs-prefix sqs-delta/delta/ --quiet" 2>&1) +echo "$OUTPUT" +extract_stats "$OUTPUT" "EUID-PROD" echo "================================" echo "Comparing Deltas - UID2 PROD" echo "================================" -aws-sso exec --account 475720075663 --role scrum-uid2-elevated -- \ - bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=uid2-optout && ./compare_deltas.sh $DATES --regular-prefix optout-v2/delta/ --sqs-prefix sqs-delta/delta/ --quiet" +OUTPUT=$(aws-sso exec --account 475720075663 --role scrum-uid2-elevated -- \ + bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=uid2-optout && ./compare_deltas.sh $DATES --regular-prefix optout-v2/delta/ --sqs-prefix sqs-delta/delta/ --quiet" 2>&1) +echo "$OUTPUT" +extract_stats "$OUTPUT" "UID2-PROD" + +echo "" +echo "================================================================================" +echo "šŸ“Š AGGREGATE EFFICIENCY SUMMARY" +echo 
"================================================================================" +echo "" +echo "Environment Breakdown:" + +echo "" +echo "Total Across All Environments:" +echo " Regular Delta: $TOTAL_REGULAR_FILES files, $TOTAL_REGULAR_ENTRIES entries" +echo " SQS Delta: $TOTAL_SQS_FILES files, $TOTAL_SQS_ENTRIES entries" +echo "" + +# Calculate efficiency multipliers +if [ $TOTAL_SQS_FILES -gt 0 ] && [ $TOTAL_SQS_ENTRIES -gt 0 ]; then + FILE_EFFICIENCY=$(awk "BEGIN {printf \"%.2f\", $TOTAL_REGULAR_FILES / $TOTAL_SQS_FILES}") + ENTRY_EFFICIENCY=$(awk "BEGIN {printf \"%.2f\", $TOTAL_REGULAR_ENTRIES / $TOTAL_SQS_ENTRIES}") + FILE_REDUCTION=$(awk "BEGIN {printf \"%.1f\", (($TOTAL_REGULAR_FILES - $TOTAL_SQS_FILES) * 100.0) / $TOTAL_REGULAR_FILES}") + ENTRY_REDUCTION=$(awk "BEGIN {printf \"%.1f\", (($TOTAL_REGULAR_ENTRIES - $TOTAL_SQS_ENTRIES) * 100.0) / $TOTAL_REGULAR_ENTRIES}") + + echo "SQS Efficiency Gains:" + echo " šŸ“ Files: ${FILE_EFFICIENCY}x fewer files (${FILE_REDUCTION}% reduction)" + echo " šŸ“ Entries: ${ENTRY_EFFICIENCY}x fewer entries (${ENTRY_REDUCTION}% reduction)" +else + echo "āš ļø Unable to calculate efficiency (no SQS data)" +fi echo "" echo "================================" From 1c259d67e21476d931d2072416fd10417c4db489 Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Tue, 18 Nov 2025 16:50:34 -0700 Subject: [PATCH 04/11] bug fix --- compare_delta_folders.py | 8 ++++++-- compare_deltas_all_environments.sh | 13 +++++++++---- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/compare_delta_folders.py b/compare_delta_folders.py index 5c01f67..25ca95b 100755 --- a/compare_delta_folders.py +++ b/compare_delta_folders.py @@ -322,8 +322,12 @@ def main() -> None: ) if not regular_records and not sqs_records: - print("\nāŒ No records found in either folder") - sys.exit(1) + print("\nāš ļø No records found in either folder (environment may be empty)") + print_file_stats(regular_stats, sqs_stats) + print("\n" + "=" * 80) + print("āœ… 
SUCCESS: No data to compare (empty environment)") + print("=" * 80) + sys.exit(0) # Empty environment is NOT an error! if not regular_records: print("\nāš ļø No records in regular delta folder") diff --git a/compare_deltas_all_environments.sh b/compare_deltas_all_environments.sh index 6425b07..3a1bfce 100755 --- a/compare_deltas_all_environments.sh +++ b/compare_deltas_all_environments.sh @@ -35,10 +35,15 @@ extract_stats() { if [ -n "$regular_files" ] && [ -n "$sqs_files" ]; then echo " $env_name: Regular $regular_files files/$regular_entries entries, SQS $sqs_files files/$sqs_entries entries" - TOTAL_REGULAR_FILES=$((TOTAL_REGULAR_FILES + regular_files)) - TOTAL_REGULAR_ENTRIES=$((TOTAL_REGULAR_ENTRIES + regular_entries)) - TOTAL_SQS_FILES=$((TOTAL_SQS_FILES + sqs_files)) - TOTAL_SQS_ENTRIES=$((TOTAL_SQS_ENTRIES + sqs_entries)) + # Remove commas from numbers before arithmetic + regular_files_clean=${regular_files//,/} + regular_entries_clean=${regular_entries//,/} + sqs_files_clean=${sqs_files//,/} + sqs_entries_clean=${sqs_entries//,/} + TOTAL_REGULAR_FILES=$((TOTAL_REGULAR_FILES + regular_files_clean)) + TOTAL_REGULAR_ENTRIES=$((TOTAL_REGULAR_ENTRIES + regular_entries_clean)) + TOTAL_SQS_FILES=$((TOTAL_SQS_FILES + sqs_files_clean)) + TOTAL_SQS_ENTRIES=$((TOTAL_SQS_ENTRIES + sqs_entries_clean)) fi } From 65261a935a9043d0c8306e1ef8e25e5c1aafd52a Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Sat, 13 Dec 2025 15:25:40 -0700 Subject: [PATCH 05/11] reset pom version --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index a874e1d..56e81eb 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.uid2 uid2-optout - 4.4.0 + 4.5.0 uid2-optout https://github.com/IABTechLab/uid2-optout From 147eebb84e05f4b02977f8d2f717b5f50d6fcd75 Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Mon, 15 Dec 2025 21:00:59 -0700 Subject: [PATCH 06/11] script refactor --- compare_deltas_all_environments.sh | 153 +++++++++++++++-------------- 1 file 
changed, 77 insertions(+), 76 deletions(-) diff --git a/compare_deltas_all_environments.sh b/compare_deltas_all_environments.sh index 3a1bfce..9a9abfc 100755 --- a/compare_deltas_all_environments.sh +++ b/compare_deltas_all_environments.sh @@ -1,24 +1,48 @@ #!/bin/bash # Compare deltas across all environments # Requires aws-sso to be installed and configured +# +# Usage: +# ./compare_deltas_all_environments.sh [--env ENV] [DATE...] +# +# Examples: +# ./compare_deltas_all_environments.sh # All envs, yesterday+today +# ./compare_deltas_all_environments.sh 2025-12-15 # All envs, specific date +# ./compare_deltas_all_environments.sh --env uid2-test # Single env, yesterday+today +# ./compare_deltas_all_environments.sh --env uid2-prod 2025-12-15 # Single env, specific date +# +# Available environments: uid2-test, euid-integ, uid2-integ, euid-prod, uid2-prod set -e +# Parse arguments +ENV_FILTER="" +DATES="" + +while [[ $# -gt 0 ]]; do + case $1 in + --env|-e) + ENV_FILTER=$(echo "$2" | tr '[:upper:]' '[:lower:]') + shift 2 + ;; + *) + DATES="$DATES --date $1" + shift + ;; + esac +done + +# Default to yesterday and today if no dates provided +if [ -z "$DATES" ]; then + DATES="--date $(date -v-1d +%Y-%m-%d) --date $(date +%Y-%m-%d)" +fi + # Initialize aggregate counters TOTAL_REGULAR_FILES=0 TOTAL_REGULAR_ENTRIES=0 TOTAL_SQS_FILES=0 TOTAL_SQS_ENTRIES=0 - -# Get date arguments (default to yesterday and today if not provided) -if [ $# -eq 0 ]; then - DATES="--date $(date -v-1d +%Y-%m-%d) --date $(date +%Y-%m-%d)" -else - DATES="" - for date in "$@"; do - DATES="$DATES --date $date" - done -fi +ENVS_RUN=0 # Function to extract and sum statistics extract_stats() { @@ -47,74 +71,51 @@ extract_stats() { fi } -echo "================================" -echo "Comparing Deltas - UID2 TEST" -echo "================================" -OUTPUT=$(aws-sso exec --account 072245134533 --role scrum-uid2-full-access -- \ - bash -c "cd /Users/ian.nara/service/uid2-optout && source 
.venv/bin/activate && export OPTOUT_S3_BUCKET=uid2-optout-test-store && ./compare_deltas.sh $DATES --regular-prefix optout/delta/ --sqs-prefix sqs-delta/delta/ --quiet" 2>&1) -echo "$OUTPUT" -extract_stats "$OUTPUT" "UID2-TEST" - -echo "================================" -echo "Comparing Deltas - EUID INTEG" -echo "================================" -OUTPUT=$(aws-sso exec --account 101244608629 --role scrum-uid2-elevated -- \ - bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=euid-optout-integ-store && ./compare_deltas.sh $DATES --regular-prefix optout/delta/ --sqs-prefix sqs-delta/delta/ --quiet" 2>&1) -echo "$OUTPUT" -extract_stats "$OUTPUT" "EUID-INTEG" - -echo "================================" -echo "Comparing Deltas - UID2 INTEG" -echo "================================" -OUTPUT=$(aws-sso exec --account 150073873184 --role scrum-uid2-elevated -- \ - bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=uid2-optout-integ-store && ./compare_deltas.sh $DATES --regular-prefix uid2-optout-integ/delta/ --sqs-prefix sqs-delta/delta/ --quiet" 2>&1) -echo "$OUTPUT" -extract_stats "$OUTPUT" "UID2-INTEG" - -echo "================================" -echo "Comparing Deltas - EUID PROD" -echo "================================" -OUTPUT=$(aws-sso exec --account 248068286741 --role scrum-uid2-elevated -- \ - bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=euid-optout && ./compare_deltas.sh $DATES --regular-prefix optout/delta/ --sqs-prefix sqs-delta/delta/ --quiet" 2>&1) -echo "$OUTPUT" -extract_stats "$OUTPUT" "EUID-PROD" +run_comparison() { + local env_name="$1" + local account="$2" + local role="$3" + local bucket="$4" + local regular_prefix="$5" + local sqs_prefix="$6" + + # Check if we should skip this environment + local env_lower=$(echo "$env_name" | tr '[:upper:]' '[:lower:]') + if [ -n "$ENV_FILTER" ] && [ 
"$env_lower" != "$ENV_FILTER" ]; then + return + fi + + ENVS_RUN=$((ENVS_RUN + 1)) + echo "======================================== $env_name ========================================" + OUTPUT=$(aws-sso exec --account "$account" --role "$role" -- \ + bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=$bucket && ./compare_deltas.sh $DATES --regular-prefix $regular_prefix --sqs-prefix $sqs_prefix --quiet" 2>&1) + # Filter out the separator lines from compare_deltas.sh output + echo "$OUTPUT" | grep -v "^====" | grep -v "^$" + extract_stats "$OUTPUT" "$env_name" + echo "" +} -echo "================================" -echo "Comparing Deltas - UID2 PROD" -echo "================================" -OUTPUT=$(aws-sso exec --account 475720075663 --role scrum-uid2-elevated -- \ - bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=uid2-optout && ./compare_deltas.sh $DATES --regular-prefix optout-v2/delta/ --sqs-prefix sqs-delta/delta/ --quiet" 2>&1) -echo "$OUTPUT" -extract_stats "$OUTPUT" "UID2-PROD" +run_comparison "UID2-TEST" "072245134533" "scrum-uid2-full-access" "uid2-optout-test-store" "optout/delta/" "sqs-delta/delta/" +run_comparison "EUID-INTEG" "101244608629" "scrum-uid2-elevated" "euid-optout-integ-store" "optout/delta/" "sqs-delta/delta/" +run_comparison "UID2-INTEG" "150073873184" "scrum-uid2-elevated" "uid2-optout-integ-store" "uid2-optout-integ/delta/" "sqs-delta/delta/" +run_comparison "EUID-PROD" "409985233527" "scrum-uid2-elevated" "euid-optout-prod-store" "optout/delta/" "sqs-delta/delta/" +run_comparison "UID2-PROD" "553165044900" "scrum-uid2-elevated" "uid2-optout-prod-store" "optout-v2/delta/" "sqs-delta/delta/" -echo "" -echo "================================================================================" -echo "šŸ“Š AGGREGATE EFFICIENCY SUMMARY" -echo "================================================================================" -echo "" -echo 
"Environment Breakdown:" +# Only show summary if we ran environments +if [ $ENVS_RUN -eq 0 ]; then + echo "āŒ No matching environment found for: $ENV_FILTER" + echo "Available: uid2-test, euid-integ, uid2-integ, euid-prod, uid2-prod" + exit 1 +fi -echo "" -echo "Total Across All Environments:" -echo " Regular Delta: $TOTAL_REGULAR_FILES files, $TOTAL_REGULAR_ENTRIES entries" -echo " SQS Delta: $TOTAL_SQS_FILES files, $TOTAL_SQS_ENTRIES entries" -echo "" +# Only show summary for multiple environments +if [ $ENVS_RUN -gt 1 ]; then + echo "======================================== SUMMARY ========================================" + echo "Total: Regular $TOTAL_REGULAR_FILES files/$TOTAL_REGULAR_ENTRIES entries, SQS $TOTAL_SQS_FILES files/$TOTAL_SQS_ENTRIES entries" -# Calculate efficiency multipliers -if [ $TOTAL_SQS_FILES -gt 0 ] && [ $TOTAL_SQS_ENTRIES -gt 0 ]; then - FILE_EFFICIENCY=$(awk "BEGIN {printf \"%.2f\", $TOTAL_REGULAR_FILES / $TOTAL_SQS_FILES}") - ENTRY_EFFICIENCY=$(awk "BEGIN {printf \"%.2f\", $TOTAL_REGULAR_ENTRIES / $TOTAL_SQS_ENTRIES}") - FILE_REDUCTION=$(awk "BEGIN {printf \"%.1f\", (($TOTAL_REGULAR_FILES - $TOTAL_SQS_FILES) * 100.0) / $TOTAL_REGULAR_FILES}") - ENTRY_REDUCTION=$(awk "BEGIN {printf \"%.1f\", (($TOTAL_REGULAR_ENTRIES - $TOTAL_SQS_ENTRIES) * 100.0) / $TOTAL_REGULAR_ENTRIES}") - - echo "SQS Efficiency Gains:" - echo " šŸ“ Files: ${FILE_EFFICIENCY}x fewer files (${FILE_REDUCTION}% reduction)" - echo " šŸ“ Entries: ${ENTRY_EFFICIENCY}x fewer entries (${ENTRY_REDUCTION}% reduction)" -else - echo "āš ļø Unable to calculate efficiency (no SQS data)" + if [ $TOTAL_SQS_FILES -gt 0 ] && [ $TOTAL_SQS_ENTRIES -gt 0 ]; then + FILE_EFFICIENCY=$(awk "BEGIN {printf \"%.1f\", $TOTAL_REGULAR_FILES / $TOTAL_SQS_FILES}") + ENTRY_EFFICIENCY=$(awk "BEGIN {printf \"%.1f\", $TOTAL_REGULAR_ENTRIES / $TOTAL_SQS_ENTRIES}") + echo "Efficiency: ${FILE_EFFICIENCY}x fewer files, ${ENTRY_EFFICIENCY}x fewer entries" + fi fi - -echo "" -echo 
"================================" -echo "All environments compared!" -echo "================================" From 2d26b790cb7e4e37ca2072219cb8e4b7cf358470 Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Tue, 16 Dec 2025 17:28:58 -0700 Subject: [PATCH 07/11] update validation --- compare_delta_folders.py | 178 +++++++++-------------------- compare_deltas.sh | 72 ++---------- compare_deltas_all_environments.sh | 103 ++++++----------- 3 files changed, 94 insertions(+), 259 deletions(-) diff --git a/compare_delta_folders.py b/compare_delta_folders.py index 25ca95b..0c47b10 100755 --- a/compare_delta_folders.py +++ b/compare_delta_folders.py @@ -1,18 +1,4 @@ #!/usr/bin/env python3 -""" -Compare opt-out records between regular delta and SQS delta folders for given date(s). - -This script downloads all delta files from both folders and verifies that all opt-out -records in the regular delta folder are present in the SQS delta folder. - -Delta file format: Each entry is 72 bytes (32-byte hash + 32-byte ID + 8-byte timestamp) - -Usage: - python3 compare_delta_folders.py --bucket my-bucket --date 2025-11-07 - python3 compare_delta_folders.py --bucket my-bucket --date 2025-11-07 --date 2025-11-08 - python3 compare_delta_folders.py --bucket my-bucket --date 2025-11-07 \\ - --regular-prefix optout-v2/delta --sqs-prefix sqs-delta/delta -""" import argparse import struct @@ -30,9 +16,8 @@ class OptOutRecord: - """Represents a single opt-out record (hash + id + timestamp + metadata)""" - - ENTRY_SIZE = 72 # 32 (identity_hash) + 32 (advertising_id) + 7 (timestamp) + 1 (metadata) + # 32 (identity_hash) + 32 (advertising_id) + 7 (timestamp) + 1 (metadata) + ENTRY_SIZE = 72 def __init__(self, identity_hash: bytes, advertising_id: bytes, timestamp: int): self.identity_hash = identity_hash @@ -40,23 +25,19 @@ def __init__(self, identity_hash: bytes, advertising_id: bytes, timestamp: int): self.timestamp = timestamp def is_sentinel(self) -> bool: - """Check if this is a sentinel entry 
(start or end)""" return (self.identity_hash == b'\x00' * 32 or self.identity_hash == b'\xff' * 32) def __hash__(self): - """Return hash for set/dict operations (only hash+id, not timestamp)""" return hash((self.identity_hash, self.advertising_id)) def __eq__(self, other): - """Compare two OptOutRecord instances for equality (only hash+id, not timestamp)""" if not isinstance(other, OptOutRecord): return False return (self.identity_hash == other.identity_hash and self.advertising_id == other.advertising_id) def __repr__(self): - """Return string representation of the opt-out record""" hash_hex = self.identity_hash.hex()[:16] id_hex = self.advertising_id.hex()[:16] try: @@ -68,32 +49,27 @@ def __repr__(self): def parse_records_from_file(data: bytes) -> List[OptOutRecord]: - """Parse opt-out records from a delta file, skipping sentinels and invalid records""" records = [] offset = 0 - entry_size = OptOutRecord.ENTRY_SIZE # 72 bytes: 32 + 32 + 8 + entry_size = OptOutRecord.ENTRY_SIZE - # Valid timestamp range: Jan 1, 2020 to Jan 1, 2100 MIN_VALID_TIMESTAMP = 1577836800 # 2020-01-01 MAX_VALID_TIMESTAMP = 4102444800 # 2100-01-01 while offset + entry_size <= len(data): - identity_hash = data[offset:offset + 32] # 32 bytes - advertising_id = data[offset + 32:offset + 64] # 32 bytes - # Read 8 bytes but mask to 7 bytes (56 bits) - last byte is metadata + identity_hash = data[offset:offset + 32] + advertising_id = data[offset + 32:offset + 64] + # Last byte is metadata, mask to 56 bits for timestamp timestamp_raw = struct.unpack(' MAX_VALID_TIMESTAMP: - print(f"\n āš ļø Skipping record with invalid timestamp: {timestamp}") offset += entry_size continue @@ -104,18 +80,16 @@ def parse_records_from_file(data: bytes) -> List[OptOutRecord]: def download_from_s3(bucket: str, key: str) -> bytes: - """Download file from S3""" try: s3 = boto3.client('s3') response = s3.get_object(Bucket=bucket, Key=key) return response['Body'].read() except ClientError as error: - print(f"Error 
downloading s3://{bucket}/{key}: {error}") + print(f"\nError downloading s3://{bucket}/{key}: {error}") raise -def list_files_in_folder(bucket: str, prefix: str) -> List[str]: - """List all .dat files in an S3 folder""" +def list_dat_files(bucket: str, prefix: str) -> List[str]: try: s3 = boto3.client('s3') files = [] @@ -137,57 +111,51 @@ def list_files_in_folder(bucket: str, prefix: str) -> List[str]: def load_records_from_folder( bucket: str, prefix: str, date_folder: str, quiet: bool = False ) -> Tuple[Set[OptOutRecord], dict]: - """Load all opt-out records from all files in a folder""" full_prefix = f"{prefix}{date_folder}/" - - print(f"\nšŸ“‚ Loading files from s3://{bucket}/{full_prefix}") - files = list_files_in_folder(bucket, full_prefix) + files = list_dat_files(bucket, full_prefix) if not files: - print(" āš ļø No .dat files found") + print(f" {date_folder}: no files") return set(), {} - print(f" Found {len(files)} delta files") - all_records = set() file_stats = {} + total_records = 0 for i, file_key in enumerate(files, 1): filename = file_key.split('/')[-1] if not quiet: - print(f" [{i}/{len(files)}] Downloading {filename}...", end='', flush=True) + print(f"\r {date_folder}: [{i}/{len(files)}] {total_records} records", end='', flush=True) try: data = download_from_s3(bucket, file_key) records = parse_records_from_file(data) + total_records += len(records) all_records.update(records) total_entries_in_file = len(data) // OptOutRecord.ENTRY_SIZE file_stats[filename] = { 'size': len(data), 'entries': len(records), - 'total_entries': total_entries_in_file, # Includes sentinels + 'total_entries': total_entries_in_file, 'file_key': file_key } - - if not quiet: - print(f" {len(records)} records") except (ClientError, struct.error, ValueError) as error: - print(f" ERROR: {error}") + print(f"\n ERROR: {error}") continue + if not quiet: + print(f"\r {date_folder}: {len(files)} files, {total_records} records" + " " * 20) + return all_records, file_stats def 
load_records_from_multiple_folders( bucket: str, prefix: str, date_folders: List[str], quiet: bool = False ) -> Tuple[Set[OptOutRecord], dict]: - """Load and aggregate records from multiple date folders""" all_records = set() all_stats = {} - print(f"\nšŸ“… Loading records from {len(date_folders)} date folder(s)") - for date_folder in date_folders: records, stats = load_records_from_folder(bucket, prefix, date_folder, quiet) all_records.update(records) @@ -199,60 +167,49 @@ def load_records_from_multiple_folders( def analyze_differences(regular_records: Set[OptOutRecord], sqs_records: Set[OptOutRecord], show_samples: int = 10) -> bool: - """Analyze and report differences between record sets""" - - print("\nšŸ“Š Analysis Results") - print(f" Regular delta records: {len(regular_records):,}") - print(f" SQS delta records: {len(sqs_records):,}") + print("\n\nšŸ“Š Analysis Results (unique records)") + print(f"\n Regular: {len(regular_records):,}") + print(f" SQS: {len(sqs_records):,}") - # Records in regular but not in SQS (MISSING from SQS) missing_in_sqs = regular_records - sqs_records - - # Records in SQS but not in regular (EXTRA in SQS) extra_in_sqs = sqs_records - regular_records - - # Common records common = regular_records & sqs_records - print(f" Common records: {len(common):,}") - print(f" Missing from SQS: {len(missing_in_sqs):,}") - print(f" Extra in SQS: {len(extra_in_sqs):,}") + print(f" Common: {len(common):,}") + print(f" Missing: {len(missing_in_sqs):,}") + print(f" Extra: {len(extra_in_sqs):,}") - all_good = True + all_records_matched = True if missing_in_sqs: - print(f"\nāŒ MISSING: {len(missing_in_sqs)} records in regular delta are NOT in SQS delta") - print(f" Sample of missing records (first {min(show_samples, len(missing_in_sqs))}):") + print(f"\nāŒ MISSING: {len(missing_in_sqs)} records in regular are NOT in SQS") + print(f" Sample (first {min(show_samples, len(missing_in_sqs))}):") for i, record in 
enumerate(list(missing_in_sqs)[:show_samples], 1): print(f" {i}. {record}") if len(missing_in_sqs) > show_samples: print(f" ... and {len(missing_in_sqs) - show_samples} more") - all_good = False - else: - print("\nāœ… All records from regular delta are present in SQS delta") + all_records_matched = False if extra_in_sqs: - print(f"\nāš ļø EXTRA: {len(extra_in_sqs)} records in SQS delta are NOT in regular delta") - print(" (This might be okay if SQS captured additional opt-outs)") - print(f" Sample of extra records (first {min(show_samples, len(extra_in_sqs))}):") + print(f"\nāš ļø EXTRA: {len(extra_in_sqs)} records in SQS are NOT in regular") + print(f" Sample (first {min(show_samples, len(extra_in_sqs))}):") for i, record in enumerate(list(extra_in_sqs)[:show_samples], 1): print(f" {i}. {record}") if len(extra_in_sqs) > show_samples: print(f" ... and {len(extra_in_sqs) - show_samples} more") - return all_good + return all_records_matched def print_file_stats(regular_stats: dict, sqs_stats: dict) -> None: - """Print file statistics for both folders""" - print("\nšŸ“ˆ File Statistics") + print("\n\nšŸ“ˆ File Statistics") print(f"\n Regular Delta Files: {len(regular_stats)}") if regular_stats: total_size = sum(s['size'] for s in regular_stats.values()) total_entries = sum(s['entries'] for s in regular_stats.values()) print(f" Total size: {total_size:,} bytes") - print(f" Total entries: {total_entries:,}") + print(f" Total entries: {total_entries:,} (with duplicates)") print(f" Avg entries/file: {total_entries / len(regular_stats):.1f}") print(f"\n SQS Delta Files: {len(sqs_stats)}") @@ -260,63 +217,39 @@ def print_file_stats(regular_stats: dict, sqs_stats: dict) -> None: total_size = sum(s['size'] for s in sqs_stats.values()) total_entries = sum(s['entries'] for s in sqs_stats.values()) print(f" Total size: {total_size:,} bytes") - print(f" Total entries: {total_entries:,}") + print(f" Total entries: {total_entries:,} (with duplicates)") print(f" Avg entries/file: 
{total_entries / len(sqs_stats):.1f}") def main() -> None: - """Main entry point for comparing opt-out delta folders.""" parser = argparse.ArgumentParser( - description='Compare opt-out records between regular and SQS delta folders', - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog=""" -Examples: - # Compare folders for a specific date - python3 compare_delta_folders.py --bucket my-bucket --date 2025-11-07 - - # Compare across multiple dates (handles rollover) - python3 compare_delta_folders.py --bucket my-bucket --date 2025-11-07 --date 2025-11-08 - - # Use custom prefixes - python3 compare_delta_folders.py --bucket my-bucket --date 2025-11-07 \\ - --regular-prefix optout-v2/delta --sqs-prefix sqs-delta/delta - """ + description='Compare opt-out records between regular and SQS delta folders' ) - - parser.add_argument('--bucket', required=True, - help='S3 bucket name') + parser.add_argument('--bucket', required=True, help='S3 bucket name') parser.add_argument('--date', required=True, action='append', dest='dates', - help='Date folder to compare (e.g., 2025-11-07). 
Can be specified multiple times.') + help='Date folder (can be specified multiple times)') parser.add_argument('--regular-prefix', default='optout/delta/', - help='S3 prefix for regular delta files (default: optout/delta/)') + help='S3 prefix for regular delta files') parser.add_argument('--sqs-prefix', default='sqs-delta/delta/', - help='S3 prefix for SQS delta files (default: sqs-delta/delta/)') + help='S3 prefix for SQS delta files') parser.add_argument('--show-samples', type=int, default=10, - help='Number of sample records to show for differences (default: 10)') + help='Number of sample records to show for differences') parser.add_argument('--quiet', '-q', action='store_true', help='Suppress download progress output') args = parser.parse_args() - # Display dates being compared - date_display = ', '.join(args.dates) if len(args.dates) > 1 else args.dates[0] - - print("=" * 80) - print(f"šŸ” Comparing Opt-Out Delta Files for {date_display}") - print("=" * 80) - print(f"Bucket: {args.bucket}") - print(f"Regular prefix: {args.regular_prefix}") - print(f"SQS prefix: {args.sqs_prefix}") - print(f"Date folders: {len(args.dates)}") - for date_folder in args.dates: - print(f" - {date_folder}") + date_display = ', '.join(args.dates) + print(f"šŸ” {args.bucket} | Dates: {date_display}") + print(f"\n Regular: {args.regular_prefix}") try: - # Load all records from both folders (aggregating across multiple dates) regular_records, regular_stats = load_records_from_multiple_folders( args.bucket, args.regular_prefix, args.dates, args.quiet ) + print(f"\n SQS: {args.sqs_prefix}") + sqs_records, sqs_stats = load_records_from_multiple_folders( args.bucket, args.sqs_prefix, args.dates, args.quiet ) @@ -324,10 +257,8 @@ def main() -> None: if not regular_records and not sqs_records: print("\nāš ļø No records found in either folder (environment may be empty)") print_file_stats(regular_stats, sqs_stats) - print("\n" + "=" * 80) - print("āœ… SUCCESS: No data to compare (empty 
environment)") - print("=" * 80) - sys.exit(0) # Empty environment is NOT an error! + print("\nāœ… SUCCESS: No data to compare (empty environment)") + sys.exit(0) if not regular_records: print("\nāš ļø No records in regular delta folder") @@ -335,20 +266,15 @@ def main() -> None: if not sqs_records: print("\nāš ļø No records in SQS delta folder") - # Print file statistics print_file_stats(regular_stats, sqs_stats) - # Analyze differences - all_good = analyze_differences(regular_records, sqs_records, args.show_samples) + all_records_matched = analyze_differences(regular_records, sqs_records, args.show_samples) - print("\n" + "=" * 80) - if all_good: - print("āœ… SUCCESS: All regular delta records are present in SQS delta") - print("=" * 80) + if all_records_matched: + print("\nāœ… SUCCESS: All regular delta records are present in SQS delta") sys.exit(0) else: - print("āŒ FAILURE: Some regular delta records are missing from SQS delta") - print("=" * 80) + print("\nāŒ FAILURE: Some regular delta records are missing from SQS delta") sys.exit(1) except (ClientError, ValueError, OSError) as error: diff --git a/compare_deltas.sh b/compare_deltas.sh index 3045920..a74efa1 100755 --- a/compare_deltas.sh +++ b/compare_deltas.sh @@ -1,53 +1,21 @@ #!/bin/bash -# Compare opt-out records between regular and SQS delta folders for a given date +# Internal script - run via compare_deltas_all_environments.sh set -e +die() { echo "Error: $1" >&2; exit 1; } + BUCKET="${OPTOUT_S3_BUCKET:-}" REGULAR_PREFIX="${REGULAR_PREFIX:-optout/delta/}" SQS_PREFIX="${SQS_PREFIX:-sqs-delta/delta/}" -show_usage() { - echo "Usage: $0 [date2] [date3] ... [options]" - echo " OR: $0 --date [--date ] ... [options]" - echo "" - echo "Compare opt-out records between regular and SQS delta folders for specific date(s)." - echo "Multiple dates can be specified to handle records that roll over midnight." 
- echo "" - echo "Arguments:" - echo " Date folder(s) to compare (e.g., 2025-11-07)" - echo "" - echo "Options:" - echo " --date Date folder (can be specified multiple times)" - echo " --bucket S3 bucket name (or set OPTOUT_S3_BUCKET env var)" - echo " --regular-prefix Regular delta prefix (default: optout/delta/)" - echo " --sqs-prefix SQS delta prefix (default: sqs-delta/delta/)" - echo " --show-samples Number of sample differences to show (default: 10)" - echo " --quiet, -q Suppress download progress output" - echo "" - echo "Examples:" - echo " # Single date with env variable" - echo " export OPTOUT_S3_BUCKET=my-bucket" - echo " $0 2025-11-07" - echo "" - echo " # Multiple dates to handle rollover (recommended)" - echo " $0 --date 2025-11-07 --date 2025-11-08 --bucket my-bucket" - echo "" - echo " # Positional dates" - echo " $0 2025-11-07 2025-11-08 --bucket my-bucket" - echo "" - echo " # Custom prefixes" - echo " $0 --date 2025-11-07 --bucket my-bucket --regular-prefix optout-v2/delta --sqs-prefix sqs-delta/delta" -} - -# Parse arguments DATES=() EXTRA_ARGS=() while [[ $# -gt 0 ]]; do case $1 in -h|--help) - show_usage + echo "Internal script - use compare_deltas_all_environments.sh instead" exit 0 ;; --bucket) @@ -77,55 +45,31 @@ while [[ $# -gt 0 ]]; do shift ;; -*) - echo "Error: Unknown option: $1" - show_usage - exit 1 + die "Unknown option: $1" ;; *) - # Positional date argument DATES+=("$1") shift ;; esac done -# Strip trailing slashes from dates if present for i in "${!DATES[@]}"; do DATES[$i]="${DATES[$i]%/}" done -if [ ${#DATES[@]} -eq 0 ]; then - echo "Error: At least one date argument is required" - echo "" - show_usage - exit 1 -fi - -if [ -z "$BUCKET" ]; then - echo "Error: S3 bucket not specified" - echo "Set OPTOUT_S3_BUCKET environment variable or use --bucket option" - echo "" - show_usage - exit 1 -fi +[ ${#DATES[@]} -eq 0 ] && die "At least one date argument is required" +[ -z "$BUCKET" ] && die "OPTOUT_S3_BUCKET not set" +[ ! 
-f "compare_delta_folders.py" ] && die "compare_delta_folders.py not found" -# Check if Python script exists -if [ ! -f "compare_delta_folders.py" ]; then - echo "Error: compare_delta_folders.py not found in current directory" - exit 1 -fi - -# Build date arguments for Python script DATE_ARGS=() for date in "${DATES[@]}"; do DATE_ARGS+=("--date" "$date") done -# Run the comparison python3 compare_delta_folders.py \ --bucket "$BUCKET" \ "${DATE_ARGS[@]}" \ --regular-prefix "$REGULAR_PREFIX" \ --sqs-prefix "$SQS_PREFIX" \ "${EXTRA_ARGS[@]}" - diff --git a/compare_deltas_all_environments.sh b/compare_deltas_all_environments.sh index 9a9abfc..bc62cb8 100755 --- a/compare_deltas_all_environments.sh +++ b/compare_deltas_all_environments.sh @@ -1,26 +1,33 @@ #!/bin/bash -# Compare deltas across all environments -# Requires aws-sso to be installed and configured -# -# Usage: -# ./compare_deltas_all_environments.sh [--env ENV] [DATE...] -# -# Examples: -# ./compare_deltas_all_environments.sh # All envs, yesterday+today -# ./compare_deltas_all_environments.sh 2025-12-15 # All envs, specific date -# ./compare_deltas_all_environments.sh --env uid2-test # Single env, yesterday+today -# ./compare_deltas_all_environments.sh --env uid2-prod 2025-12-15 # Single env, specific date -# -# Available environments: uid2-test, euid-integ, uid2-integ, euid-prod, uid2-prod -set -e +AVAILABLE_ENVS="uid2-test, euid-integ, uid2-integ, euid-prod, uid2-prod" + +show_help() { + echo "Usage: $0 [--env ENV] [DATE...]" + echo "" + echo "Compare opt-out deltas between regular and SQS pipelines across environments." 
+ echo "" + echo "Options:" + echo " --env, -e ENV Run only for specified environment" + echo " --help, -h Show this help message" + echo "" + echo "Examples:" + echo " $0 # All envs, yesterday+today" + echo " $0 2025-12-15 # All envs, specific date" + echo " $0 --env uid2-prod 2025-12-15 # Single env, specific date" + echo "" + echo "Available environments: $AVAILABLE_ENVS" +} -# Parse arguments ENV_FILTER="" DATES="" while [[ $# -gt 0 ]]; do case $1 in + --help|-h) + show_help + exit 0 + ;; --env|-e) ENV_FILTER=$(echo "$2" | tr '[:upper:]' '[:lower:]') shift 2 @@ -32,45 +39,12 @@ while [[ $# -gt 0 ]]; do esac done -# Default to yesterday and today if no dates provided if [ -z "$DATES" ]; then DATES="--date $(date -v-1d +%Y-%m-%d) --date $(date +%Y-%m-%d)" fi -# Initialize aggregate counters -TOTAL_REGULAR_FILES=0 -TOTAL_REGULAR_ENTRIES=0 -TOTAL_SQS_FILES=0 -TOTAL_SQS_ENTRIES=0 ENVS_RUN=0 -# Function to extract and sum statistics -extract_stats() { - local output="$1" - local env_name="$2" - - # Extract file counts - local regular_files=$(echo "$output" | grep "Regular Delta Files:" | awk '{print $4}') - local sqs_files=$(echo "$output" | grep "SQS Delta Files:" | awk '{print $4}') - - # Extract entry counts (from "Total entries:" line) - local regular_entries=$(echo "$output" | grep -A3 "Regular Delta Files:" | grep "Total entries:" | awk '{print $3}') - local sqs_entries=$(echo "$output" | grep -A3 "SQS Delta Files:" | grep "Total entries:" | awk '{print $3}') - - if [ -n "$regular_files" ] && [ -n "$sqs_files" ]; then - echo " $env_name: Regular $regular_files files/$regular_entries entries, SQS $sqs_files files/$sqs_entries entries" - # Remove commas from numbers before arithmetic - regular_files_clean=${regular_files//,/} - regular_entries_clean=${regular_entries//,/} - sqs_files_clean=${sqs_files//,/} - sqs_entries_clean=${sqs_entries//,/} - TOTAL_REGULAR_FILES=$((TOTAL_REGULAR_FILES + regular_files_clean)) - TOTAL_REGULAR_ENTRIES=$((TOTAL_REGULAR_ENTRIES + 
regular_entries_clean)) - TOTAL_SQS_FILES=$((TOTAL_SQS_FILES + sqs_files_clean)) - TOTAL_SQS_ENTRIES=$((TOTAL_SQS_ENTRIES + sqs_entries_clean)) - fi -} - run_comparison() { local env_name="$1" local account="$2" @@ -79,43 +53,34 @@ run_comparison() { local regular_prefix="$5" local sqs_prefix="$6" - # Check if we should skip this environment local env_lower=$(echo "$env_name" | tr '[:upper:]' '[:lower:]') if [ -n "$ENV_FILTER" ] && [ "$env_lower" != "$ENV_FILTER" ]; then return fi ENVS_RUN=$((ENVS_RUN + 1)) + echo "" echo "======================================== $env_name ========================================" - OUTPUT=$(aws-sso exec --account "$account" --role "$role" -- \ - bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=$bucket && ./compare_deltas.sh $DATES --regular-prefix $regular_prefix --sqs-prefix $sqs_prefix --quiet" 2>&1) - # Filter out the separator lines from compare_deltas.sh output - echo "$OUTPUT" | grep -v "^====" | grep -v "^$" - extract_stats "$OUTPUT" "$env_name" + echo "" + + if ! 
aws-sso exec --account "$account" --role "$role" -- \ + bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=$bucket && ./compare_deltas.sh $DATES --regular-prefix $regular_prefix --sqs-prefix $sqs_prefix" 2>&1; then + echo "❌ Command failed for $env_name" + fi + + echo "" + echo "======================================== END $env_name ========================================" echo "" } -run_comparison "UID2-TEST" "072245134533" "scrum-uid2-full-access" "uid2-optout-test-store" "optout/delta/" "sqs-delta/delta/" +run_comparison "UID2-TEST" "072245134533" "scrum-uid2-full-access" "uid2-optout-test-store" "optout-legacy/delta/" "optout/delta/" run_comparison "EUID-INTEG" "101244608629" "scrum-uid2-elevated" "euid-optout-integ-store" "optout/delta/" "sqs-delta/delta/" run_comparison "UID2-INTEG" "150073873184" "scrum-uid2-elevated" "uid2-optout-integ-store" "uid2-optout-integ/delta/" "sqs-delta/delta/" run_comparison "EUID-PROD" "409985233527" "scrum-uid2-elevated" "euid-optout-prod-store" "optout/delta/" "sqs-delta/delta/" run_comparison "UID2-PROD" "553165044900" "scrum-uid2-elevated" "uid2-optout-prod-store" "optout-v2/delta/" "sqs-delta/delta/" -# Only show summary if we ran environments if [ $ENVS_RUN -eq 0 ]; then echo "❌ No matching environment found for: $ENV_FILTER" - echo "Available: uid2-test, euid-integ, uid2-integ, euid-prod, uid2-prod" + echo "Available: $AVAILABLE_ENVS" exit 1 fi - -# Only show summary for multiple environments -if [ $ENVS_RUN -gt 1 ]; then - echo "======================================== SUMMARY ========================================" - echo "Total: Regular $TOTAL_REGULAR_FILES files/$TOTAL_REGULAR_ENTRIES entries, SQS $TOTAL_SQS_FILES files/$TOTAL_SQS_ENTRIES entries" - - if [ $TOTAL_SQS_FILES -gt 0 ] && [ $TOTAL_SQS_ENTRIES -gt 0 ]; then - FILE_EFFICIENCY=$(awk "BEGIN {printf \"%.1f\", $TOTAL_REGULAR_FILES / $TOTAL_SQS_FILES}") - ENTRY_EFFICIENCY=$(awk "BEGIN {printf \"%.1f\", 
$TOTAL_REGULAR_ENTRIES / $TOTAL_SQS_ENTRIES}") - echo "Efficiency: ${FILE_EFFICIENCY}x fewer files, ${ENTRY_EFFICIENCY}x fewer entries" - fi -fi From b5c964b65302e608632f0ec93a4b990c7032d1a8 Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Tue, 16 Dec 2025 19:08:37 -0700 Subject: [PATCH 08/11] add cache to script --- .gitignore | 2 ++ compare_delta_folders.py | 39 ++++++++++++++++++++++++++---- compare_deltas_all_environments.sh | 8 +++--- 3 files changed, 40 insertions(+), 9 deletions(-) diff --git a/.gitignore b/.gitignore index 4d3c028..7de3a2b 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,5 @@ out/ generated/ uid2-optout.iml .DS_Store +optout-data/ +.cache/ \ No newline at end of file diff --git a/compare_delta_folders.py b/compare_delta_folders.py index 0c47b10..68b5057 100755 --- a/compare_delta_folders.py +++ b/compare_delta_folders.py @@ -5,6 +5,7 @@ import sys import traceback from datetime import datetime +from pathlib import Path from typing import List, Set, Tuple try: @@ -14,6 +15,8 @@ print("Error: boto3 not installed. 
Run: pip install boto3") sys.exit(1) +CACHE_DIR = "./.cache/delta-cache/" + class OptOutRecord: # 32 (identity_hash) + 32 (advertising_id) + 7 (timestamp) + 1 (metadata) @@ -79,11 +82,32 @@ def parse_records_from_file(data: bytes) -> List[OptOutRecord]: return records -def download_from_s3(bucket: str, key: str) -> bytes: +def get_cached_file(bucket: str, key: str) -> bytes | None: + filename = key.split('/')[-1] + cache_path = Path(CACHE_DIR) / bucket / filename + if cache_path.exists(): + return cache_path.read_bytes() + return None + + +def save_to_cache(bucket: str, key: str, data: bytes) -> None: + filename = key.split('/')[-1] + cache_path = Path(CACHE_DIR) / bucket / filename + cache_path.parent.mkdir(parents=True, exist_ok=True) + cache_path.write_bytes(data) + + +def download_from_s3(bucket: str, key: str) -> tuple[bytes, bool]: + cached = get_cached_file(bucket, key) + if cached is not None: + return cached, True + try: s3 = boto3.client('s3') response = s3.get_object(Bucket=bucket, Key=key) - return response['Body'].read() + data = response['Body'].read() + save_to_cache(bucket, key, data) + return data, False except ClientError as error: print(f"\nError downloading s3://{bucket}/{key}: {error}") raise @@ -121,14 +145,18 @@ def load_records_from_folder( all_records = set() file_stats = {} total_records = 0 + cached_count = 0 for i, file_key in enumerate(files, 1): filename = file_key.split('/')[-1] if not quiet: - print(f"\r {date_folder}: [{i}/{len(files)}] {total_records} records", end='', flush=True) + cache_info = f" ({cached_count} cached)" if cached_count > 0 else "" + print(f"\r {date_folder}: [{i}/{len(files)}] {total_records} records{cache_info}", end='', flush=True) try: - data = download_from_s3(bucket, file_key) + data, from_cache = download_from_s3(bucket, file_key) + if from_cache: + cached_count += 1 records = parse_records_from_file(data) total_records += len(records) @@ -145,7 +173,8 @@ def load_records_from_folder( continue if not 
quiet: - print(f"\r {date_folder}: {len(files)} files, {total_records} records" + " " * 20) + cache_info = f" ({cached_count} cached)" if cached_count > 0 else "" + print(f"\r {date_folder}: {len(files)} files, {total_records} records{cache_info}" + " " * 20) return all_records, file_stats diff --git a/compare_deltas_all_environments.sh b/compare_deltas_all_environments.sh index bc62cb8..08344f9 100755 --- a/compare_deltas_all_environments.sh +++ b/compare_deltas_all_environments.sh @@ -74,10 +74,10 @@ run_comparison() { } run_comparison "UID2-TEST" "072245134533" "scrum-uid2-full-access" "uid2-optout-test-store" "optout-legacy/delta/" "optout/delta/" -run_comparison "EUID-INTEG" "101244608629" "scrum-uid2-elevated" "euid-optout-integ-store" "optout/delta/" "sqs-delta/delta/" -run_comparison "UID2-INTEG" "150073873184" "scrum-uid2-elevated" "uid2-optout-integ-store" "uid2-optout-integ/delta/" "sqs-delta/delta/" -run_comparison "EUID-PROD" "409985233527" "scrum-uid2-elevated" "euid-optout-prod-store" "optout/delta/" "sqs-delta/delta/" -run_comparison "UID2-PROD" "553165044900" "scrum-uid2-elevated" "uid2-optout-prod-store" "optout-v2/delta/" "sqs-delta/delta/" +run_comparison "EUID-INTEG" "101244608629" "scrum-uid2-elevated" "euid-optout-integ-store" "optout-legacy/delta/" "optout/delta/" +run_comparison "UID2-INTEG" "150073873184" "scrum-uid2-elevated" "uid2-optout-integ-store" "optout-legacy/delta/" "uid2-optout-integ/delta/" +# run_comparison "EUID-PROD" "409985233527" "scrum-uid2-elevated" "euid-optout-prod-store" "optout/delta/" "sqs-delta/delta/" +# run_comparison "UID2-PROD" "553165044900" "scrum-uid2-elevated" "uid2-optout-prod-store" "optout-v2/delta/" "sqs-delta/delta/" if [ $ENVS_RUN -eq 0 ]; then echo "❌ No matching environment found for: $ENV_FILTER" From 561af93777602ecc158abff5a979ed5dc25e7631 Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Tue, 16 Dec 2025 19:18:25 -0700 Subject: [PATCH 09/11] refactor --- compare_delta_folders.py | 44 
+++++++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/compare_delta_folders.py b/compare_delta_folders.py index 68b5057..d5402a7 100755 --- a/compare_delta_folders.py +++ b/compare_delta_folders.py @@ -6,7 +6,7 @@ import traceback from datetime import datetime from pathlib import Path -from typing import List, Set, Tuple +from typing import Dict, List, Optional, Set, Tuple try: import boto3 @@ -15,12 +15,20 @@ print("Error: boto3 not installed. Run: pip install boto3") sys.exit(1) -CACHE_DIR = "./.cache/delta-cache/" +CACHE_DIR = "./.cache/delta-cache/" + +IDENTITY_HASH_SIZE = 32 +ADVERTISING_ID_SIZE = 32 +TIMESTAMP_AND_METADATA_SIZE = 8 + +MIN_VALID_TIMESTAMP = 1577836800 # 2020-01-01 +MAX_VALID_TIMESTAMP = 4102444800 # 2100-01-01 + +TIMESTAMP_MASK = 0x00FFFFFFFFFFFFFF # Masks out the metadata byte class OptOutRecord: - # 32 (identity_hash) + 32 (advertising_id) + 7 (timestamp) + 1 (metadata) - ENTRY_SIZE = 72 + ENTRY_SIZE = IDENTITY_HASH_SIZE + ADVERTISING_ID_SIZE + TIMESTAMP_AND_METADATA_SIZE def __init__(self, identity_hash: bytes, advertising_id: bytes, timestamp: int): self.identity_hash = identity_hash @@ -28,8 +36,8 @@ def __init__(self, identity_hash: bytes, advertising_id: bytes, timestamp: int): self.timestamp = timestamp def is_sentinel(self) -> bool: - return (self.identity_hash == b'\x00' * 32 or - self.identity_hash == b'\xff' * 32) + return (self.identity_hash == b'\x00' * IDENTITY_HASH_SIZE or + self.identity_hash == b'\xff' * IDENTITY_HASH_SIZE) def __hash__(self): return hash((self.identity_hash, self.advertising_id)) @@ -55,16 +63,13 @@ def parse_records_from_file(data: bytes) -> List[OptOutRecord]: records = [] offset = 0 entry_size = OptOutRecord.ENTRY_SIZE - - MIN_VALID_TIMESTAMP = 1577836800 # 2020-01-01 - MAX_VALID_TIMESTAMP = 4102444800 # 2100-01-01 + timestamp_offset = IDENTITY_HASH_SIZE + ADVERTISING_ID_SIZE while offset + entry_size <= len(data): - identity_hash = data[offset:offset + 
32] - advertising_id = data[offset + 32:offset + 64] - # Last byte is metadata, mask to 56 bits for timestamp - timestamp_raw = struct.unpack(' List[OptOutRecord]: return records -def get_cached_file(bucket: str, key: str) -> bytes | None: +def get_cached_file(bucket: str, key: str) -> Optional[bytes]: filename = key.split('/')[-1] cache_path = Path(CACHE_DIR) / bucket / filename if cache_path.exists(): @@ -97,7 +102,8 @@ def save_to_cache(bucket: str, key: str, data: bytes) -> None: cache_path.write_bytes(data) -def download_from_s3(bucket: str, key: str) -> tuple[bytes, bool]: +def download_from_s3(bucket: str, key: str) -> Tuple[bytes, bool]: + """Returns (data, was_cached) tuple.""" cached = get_cached_file(bucket, key) if cached is not None: return cached, True @@ -134,7 +140,7 @@ def list_dat_files(bucket: str, prefix: str) -> List[str]: def load_records_from_folder( bucket: str, prefix: str, date_folder: str, quiet: bool = False -) -> Tuple[Set[OptOutRecord], dict]: +) -> Tuple[Set[OptOutRecord], Dict[str, dict]]: full_prefix = f"{prefix}{date_folder}/" files = list_dat_files(bucket, full_prefix) @@ -181,7 +187,7 @@ def load_records_from_folder( def load_records_from_multiple_folders( bucket: str, prefix: str, date_folders: List[str], quiet: bool = False -) -> Tuple[Set[OptOutRecord], dict]: +) -> Tuple[Set[OptOutRecord], Dict[str, dict]]: all_records = set() all_stats = {} @@ -230,7 +236,7 @@ def analyze_differences(regular_records: Set[OptOutRecord], return all_records_matched -def print_file_stats(regular_stats: dict, sqs_stats: dict) -> None: +def print_file_stats(regular_stats: Dict[str, dict], sqs_stats: Dict[str, dict]) -> None: print("\n\n📈 File Statistics") print(f"\n Regular Delta Files: {len(regular_stats)}") From 0fec5b5b1c00fecc1d1968daf19e227e38a8f44b Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Wed, 17 Dec 2025 12:08:50 -0700 Subject: [PATCH 10/11] pylint fixes --- compare_delta_folders.py | 33 ++++++++++++++++++++++++++------- 1 file 
changed, 26 insertions(+), 7 deletions(-) diff --git a/compare_delta_folders.py b/compare_delta_folders.py index d5402a7..368d603 100755 --- a/compare_delta_folders.py +++ b/compare_delta_folders.py @@ -1,4 +1,5 @@ #!/usr/bin/env python3 +"""Compare opt-out records between regular and SQS delta folders in S3.""" import argparse import struct @@ -6,7 +7,7 @@ import traceback from datetime import datetime from pathlib import Path -from typing import Dict, List, Optional, Set, Tuple +from typing import Any, Dict, List, Optional, Set, Tuple try: import boto3 @@ -28,6 +29,8 @@ class OptOutRecord: + """Represents a single opt-out record from a delta file.""" + ENTRY_SIZE = IDENTITY_HASH_SIZE + ADVERTISING_ID_SIZE + TIMESTAMP_AND_METADATA_SIZE def __init__(self, identity_hash: bytes, advertising_id: bytes, timestamp: int): @@ -36,19 +39,20 @@ def __init__(self, identity_hash: bytes, advertising_id: bytes, timestamp: int): self.timestamp = timestamp def is_sentinel(self) -> bool: + """Return True if this record is a sentinel (all zeros or all ones).""" return (self.identity_hash == b'\x00' * IDENTITY_HASH_SIZE or self.identity_hash == b'\xff' * IDENTITY_HASH_SIZE) - def __hash__(self): + def __hash__(self) -> int: return hash((self.identity_hash, self.advertising_id)) - def __eq__(self, other): + def __eq__(self, other: Any) -> bool: if not isinstance(other, OptOutRecord): - return False + return NotImplemented return (self.identity_hash == other.identity_hash and self.advertising_id == other.advertising_id) - def __repr__(self): + def __repr__(self) -> str: hash_hex = self.identity_hash.hex()[:16] id_hex = self.advertising_id.hex()[:16] try: @@ -60,6 +64,7 @@ def __repr__(self): def parse_records_from_file(data: bytes) -> List[OptOutRecord]: + """Parse binary data into a list of OptOutRecords, filtering invalid entries.""" records = [] offset = 0 entry_size = OptOutRecord.ENTRY_SIZE @@ -88,6 +93,7 @@ def parse_records_from_file(data: bytes) -> List[OptOutRecord]: def 
get_cached_file(bucket: str, key: str) -> Optional[bytes]: + """Return cached file contents if available, otherwise None.""" filename = key.split('/')[-1] cache_path = Path(CACHE_DIR) / bucket / filename if cache_path.exists(): @@ -96,6 +102,7 @@ def get_cached_file(bucket: str, key: str) -> Optional[bytes]: def save_to_cache(bucket: str, key: str, data: bytes) -> None: + """Save file data to local cache directory.""" filename = key.split('/')[-1] cache_path = Path(CACHE_DIR) / bucket / filename cache_path.parent.mkdir(parents=True, exist_ok=True) @@ -120,6 +127,7 @@ def download_from_s3(bucket: str, key: str) -> Tuple[bytes, bool]: def list_dat_files(bucket: str, prefix: str) -> List[str]: + """List all .dat files in the given S3 bucket and prefix.""" try: s3 = boto3.client('s3') files = [] @@ -141,6 +149,7 @@ def list_dat_files(bucket: str, prefix: str) -> List[str]: def load_records_from_folder( bucket: str, prefix: str, date_folder: str, quiet: bool = False ) -> Tuple[Set[OptOutRecord], Dict[str, dict]]: + """Load all records from a single date folder, returning records and file stats.""" full_prefix = f"{prefix}{date_folder}/" files = list_dat_files(bucket, full_prefix) @@ -157,7 +166,8 @@ def load_records_from_folder( filename = file_key.split('/')[-1] if not quiet: cache_info = f" ({cached_count} cached)" if cached_count > 0 else "" - print(f"\r {date_folder}: [{i}/{len(files)}] {total_records} records{cache_info}", end='', flush=True) + progress = f"\r {date_folder}: [{i}/{len(files)}] {total_records} records{cache_info}" + print(progress, end='', flush=True) try: data, from_cache = download_from_s3(bucket, file_key) @@ -180,7 +190,8 @@ def load_records_from_folder( if not quiet: cache_info = f" ({cached_count} cached)" if cached_count > 0 else "" - print(f"\r {date_folder}: {len(files)} files, {total_records} records{cache_info}" + " " * 20) + summary = f"\r {date_folder}: {len(files)} files, {total_records} records{cache_info}" + print(summary + " " * 20) 
return all_records, file_stats @@ -188,6 +199,7 @@ def load_records_from_folder( def load_records_from_multiple_folders( bucket: str, prefix: str, date_folders: List[str], quiet: bool = False ) -> Tuple[Set[OptOutRecord], Dict[str, dict]]: + """Load and merge records from multiple date folders.""" all_records = set() all_stats = {} @@ -202,6 +214,11 @@ def load_records_from_multiple_folders( def analyze_differences(regular_records: Set[OptOutRecord], sqs_records: Set[OptOutRecord], show_samples: int = 10) -> bool: + """ + Compare record sets and print differences. + + Returns True if all regular records exist in SQS. + """ print("\n\n📊 Analysis Results (unique records)") print(f"\n Regular: {len(regular_records):,}") print(f" SQS: {len(sqs_records):,}") @@ -237,6 +254,7 @@ def analyze_differences(regular_records: Set[OptOutRecord], def print_file_stats(regular_stats: Dict[str, dict], sqs_stats: Dict[str, dict]) -> None: + """Print summary statistics for regular and SQS delta files.""" print("\n\n📈 File Statistics") print(f"\n Regular Delta Files: {len(regular_stats)}") @@ -257,6 +275,7 @@ def print_file_stats(regular_stats: Dict[str, dict], sqs_stats: Dict[str, dict]) def main() -> None: + """Entry point: parse arguments and run the comparison.""" parser = argparse.ArgumentParser( description='Compare opt-out records between regular and SQS delta folders' ) From e469d545aafce4dfbaa7cabbda42794ddd1cc82f Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Fri, 19 Dec 2025 14:44:11 -0700 Subject: [PATCH 11/11] update folders --- compare_deltas_all_environments.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/compare_deltas_all_environments.sh b/compare_deltas_all_environments.sh index 08344f9..77bb8b8 100755 --- a/compare_deltas_all_environments.sh +++ b/compare_deltas_all_environments.sh @@ -76,8 +76,8 @@ run_comparison() { run_comparison "UID2-TEST" "072245134533" "scrum-uid2-full-access" "uid2-optout-test-store" "optout-legacy/delta/" 
"optout/delta/" run_comparison "EUID-INTEG" "101244608629" "scrum-uid2-elevated" "euid-optout-integ-store" "optout-legacy/delta/" "optout/delta/" run_comparison "UID2-INTEG" "150073873184" "scrum-uid2-elevated" "uid2-optout-integ-store" "optout-legacy/delta/" "uid2-optout-integ/delta/" -# run_comparison "EUID-PROD" "409985233527" "scrum-uid2-elevated" "euid-optout-prod-store" "optout/delta/" "sqs-delta/delta/" -# run_comparison "UID2-PROD" "553165044900" "scrum-uid2-elevated" "uid2-optout-prod-store" "optout-v2/delta/" "sqs-delta/delta/" +run_comparison "EUID-PROD" "409985233527" "scrum-uid2-elevated" "euid-optout-prod-store" "optout-legacy/delta/" "optout/delta/" +run_comparison "UID2-PROD" "553165044900" "scrum-uid2-elevated" "uid2-optout-prod-store" "optout-legacy/delta/" "optout-v2/delta/" if [ $ENVS_RUN -eq 0 ]; then echo "❌ No matching environment found for: $ENV_FILTER"