# Extracted from commit a590652a8b50642e2d0c2cd8cbd5c2f85cc777f4
# ("Create sqli_completed.py", Ana Scolari, Tue, 6 May 2025 18:52:02 -0700).
"""Append per-client Blueshift email-open aggregates to a target table.

The job reads ``blueshift.campaign_activity_kpis``, computes the latest
``last_opened_at`` and earliest ``first_opened_at`` per ``client_id`` for
rows selected relative to an ``--as_of`` date, adds a ``start_date`` column
and appends the result to ``<target_db>.<target_table>``.
"""

import re
from datetime import datetime

import click
from pyspark.sql import SparkSession, functions as F


@click.command()
@click.option('--target_db', default='curated',
              help='Database that holds the target table.')
@click.option('--target_table', default='client_communication_preferences_journal',
              help='Table the result rows are appended to.')
@click.option('--as_of', required=True,
              help='Cut-off date in YYYYMMDD format.')
def main(target_db, target_table, as_of):
    """Aggregate email-open activity up to AS_OF and append it to the target table.

    Raises ValueError when AS_OF is not exactly eight ASCII digits forming a
    real calendar date (YYYYMMDD).
    """
    # as_of is interpolated into the SQL text below, so it must be validated
    # strictly. fullmatch() rejects a trailing newline that '^...$' with
    # match() would let through, and [0-9] excludes non-ASCII Unicode digits
    # that \d accepts.
    if not re.fullmatch(r'[0-9]{8}', as_of):
        raise ValueError("Invalid as_of format. Expected YYYYMMDD.")
    # Eight digits are not necessarily a date (e.g. 20251399); reject those
    # here instead of handing an unparseable value to TO_DATE.
    try:
        datetime.strptime(as_of, '%Y%m%d')
    except ValueError as err:
        raise ValueError("Invalid as_of format. Expected YYYYMMDD.") from err

    # Parenthesised chain instead of backslash continuations: a comment after
    # a trailing backslash is a syntax error.
    spark = (
        SparkSession.builder
        .appName("DBWriteApp")
        # enable if you are writing to Hive or Spark SQL catalog
        .enableHiveSupport()
        .getOrCreate()
    )

    try:
        # NOTE(review): as_of is safe to interpolate only because of the
        # validation above. If the deployed Spark version supports bound
        # parameters in spark.sql(), prefer those over string formatting.
        qry = f"""
            WITH blueshift_active_email_client_agg AS (
                SELECT client_id,
                    MAX(last_opened_at) AS last_opened_at,
                    MIN(first_opened_at) AS first_opened_at
                FROM blueshift.campaign_activity_kpis
                WHERE DATE(last_opened_at) <= TO_DATE('{as_of}', 'yyyyMMdd')
                    OR last_opened_at IS NULL
                    OR DATE(first_opened_at) <= TO_DATE('{as_of}', 'yyyyMMdd')
                GROUP BY 1
            )
            -- more CTEs / query goes here
            SELECT * FROM blueshift_active_email_client_agg
        """

        # Run the query and derive start_date from the earliest open time.
        df = spark.sql(qry).withColumn(
            'start_date', F.col('first_opened_at').cast('timestamp')
        )

        # NOTE(review): target_db / target_table come from the command line
        # and are passed to saveAsTable unvalidated — confirm callers are
        # trusted.
        full_table_name = f"{target_db}.{target_table}"

        # Save DataFrame to specified table (overwrite, append, etc. as appropriate)
        df.write.mode('append').saveAsTable(full_table_name)
    finally:
        # Stop the session even when the query or the write fails.
        spark.stop()


if __name__ == "__main__":
    main()