diff --git a/.gitignore b/.gitignore
index 4d3c028..7de3a2b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -7,3 +7,5 @@ out/
 generated/
 uid2-optout.iml
 .DS_Store
+optout-data/
+.cache/
\ No newline at end of file
diff --git a/compare_delta_folders.py b/compare_delta_folders.py
new file mode 100755
index 0000000..368d603
--- /dev/null
+++ b/compare_delta_folders.py
@@ -0,0 +1,345 @@
+#!/usr/bin/env python3
+"""Compare opt-out records between regular and SQS delta folders in S3."""
+
+import argparse
+import struct
+import sys
+import traceback
+from datetime import datetime
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Set, Tuple
+
+try:
+    import boto3
+    from botocore.exceptions import ClientError
+except ImportError:
+    print("Error: boto3 not installed. Run: pip install boto3")
+    sys.exit(1)
+
+CACHE_DIR = "./.cache/delta-cache/"
+
+IDENTITY_HASH_SIZE = 32
+ADVERTISING_ID_SIZE = 32
+TIMESTAMP_AND_METADATA_SIZE = 8
+
+MIN_VALID_TIMESTAMP = 1577836800  # 2020-01-01
+MAX_VALID_TIMESTAMP = 4102444800  # 2100-01-01
+
+TIMESTAMP_MASK = 0x00FFFFFFFFFFFFFF  # Masks out the metadata byte
+
+
+class OptOutRecord:
+    """Represents a single opt-out record from a delta file."""
+
+    ENTRY_SIZE = IDENTITY_HASH_SIZE + ADVERTISING_ID_SIZE + TIMESTAMP_AND_METADATA_SIZE
+
+    def __init__(self, identity_hash: bytes, advertising_id: bytes, timestamp: int):
+        self.identity_hash = identity_hash
+        self.advertising_id = advertising_id
+        self.timestamp = timestamp
+
+    def is_sentinel(self) -> bool:
+        """Return True if this record is a sentinel (all zeros or all ones)."""
+        return (self.identity_hash == b'\x00' * IDENTITY_HASH_SIZE or
+                self.identity_hash == b'\xff' * IDENTITY_HASH_SIZE)
+
+    def __hash__(self) -> int:
+        return hash((self.identity_hash, self.advertising_id))
+
+    def __eq__(self, other: Any) -> bool:
+        if not isinstance(other, OptOutRecord):
+            return NotImplemented
+        return (self.identity_hash == other.identity_hash and
+                self.advertising_id == other.advertising_id)
+
+    def __repr__(self) -> str:
+        hash_hex = self.identity_hash.hex()[:16]
+        id_hex = self.advertising_id.hex()[:16]
+        try:
+            dt = datetime.fromtimestamp(self.timestamp)
+            dt_str = dt.strftime('%Y-%m-%d %H:%M:%S')
+        except (ValueError, OSError, OverflowError):
+            dt_str = "INVALID_TS"
+        return f"OptOutRecord(hash={hash_hex}..., id={id_hex}..., ts={self.timestamp} [{dt_str}])"
+
+
+def parse_records_from_file(data: bytes) -> List[OptOutRecord]:
+    """Parse binary data into a list of OptOutRecords, filtering invalid entries."""
+    records = []
+    offset = 0
+    entry_size = OptOutRecord.ENTRY_SIZE
+    timestamp_offset = IDENTITY_HASH_SIZE + ADVERTISING_ID_SIZE
+
+    while offset + entry_size <= len(data):
+        identity_hash = data[offset:offset + IDENTITY_HASH_SIZE]
+        advertising_id = data[offset + IDENTITY_HASH_SIZE:offset + timestamp_offset]
+        timestamp_raw = struct.unpack('<Q', data[offset + timestamp_offset:offset + entry_size])[0]
+        timestamp = timestamp_raw & TIMESTAMP_MASK
+
+        record = OptOutRecord(identity_hash, advertising_id, timestamp)
+
+        if record.is_sentinel():
+            offset += entry_size
+            continue
+
+        if timestamp < MIN_VALID_TIMESTAMP or timestamp > MAX_VALID_TIMESTAMP:
+            offset += entry_size
+            continue
+
+        records.append(record)
+        offset += entry_size
+
+    return records
+
+
+def get_cached_file(bucket: str, key: str) -> Optional[bytes]:
+    """Return cached file contents if available, otherwise None."""
+    filename = key.split('/')[-1]
+    cache_path = Path(CACHE_DIR) / bucket / filename
+    if cache_path.exists():
+        return cache_path.read_bytes()
+    return None
+
+
+def save_to_cache(bucket: str, key: str, data: bytes) -> None:
+    """Save file data to local cache directory."""
+    filename = key.split('/')[-1]
+    cache_path = Path(CACHE_DIR) / bucket / filename
+    cache_path.parent.mkdir(parents=True, exist_ok=True)
+    cache_path.write_bytes(data)
+
+
+def download_from_s3(bucket: str, key: str) -> Tuple[bytes, bool]:
+    """Returns (data, was_cached) tuple."""
+    cached = get_cached_file(bucket, key)
+    if cached is not None:
+        return cached, True
+
+    try:
+        s3 = boto3.client('s3')
+        response = s3.get_object(Bucket=bucket, Key=key)
+        data = response['Body'].read()
+        save_to_cache(bucket, key, data)
+        return data, False
+    except ClientError as error:
+        print(f"\nError downloading s3://{bucket}/{key}: {error}")
+        raise
+
+
+def list_dat_files(bucket: str, prefix: str) -> List[str]:
+    """List all .dat files in the given S3 bucket and prefix."""
+    try:
+        s3 = boto3.client('s3')
+        files = []
+        paginator = s3.get_paginator('list_objects_v2')
+
+        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
+            if 'Contents' not in page:
+                continue
+            for obj in page['Contents']:
+                if obj['Key'].endswith('.dat'):
+                    files.append(obj['Key'])
+
+        return sorted(files)
+    except ClientError as error:
+        print(f"Error listing files in s3://{bucket}/{prefix}: {error}")
+        raise
+
+
+def load_records_from_folder(
+    bucket: str, prefix: str, date_folder: str, quiet: bool = False
+) -> Tuple[Set[OptOutRecord], Dict[str, dict]]:
+    """Load all records from a single date folder, returning records and file stats."""
+    full_prefix = f"{prefix}{date_folder}/"
+    files = list_dat_files(bucket, full_prefix)
+
+    if not files:
+        print(f" {date_folder}: no files")
+        return set(), {}
+
+    all_records = set()
+    file_stats = {}
+    total_records = 0
+    cached_count = 0
+
+    for i, file_key in enumerate(files, 1):
+        filename = file_key.split('/')[-1]
+        if not quiet:
+            cache_info = f" ({cached_count} cached)" if cached_count > 0 else ""
+            progress = f"\r {date_folder}: [{i}/{len(files)}] {total_records} records{cache_info}"
+            print(progress, end='', flush=True)
+
+        try:
+            data, from_cache = download_from_s3(bucket, file_key)
+            if from_cache:
+                cached_count += 1
+            records = parse_records_from_file(data)
+            total_records += len(records)
+
+            all_records.update(records)
+            total_entries_in_file = len(data) // OptOutRecord.ENTRY_SIZE
+            file_stats[filename] = {
+                'size': len(data),
+                'entries': len(records),
+                'total_entries': total_entries_in_file,
+                'file_key': file_key
+            }
+        except (ClientError, struct.error, ValueError) as error:
+            print(f"\n ERROR: {error}")
+            continue
+
+    if not quiet:
+        cache_info = f" ({cached_count} cached)" if cached_count > 0 else ""
+        summary = f"\r {date_folder}: {len(files)} files, {total_records} records{cache_info}"
+        print(summary + " " * 20)
+
+    return all_records, file_stats
+
+
+def load_records_from_multiple_folders(
+    bucket: str, prefix: str, date_folders: List[str], quiet: bool = False
+) -> Tuple[Set[OptOutRecord], Dict[str, dict]]:
+    """Load and merge records from multiple date folders."""
+    all_records = set()
+    all_stats = {}
+
+    for date_folder in date_folders:
+        records, stats = load_records_from_folder(bucket, prefix, date_folder, quiet)
+        all_records.update(records)
+        all_stats.update(stats)
+
+    return all_records, all_stats
+
+
+def analyze_differences(regular_records: Set[OptOutRecord],
+                        sqs_records: Set[OptOutRecord],
+                        show_samples: int = 10) -> bool:
+    """
+    Compare record sets and print differences.
+
+    Returns True if all regular records exist in SQS.
+ """ + print("\n\n📊 Analysis Results (unique records)") + print(f"\n Regular: {len(regular_records):,}") + print(f" SQS: {len(sqs_records):,}") + + missing_in_sqs = regular_records - sqs_records + extra_in_sqs = sqs_records - regular_records + common = regular_records & sqs_records + + print(f" Common: {len(common):,}") + print(f" Missing: {len(missing_in_sqs):,}") + print(f" Extra: {len(extra_in_sqs):,}") + + all_records_matched = True + + if missing_in_sqs: + print(f"\n❌ MISSING: {len(missing_in_sqs)} records in regular are NOT in SQS") + print(f" Sample (first {min(show_samples, len(missing_in_sqs))}):") + for i, record in enumerate(list(missing_in_sqs)[:show_samples], 1): + print(f" {i}. {record}") + if len(missing_in_sqs) > show_samples: + print(f" ... and {len(missing_in_sqs) - show_samples} more") + all_records_matched = False + + if extra_in_sqs: + print(f"\n⚠️ EXTRA: {len(extra_in_sqs)} records in SQS are NOT in regular") + print(f" Sample (first {min(show_samples, len(extra_in_sqs))}):") + for i, record in enumerate(list(extra_in_sqs)[:show_samples], 1): + print(f" {i}. {record}") + if len(extra_in_sqs) > show_samples: + print(f" ... and {len(extra_in_sqs) - show_samples} more") + + return all_records_matched + + +def print_file_stats(regular_stats: Dict[str, dict], sqs_stats: Dict[str, dict]) -> None: + """Print summary statistics for regular and SQS delta files.""" + print("\n\n📈 File Statistics") + + print(f"\n Regular Delta Files: {len(regular_stats)}") + if regular_stats: + total_size = sum(s['size'] for s in regular_stats.values()) + total_entries = sum(s['entries'] for s in regular_stats.values()) + print(f" Total size: {total_size:,} bytes") + print(f" Total entries: {total_entries:,} (with duplicates)") + print(f" Avg entries/file: {total_entries / len(regular_stats):.1f}") + + print(f"\n SQS Delta Files: {len(sqs_stats)}") + if sqs_stats: + total_size = sum(s['size'] for s in sqs_stats.values()) + total_entries = sum(s['entries'] for s in sqs_stats.values()) + print(f" Total size: {total_size:,} bytes") + print(f" Total entries: {total_entries:,} (with duplicates)") + print(f" Avg entries/file: {total_entries / len(sqs_stats):.1f}") + + +def main() -> None: + """Entry point: parse arguments and run the comparison.""" + parser = argparse.ArgumentParser( + description='Compare opt-out records between regular and SQS delta folders' + ) + parser.add_argument('--bucket', required=True, help='S3 bucket name') + parser.add_argument('--date', required=True, action='append', dest='dates', + help='Date folder (can be specified multiple times)') + parser.add_argument('--regular-prefix', default='optout/delta/', + help='S3 prefix for regular delta files') + parser.add_argument('--sqs-prefix', default='sqs-delta/delta/', + help='S3 prefix for SQS delta files') + parser.add_argument('--show-samples', type=int, default=10, + help='Number of sample records to show for differences') + parser.add_argument('--quiet', '-q', action='store_true', + help='Suppress download progress output') + + args = parser.parse_args() + + date_display = ', '.join(args.dates) + print(f"🔍 {args.bucket} | Dates: {date_display}") + print(f"\n Regular: {args.regular_prefix}") + + try: + regular_records, regular_stats = load_records_from_multiple_folders( + args.bucket, args.regular_prefix, args.dates, args.quiet + ) + + print(f"\n SQS: {args.sqs_prefix}") + + sqs_records, sqs_stats = load_records_from_multiple_folders( + args.bucket, args.sqs_prefix, args.dates, args.quiet + ) + + if not regular_records and 
+            print("\n⚠️ No records found in either folder (environment may be empty)")
+            print_file_stats(regular_stats, sqs_stats)
+            print("\n✅ SUCCESS: No data to compare (empty environment)")
+            sys.exit(0)
+
+        if not regular_records:
+            print("\n⚠️ No records in regular delta folder")
+
+        if not sqs_records:
+            print("\n⚠️ No records in SQS delta folder")
+
+        print_file_stats(regular_stats, sqs_stats)
+
+        all_records_matched = analyze_differences(regular_records, sqs_records, args.show_samples)
+
+        if all_records_matched:
+            print("\n✅ SUCCESS: All regular delta records are present in SQS delta")
+            sys.exit(0)
+        else:
+            print("\n❌ FAILURE: Some regular delta records are missing from SQS delta")
+            sys.exit(1)
+
+    except (ClientError, ValueError, OSError) as error:
+        print(f"\n❌ Error: {error}")
+        traceback.print_exc()
+        sys.exit(1)
+    except Exception as error:  # pylint: disable=broad-except
+        print(f"\n❌ Unexpected error: {error}")
+        traceback.print_exc()
+        sys.exit(1)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/compare_deltas.sh b/compare_deltas.sh
new file mode 100755
index 0000000..a74efa1
--- /dev/null
+++ b/compare_deltas.sh
@@ -0,0 +1,75 @@
+#!/bin/bash
+# Internal script - run via compare_deltas_all_environments.sh
+
+set -e
+
+die() { echo "Error: $1" >&2; exit 1; }
+
+BUCKET="${OPTOUT_S3_BUCKET:-}"
+REGULAR_PREFIX="${REGULAR_PREFIX:-optout/delta/}"
+SQS_PREFIX="${SQS_PREFIX:-sqs-delta/delta/}"
+
+DATES=()
+EXTRA_ARGS=()
+
+while [[ $# -gt 0 ]]; do
+    case $1 in
+        -h|--help)
+            echo "Internal script - use compare_deltas_all_environments.sh instead"
+            exit 0
+            ;;
+        --bucket)
+            BUCKET="$2"
+            shift 2
+            ;;
+        --date)
+            DATES+=("$2")
+            shift 2
+            ;;
+        --regular-prefix)
+            REGULAR_PREFIX="$2"
+            EXTRA_ARGS+=("--regular-prefix" "$2")
+            shift 2
+            ;;
+        --sqs-prefix)
+            SQS_PREFIX="$2"
+            EXTRA_ARGS+=("--sqs-prefix" "$2")
+            shift 2
+            ;;
+        --show-samples)
+            EXTRA_ARGS+=("--show-samples" "$2")
+            shift 2
+            ;;
+        --quiet|-q)
+            EXTRA_ARGS+=("--quiet")
+            shift
+            ;;
+        -*)
+            die "Unknown option: $1"
+            ;;
+        *)
+            DATES+=("$1")
+            shift
+            ;;
+    esac
+done
+
+for i in "${!DATES[@]}"; do
+    DATES[$i]="${DATES[$i]%/}"
+done
+
+[ ${#DATES[@]} -eq 0 ] && die "At least one date argument is required"
+[ -z "$BUCKET" ] && die "OPTOUT_S3_BUCKET not set"
+[ ! -f "compare_delta_folders.py" ] && die "compare_delta_folders.py not found"
+
+DATE_ARGS=()
+for date in "${DATES[@]}"; do
+    DATE_ARGS+=("--date" "$date")
+done
+
+python3 compare_delta_folders.py \
+    --bucket "$BUCKET" \
+    "${DATE_ARGS[@]}" \
+    --regular-prefix "$REGULAR_PREFIX" \
+    --sqs-prefix "$SQS_PREFIX" \
+    "${EXTRA_ARGS[@]}"
diff --git a/compare_deltas_all_environments.sh b/compare_deltas_all_environments.sh
new file mode 100755
index 0000000..77bb8b8
--- /dev/null
+++ b/compare_deltas_all_environments.sh
@@ -0,0 +1,86 @@
+#!/bin/bash
+
+AVAILABLE_ENVS="uid2-test, euid-integ, uid2-integ, euid-prod, uid2-prod"
+
+show_help() {
+    echo "Usage: $0 [--env ENV] [DATE...]"
+    echo ""
+    echo "Compare opt-out deltas between regular and SQS pipelines across environments."
+ echo "" + echo "Options:" + echo " --env, -e ENV Run only for specified environment" + echo " --help, -h Show this help message" + echo "" + echo "Examples:" + echo " $0 # All envs, yesterday+today" + echo " $0 2025-12-15 # All envs, specific date" + echo " $0 --env uid2-prod 2025-12-15 # Single env, specific date" + echo "" + echo "Available environments: $AVAILABLE_ENVS" +} + +ENV_FILTER="" +DATES="" + +while [[ $# -gt 0 ]]; do + case $1 in + --help|-h) + show_help + exit 0 + ;; + --env|-e) + ENV_FILTER=$(echo "$2" | tr '[:upper:]' '[:lower:]') + shift 2 + ;; + *) + DATES="$DATES --date $1" + shift + ;; + esac +done + +if [ -z "$DATES" ]; then + DATES="--date $(date -v-1d +%Y-%m-%d) --date $(date +%Y-%m-%d)" +fi + +ENVS_RUN=0 + +run_comparison() { + local env_name="$1" + local account="$2" + local role="$3" + local bucket="$4" + local regular_prefix="$5" + local sqs_prefix="$6" + + local env_lower=$(echo "$env_name" | tr '[:upper:]' '[:lower:]') + if [ -n "$ENV_FILTER" ] && [ "$env_lower" != "$ENV_FILTER" ]; then + return + fi + + ENVS_RUN=$((ENVS_RUN + 1)) + echo "" + echo "======================================== $env_name ========================================" + echo "" + + if ! aws-sso exec --account "$account" --role "$role" -- \ + bash -c "cd /Users/ian.nara/service/uid2-optout && source .venv/bin/activate && export OPTOUT_S3_BUCKET=$bucket && ./compare_deltas.sh $DATES --regular-prefix $regular_prefix --sqs-prefix $sqs_prefix" 2>&1; then + echo "❌ Command failed for $env_name" + fi + + echo "" + echo "======================================== END $env_name ========================================" + echo "" +} + +run_comparison "UID2-TEST" "072245134533" "scrum-uid2-full-access" "uid2-optout-test-store" "optout-legacy/delta/" "optout/delta/" +run_comparison "EUID-INTEG" "101244608629" "scrum-uid2-elevated" "euid-optout-integ-store" "optout-legacy/delta/" "optout/delta/" +run_comparison "UID2-INTEG" "150073873184" "scrum-uid2-elevated" "uid2-optout-integ-store" "optout-legacy/delta/" "uid2-optout-integ/delta/" +run_comparison "EUID-PROD" "409985233527" "scrum-uid2-elevated" "euid-optout-prod-store" "optout-legacy/delta/" "optout/delta/" +run_comparison "UID2-PROD" "553165044900" "scrum-uid2-elevated" "uid2-optout-prod-store" "optout-legacy/delta/" "optout-v2/delta/" + +if [ $ENVS_RUN -eq 0 ]; then + echo "❌ No matching environment found for: $ENV_FILTER" + echo "Available: $AVAILABLE_ENVS" + exit 1 +fi