From c3d938c927c58935a3ed6c5fd67364d9a2283b68 Mon Sep 17 00:00:00 2001 From: Rishika Idnani Date: Sat, 21 Mar 2026 11:09:55 -0700 Subject: [PATCH 1/4] Add PySpark ETL best practices cursorrules --- README.md | 1 + .../.cursorrules | 370 ++++++++++++++++++ .../README.md | 22 ++ 3 files changed, 393 insertions(+) create mode 100644 rules/pyspark-etl-best-practices-cursorrules-prompt-file/.cursorrules create mode 100644 rules/pyspark-etl-best-practices-cursorrules-prompt-file/README.md diff --git a/README.md b/README.md index 63c6a2ff..3632c7bc 100644 --- a/README.md +++ b/README.md @@ -240,6 +240,7 @@ By creating a `.cursorrules` file in your project's root directory, you can leve - [Python Best Practices](./rules/python-cursorrules-prompt-file-best-practices/.cursorrules) - Cursor rules for Python development with best practices integration. - [Python Developer](./rules/python-developer-cursorrules-prompt-file/.cursorrules) - Cursor rules for Python development with developer integration. - [Python Projects Guide](./rules/python-projects-guide-cursorrules-prompt-file/.cursorrules) - Cursor rules for Python development with projects guide integration. +- [PySpark ETL Best Practices](./rules/pyspark-etl-best-practices-cursorrules-prompt-file/.cursorrules) - Cursor rules for PySpark ETL development with code style, joins, window functions, map operations, and Iceberg patterns. - [PyTorch (scikit-learn)](./rules/pytorch-scikit-learn-cursorrules-prompt-file/.cursorrules) - Cursor rules for PyTorch development with scikit-learn integration. - [R Best Practices](./rules/r-cursorrules-prompt-file-best-practices/.cursorrules) - Cursor rules for R development with best practices integration. - [Solidity (Foundry)](./rules/solidity-foundry-cursorrules-prompt-file/.cursorrules) - Cursor rules for Solidity development with Foundry integration. 
diff --git a/rules/pyspark-etl-best-practices-cursorrules-prompt-file/.cursorrules b/rules/pyspark-etl-best-practices-cursorrules-prompt-file/.cursorrules new file mode 100644 index 00000000..1a318a9e --- /dev/null +++ b/rules/pyspark-etl-best-practices-cursorrules-prompt-file/.cursorrules @@ -0,0 +1,370 @@ +You are an expert in PySpark, Spark SQL, Apache Iceberg, and production data engineering. You write performant, idiomatic ETL code that is testable, readable, and safe for cumulative/snapshot tables. + +Follow these rules when generating or reviewing PySpark code. + +# PySpark ETL Best Practices + +## 1. Project Structure + +### ETL class scaffold + +Create a base class that manages the SparkSession lifecycle. Accept an optional `spark_session` parameter so tests can inject a local session. Use an abstract method for the job logic. + +```python +from abc import ABC, abstractmethod +from pyspark.sql import SparkSession + +class BaseETL(ABC): + def __init__(self, config, app_name="ETL Job", spark_session=None): + self.spark = spark_session or SparkSession.builder.appName(app_name).getOrCreate() + self.config = config + self.logger = logging.getLogger(self.__class__.__name__) + + @abstractmethod + def run_job(self): ... + + def stop(self): + self.spark.stop() +``` + +### Config — use a factory function + +Keep the dataclass as pure data and put CLI parsing in a standalone factory function. This makes configs easy to construct in tests without touching `sys.argv`. + +```python +@dataclass +class MyConfig: + read_date: int = 20200101 + +def create_config() -> MyConfig: + parser = argparse.ArgumentParser() + parser.add_argument("--read_date", type=int, default=20200101) + args = parser.parse_args() + return MyConfig(read_date=args.read_date) +``` + +### Pipeline composition with `.transform()` + +Keep `run_job` as orchestration. Each step is a named method. 
+ +```python +events = self.read_source().transform(self.enrich).transform(self.merge_with_existing) +``` + +### Use a shared reader for partition-aware reads + +Build a generic reader utility that handles partition mechanics (date filters, hour ranges, latest-partition lookups). Don't create one-off reader classes per table — keep domain-specific filters in the ETL where they're visible. + +```python +class PartitionedReader: + @staticmethod + def read_latest(spark, table_name, partition_col): + max_val = spark.read.table(table_name).agg(F.max(partition_col)).first()[0] + return spark.read.table(table_name).filter(F.col(partition_col) == max_val) + + @staticmethod + def read_by_date(spark, table_name, partition_col, date_value): + return spark.read.table(table_name).filter(F.col(partition_col) == date_value) + +# Reader handles partitioning +events = PartitionedReader.read_by_date(spark, "catalog.my_table", "event_date", 20260319) + +# Business filters stay in the ETL +events = events.filter(F.col("event_type").isin("login", "purchase")) +``` + +### Shared merge utilities + +For simple outer-join-with-coalesce merges, build a reusable merge function that handles aliasing, join key coalescing, and per-column defaults. Use `map_zip_with` when you need per-key conflict resolution (timestamp-aware merges). + +## 2. Code Style + +### Use `F.col()` — always use the `F.` prefix + +Import functions as `import pyspark.sql.functions as F` and use `F.col()`, `F.when()`, `F.lit()`, etc. throughout. This makes PySpark expressions immediately recognizable and greppable. + +Avoid `df.colA` attribute access — it binds the column to a specific DataFrame variable, which breaks after joins or when the variable is reassigned. Use `F.col()` with `.alias()` on the DataFrame if disambiguation is needed. 
+ +```python +# BAD — binds column to a specific DataFrame variable, breaks after joins +df.select(F.lower(df1.colA), F.upper(df2.colB)) + +# GOOD +df.select(F.lower(F.col('colA')), F.upper(F.col('colB'))) +``` + +### Extract complex conditions into named variables + +Limit logic inside `.filter()` or `F.when()` to 3 expressions. Extract the rest. + +```python +# BAD — redundant logic hidden in nested parentheses +F.when((F.col('status') == 'Delivered') | (((F.datediff('date_a', 'date_b') < 0) & ...)), 'Active') + +# GOOD +is_delivered = (F.col('status') == 'Delivered') +date_passed = (F.datediff('date_a', 'date_b') < 0) +has_registration = (F.col('registration').rlike('.+')) +F.when(is_delivered | (date_passed & has_registration), 'Active') +``` + +### Prefer `select` over `withColumn` chains + +`select` specifies the output schema in one pass. `withColumn` chains create intermediate DataFrames and can degrade performance — each call triggers a new projection in the query plan. + +```python +# BAD — 3 intermediate DataFrames +df = df.withColumn("a", F.col("a").cast("double")) +df = df.withColumn("b", F.upper(F.col("b"))) +df = df.withColumn("c", F.lit(1)) + +# GOOD — 1 DataFrame, explicit schema contract +df = df.select( + F.col("a").cast("double"), + F.upper(F.col("b")).alias("b"), + F.lit(1).alias("c"), +) +``` + +### Use `alias` over `withColumnRenamed` + +```python +# BAD +df.select('key', 'comments').withColumnRenamed('comments', 'num_comments') + +# GOOD +df.select('key', F.col('comments').alias('num_comments')) +``` + +### Chaining limits + +Max 5 statements per chain. Separate by operation type (select/filter vs withColumn vs join). 
+
+```python
+# BAD — mixed concerns in one chain
+df = (df.select('a', 'b', 'key')
+    .filter(F.col('a') == 'x')
+    .withColumn('ratio', F.col('a') / F.col('b'))
+    .join(df2, 'key', how='inner')
+    .drop('b'))
+
+# GOOD — separated by concern
+df = df.select('a', 'b', 'key').filter(F.col('a') == 'x')
+df = df.withColumn('ratio', F.col('a') / F.col('b'))
+df = df.join(df2, 'key', how='inner').drop('b')
+```
+
+## 3. Joins
+
+### Always specify `how=` explicitly
+
+```python
+# BAD
+df.join(other, 'key')
+
+# GOOD
+df.join(other, 'key', how='inner')
+```
+
+### Prefer left joins over right joins
+
+Flip the DataFrame order and use `left` instead of `right` for readability — the primary DataFrame stays on the left.
+
+```python
+flights = flights.join(aircraft, 'aircraft_id', how='left')
+```
+
+### Use `.alias()` for disambiguation after joins
+
+```python
+# BAD — renaming every column before join
+for c in columns:
+    flights = flights.withColumnRenamed(c, 'flights_' + c)
+
+# GOOD — alias the whole DataFrame
+flights = flights.alias('f')
+parking = parking.alias('p')
+result = flights.join(parking, 'code', how='left').select(
+    F.col('f.start_time').alias('flight_start'),
+    F.col('p.total_time').alias('parking_total'),
+)
+```
+
+### Broadcast small dimension tables
+
+When joining a large fact DataFrame with a small lookup/dimension table, wrap the small side in `F.broadcast()` — Spark ships the small table to every executor, which avoids shuffling the large side entirely.
+
+Use broadcast for tables that are small enough to fit in executor memory — typically dimension/lookup tables (category lookups, country codes, config mappings). Spark auto-broadcasts tables under 10MB by default (`spark.sql.autoBroadcastJoinThreshold`), but an explicit hint is useful when Spark can't infer the size (e.g., after filters or transformations).
+ +To check if a table is broadcast-worthy during development: +- **Spark UI**: after a run, check the SQL tab — scan sizes are shown per table +- **Quick row count**: `spark.read.table("catalog.my_dim").count()` (dev only, not in production code) +- **Query plan**: `df.explain()` — Spark shows `BroadcastHashJoin` if it auto-broadcasts, `SortMergeJoin` if it doesn't + +```python +df.join(F.broadcast(category_dim), 'category_id', how='left') +``` + +### Never use `.dropDuplicates()` as a crutch + +If duplicate rows appear, find the root cause. `.dropDuplicates()` masks the problem and adds shuffle overhead. + +## 4. Window Functions + +### Always specify an explicit frame + +Without a frame, Spark picks one that changes depending on whether `orderBy` is present. + +```python +# BAD — F.sum gives running sum with orderBy, total without. Surprising. +w = W.partitionBy('key').orderBy('num') + +# GOOD — explicit about what you want +w = W.partitionBy('key').orderBy('num').rowsBetween(W.unboundedPreceding, W.unboundedFollowing) +``` + +### `row_number` + filter vs `first` — know the difference + +- `row_number` + filter = **drop rows**, keep the best one +- `first` over window = **overwrite a column value**, keep all rows + +### Use `ignorenulls=True` with `first` and `last` + +Without it, a null in the first row gives null for the entire partition. + +```python +# BAD — returns None if first row is null +F.first('version').over(window) + +# GOOD +F.first('version', ignorenulls=True).over(window) +``` + +### Avoid empty `partitionBy()` + +It forces all data into one partition. Use `.agg()` instead for global aggregations. + +```python +# BAD — single partition, kills performance +w = W.partitionBy() +df.select(F.sum('num').over(w)) + +# GOOD +df.agg(F.sum('num').alias('total')) +``` + +## 5. Map & Array Higher-Order Functions + +### Use `map_zip_with` when merging maps with complex logic + +`map_concat` is fine for simple merges with no key overlap. 
When you need custom logic per key (e.g., keep the newer timestamp, pick the higher value), use `map_zip_with` — it gives you a per-key merge function instead of blindly letting one side win. + +```python +# BAD — no control over conflict resolution +map_concat(existing_map, new_map) + +# GOOD — keep the entry with the later timestamp +map_zip_with(new_map, existing_map, + lambda key, v1, v2: ( + when(v1.isNull(), v2) + .when(v2.isNull(), v1) + .otherwise(when(v1.event_ts >= v2.event_ts, v1).otherwise(v2)) + ) +) +``` + +### Use `transform` + `array_max` to extract from nested structs + +```python +# Extract max event_ts from a map of structs +array_max(transform(map_values(my_map), lambda x: x.event_ts)) +``` + +### Avoid UDFs — use native Spark functions first + +UDFs break Catalyst optimization and add serialization overhead. Before writing one, check if a built-in Spark function or higher-order function can do the job. + +## 6. Cumulative / Snapshot Table Patterns + +### Merges must be idempotent + +Re-running with the same data should produce the same result, not create duplicates. + +### Merges must be order-independent + +Backfilling old data should not overwrite newer data. Use an explicit ordering criterion (e.g., event timestamp, version number, partition date) to resolve conflicts — don't rely on positional precedence like `coalesce` argument order. + +### Validate primary key uniqueness after writes + +Add audit steps that validate primary key uniqueness and check for nulls in key columns. + +## 7. Data Quality & Performance + +### Use `F.lit(None)` for empty columns, never empty strings + +```python +# BAD +df = df.withColumn('foo', F.lit('')) +df = df.withColumn('foo', F.lit('NA')) + +# GOOD +df = df.withColumn('foo', F.lit(None)) +``` + +### Avoid `.otherwise()` as a general catch-all + +Unknown values silently collapse into the otherwise bucket, hiding data quality issues. 
+
+```python
+# BAD — a new platform_type you didn't anticipate becomes "Other" silently
+F.when(F.col('platform_type') == 'android', 'Mobile')
+    .when(F.col('platform_type') == 'ios', 'Mobile')
+    .otherwise('Other')
+
+# GOOD — unmapped values stay null, surfacing gaps in your logic
+F.when(F.col('platform_type') == 'android', 'Mobile')
+    .when(F.col('platform_type') == 'ios', 'Mobile')
+```
+
+### No `.show()`, `.collect()`, `.printSchema()` in production
+
+These trigger full materialization or add unnecessary driver overhead. Use them only for local debugging, never in deployed ETL code. `.count()` is acceptable when used intentionally (e.g., logging row counts for monitoring, forcing materialization before a DAG fork).
+
+### Use `persist()` intentionally
+
+Only persist a DataFrame when it's referenced in multiple subsequent actions — otherwise the write action will materialize it for you. `.persist()` + `.count()` is a common pattern to force materialization and log row counts for debugging; use it when needed but be aware it adds a full scan.
+
+Choose the storage level based on your use case:
+- `MEMORY_AND_DISK` — safe default, spills to disk if memory is tight
+- `MEMORY_ONLY` — faster but risks recomputation if evicted
+- `DISK_ONLY` — for very large DataFrames that don't fit in memory
+
+## 8. Iceberg Write Patterns
+
+### Use the `writeTo` (DataFrameWriterV2) API for schema evolution safety
+
+Column ordering doesn't matter — the v2 API matches columns by name, not position. Avoid the legacy `insertInto`, which matches columns by position and can silently miswrite data after schema evolution.
+
+```python
+df.writeTo("catalog.my_table").overwritePartitions()
+```
+
+### Use `__partitions` metadata table for latest partition reads
+
+Iceberg exposes a partitions metadata table (standard Iceberg addresses it as `catalog.db.my_table.partitions`; some platforms surface it as `my_table__partitions`). Use it to find the latest partition instead of scanning the full table.
+ +```python +partition_df = spark.read.table("catalog.my_table__partitions").select( + "partition.partition_date", "partition.partition_hour" +) +max_partition = partition_df.orderBy( + F.col("partition_date").desc(), F.col("partition_hour").desc() +).first() +latest_date = max_partition["partition_date"] +``` + +### Understand `write.distribution-mode` + +- `"none"` — no re-shuffle before writing. Fastest, but output file sizes depend on upstream partitioning. +- `"hash"` — redistributes data by partition key. Produces evenly sized files but adds a shuffle. +- `"range"` — sorts data by partition key before writing. Good for ordered scan performance but most expensive. diff --git a/rules/pyspark-etl-best-practices-cursorrules-prompt-file/README.md b/rules/pyspark-etl-best-practices-cursorrules-prompt-file/README.md new file mode 100644 index 00000000..8cb6add8 --- /dev/null +++ b/rules/pyspark-etl-best-practices-cursorrules-prompt-file/README.md @@ -0,0 +1,22 @@ +# PySpark ETL Best Practices + +Production-tested PySpark & ETL best practices — code style, joins, window functions, map operations, cumulative table patterns, and Iceberg write patterns. + +## Usage + +Copy the `.cursorrules` file to the root of your PySpark project. 
+ +## Rules Summary + +- ETL class scaffold, config factory pattern, pipeline composition with `.transform()` +- `F.col()` prefix convention, named conditions, `select` over `withColumn` +- Explicit `how=` on joins, `.alias()` for disambiguation, `F.broadcast()` for small dims +- Window functions with explicit frames, `ignorenulls=True`, `row_number` vs `first` +- `map_zip_with` for conflict-aware map merges, avoid UDFs +- Idempotent and order-independent cumulative table merges +- Data quality guardrails (`.otherwise()` pitfalls, `F.lit(None)`, intentional `persist()`) +- Iceberg write patterns (`.byName()`, `__partitions` metadata, `write.distribution-mode`) + +## Credits + +Inspired by the [Palantir PySpark Style Guide](https://github.com/palantir/pyspark-style-guide) and production experience debugging data skew, cumulative table merges, and Iceberg write patterns. From 5052287a9418cb4fbf08d152ebde7a41f5959b63 Mon Sep 17 00:00:00 2001 From: Rishika Idnani Date: Sat, 21 Mar 2026 11:18:05 -0700 Subject: [PATCH 2/4] Handle empty table edge case in PartitionedReader.read_latest --- .../.cursorrules | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rules/pyspark-etl-best-practices-cursorrules-prompt-file/.cursorrules b/rules/pyspark-etl-best-practices-cursorrules-prompt-file/.cursorrules index 1a318a9e..d17b57b6 100644 --- a/rules/pyspark-etl-best-practices-cursorrules-prompt-file/.cursorrules +++ b/rules/pyspark-etl-best-practices-cursorrules-prompt-file/.cursorrules @@ -59,8 +59,10 @@ Build a generic reader utility that handles partition mechanics (date filters, h class PartitionedReader: @staticmethod def read_latest(spark, table_name, partition_col): - max_val = spark.read.table(table_name).agg(F.max(partition_col)).first()[0] - return spark.read.table(table_name).filter(F.col(partition_col) == max_val) + row = spark.read.table(table_name).agg(F.max(partition_col)).first() + if row is None or row[0] is None: + return 
spark.createDataFrame([], spark.read.table(table_name).schema) + return spark.read.table(table_name).filter(F.col(partition_col) == row[0]) @staticmethod def read_by_date(spark, table_name, partition_col, date_value): From 8c3441eca8ac2f09f77fec4666db013cf783b666 Mon Sep 17 00:00:00 2001 From: Rishika Idnani Date: Sat, 21 Mar 2026 11:19:16 -0700 Subject: [PATCH 3/4] Handle F.col --- .../.cursorrules | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rules/pyspark-etl-best-practices-cursorrules-prompt-file/.cursorrules b/rules/pyspark-etl-best-practices-cursorrules-prompt-file/.cursorrules index d17b57b6..82470f23 100644 --- a/rules/pyspark-etl-best-practices-cursorrules-prompt-file/.cursorrules +++ b/rules/pyspark-etl-best-practices-cursorrules-prompt-file/.cursorrules @@ -105,7 +105,7 @@ F.when((F.col('status') == 'Delivered') | (((F.datediff('date_a', 'date_b') < 0) # GOOD is_delivered = (F.col('status') == 'Delivered') -date_passed = (F.datediff('date_a', 'date_b') < 0) +date_passed = (F.datediff(F.col('date_a'), F.col('date_b')) < 0) has_registration = (F.col('registration').rlike('.+')) F.when(is_delivered | (date_passed & has_registration), 'Active') ``` From 3b4df49b028f04b77df6641afde92835c19f5105 Mon Sep 17 00:00:00 2001 From: Rishika Idnani Date: Sat, 21 Mar 2026 11:21:23 -0700 Subject: [PATCH 4/4] Add PySpark ETL best practices cursorrules --- .../.cursorrules | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/rules/pyspark-etl-best-practices-cursorrules-prompt-file/.cursorrules b/rules/pyspark-etl-best-practices-cursorrules-prompt-file/.cursorrules index 82470f23..babac361 100644 --- a/rules/pyspark-etl-best-practices-cursorrules-prompt-file/.cursorrules +++ b/rules/pyspark-etl-best-practices-cursorrules-prompt-file/.cursorrules @@ -213,6 +213,8 @@ If duplicate rows appear, find the root cause. `.dropDuplicates()` masks the pro ## 4. 
Window Functions +Use `from pyspark.sql import Window as W` alongside `import pyspark.sql.functions as F`. + ### Always specify an explicit frame Without a frame, Spark picks one that changes depending on whether `orderBy` is present. @@ -268,9 +270,9 @@ map_concat(existing_map, new_map) # GOOD — keep the entry with the later timestamp map_zip_with(new_map, existing_map, lambda key, v1, v2: ( - when(v1.isNull(), v2) + F.when(v1.isNull(), v2) .when(v2.isNull(), v1) - .otherwise(when(v1.event_ts >= v2.event_ts, v1).otherwise(v2)) + .otherwise(F.when(v1.event_ts >= v2.event_ts, v1).otherwise(v2)) ) ) ``` @@ -362,6 +364,8 @@ partition_df = spark.read.table("catalog.my_table__partitions").select( max_partition = partition_df.orderBy( F.col("partition_date").desc(), F.col("partition_hour").desc() ).first() +if max_partition is None: + raise ValueError("No partitions found in catalog.my_table") latest_date = max_partition["partition_date"] ```