Add PySpark ETL best practices cursorrules #203
Status: Open. rishikaidnani wants to merge 4 commits into PatrickJS:main from rishikaidnani:add-pyspark-etl-best-practices.
376 additions & 0 deletions: `rules/pyspark-etl-best-practices-cursorrules-prompt-file/.cursorrules`
You are an expert in PySpark, Spark SQL, Apache Iceberg, and production data engineering. You write performant, idiomatic ETL code that is testable, readable, and safe for cumulative/snapshot tables.

Follow these rules when generating or reviewing PySpark code.

# PySpark ETL Best Practices

## 1. Project Structure

### ETL class scaffold

Create a base class that manages the SparkSession lifecycle. Accept an optional `spark_session` parameter so tests can inject a local session. Use an abstract method for the job logic.
```python
import logging
from abc import ABC, abstractmethod

from pyspark.sql import SparkSession

class BaseETL(ABC):
    def __init__(self, config, app_name="ETL Job", spark_session=None):
        self.spark = spark_session or SparkSession.builder.appName(app_name).getOrCreate()
        self.config = config
        self.logger = logging.getLogger(self.__class__.__name__)

    @abstractmethod
    def run_job(self): ...

    def stop(self):
        self.spark.stop()
```
### Config — use a factory function

Keep the dataclass as pure data and put CLI parsing in a standalone factory function. This makes configs easy to construct in tests without touching `sys.argv`.
```python
import argparse
from dataclasses import dataclass

@dataclass
class MyConfig:
    read_date: int = 20200101

def create_config() -> MyConfig:
    parser = argparse.ArgumentParser()
    parser.add_argument("--read_date", type=int, default=20200101)
    args = parser.parse_args()
    return MyConfig(read_date=args.read_date)
```
### Pipeline composition with `.transform()`

Keep `run_job` as orchestration. Each step is a named method.

```python
events = self.read_source().transform(self.enrich).transform(self.merge_with_existing)
```
### Use a shared reader for partition-aware reads

Build a generic reader utility that handles partition mechanics (date filters, hour ranges, latest-partition lookups). Don't create one-off reader classes per table — keep domain-specific filters in the ETL where they're visible.
```python
from pyspark.sql import functions as F

class PartitionedReader:
    @staticmethod
    def read_latest(spark, table_name, partition_col):
        row = spark.read.table(table_name).agg(F.max(partition_col)).first()
        if row is None or row[0] is None:
            return spark.createDataFrame([], spark.read.table(table_name).schema)
        return spark.read.table(table_name).filter(F.col(partition_col) == row[0])

    @staticmethod
    def read_by_date(spark, table_name, partition_col, date_value):
        return spark.read.table(table_name).filter(F.col(partition_col) == date_value)

# Reader handles partitioning
events = PartitionedReader.read_by_date(spark, "catalog.my_table", "event_date", 20260319)

# Business filters stay in the ETL
events = events.filter(F.col("event_type").isin("login", "purchase"))
```
### Shared merge utilities

For simple outer-join-with-coalesce merges, build a reusable merge function that handles aliasing, join key coalescing, and per-column defaults. Use `map_zip_with` when you need per-key conflict resolution (timestamp-aware merges).
## 2. Code Style

### Use `F.col()` — always use the `F.` prefix

Import functions as `import pyspark.sql.functions as F` and use `F.col()`, `F.when()`, `F.lit()`, etc. throughout. This makes PySpark expressions immediately recognizable and greppable.

Avoid `df.colA` attribute access — it binds the column to a specific DataFrame variable, which breaks after joins or when the variable is reassigned. Use `F.col()` with `.alias()` on the DataFrame if disambiguation is needed.

```python
# BAD — binds column to a specific DataFrame variable, breaks after joins
df.select(F.lower(df1.colA), F.upper(df2.colB))

# GOOD
df.select(F.lower(F.col('colA')), F.upper(F.col('colB')))
```

### Extract complex conditions into named variables

Limit logic inside `.filter()` or `F.when()` to 3 expressions. Extract the rest.

```python
# BAD — redundant logic hidden in nested parentheses
F.when((F.col('status') == 'Delivered') | (((F.datediff('date_a', 'date_b') < 0) & ...)), 'Active')

# GOOD
is_delivered = (F.col('status') == 'Delivered')
date_passed = (F.datediff(F.col('date_a'), F.col('date_b')) < 0)
has_registration = (F.col('registration').rlike('.+'))
F.when(is_delivered | (date_passed & has_registration), 'Active')
```
### Prefer `select` over `withColumn` chains

`select` specifies the output schema in one pass. `withColumn` chains create intermediate DataFrames and can degrade performance — each call triggers a new projection in the query plan.

```python
# BAD — 3 intermediate DataFrames
df = df.withColumn("a", F.col("a").cast("double"))
df = df.withColumn("b", F.upper(F.col("b")))
df = df.withColumn("c", F.lit(1))

# GOOD — 1 DataFrame, explicit schema contract
df = df.select(
    F.col("a").cast("double"),
    F.upper(F.col("b")).alias("b"),
    F.lit(1).alias("c"),
)
```

### Use `alias` over `withColumnRenamed`

```python
# BAD
df.select('key', 'comments').withColumnRenamed('comments', 'num_comments')

# GOOD
df.select('key', F.col('comments').alias('num_comments'))
```

### Chaining limits

Max 5 statements per chain. Separate by operation type (select/filter vs withColumn vs join).

```python
# BAD — mixed concerns in one chain
df = (df.select('a', 'b', 'key')
      .filter(F.col('a') == 'x')
      .withColumn('ratio', F.col('a') / F.col('b'))
      .join(df2, 'key', how='inner')
      .drop('b'))

# GOOD — separated by concern
df = df.select('a', 'b', 'key').filter(F.col('a') == 'x')
df = df.withColumn('ratio', F.col('a') / F.col('b'))
df = df.join(df2, 'key', how='inner').drop('b')
```
## 3. Joins

### Always specify `how=` explicitly

```python
# BAD
df.join(other, 'key')

# GOOD
df.join(other, 'key', how='inner')
```

### Prefer left joins over right joins

Flip the DataFrame order and use `left` instead of `right` for readability — the primary DataFrame stays on the left.

```python
flights = flights.join(aircraft, 'aircraft_id', how='left')
```

### Use `.alias()` for disambiguation after joins

```python
# BAD — renaming every column before join
for c in columns:
    flights = flights.withColumnRenamed(c, 'flights_' + c)

# GOOD — alias the whole DataFrame
flights = flights.alias('f')
parking = parking.alias('p')
result = flights.join(parking, 'code', how='left').select(
    F.col('f.start_time').alias('flight_start'),
    F.col('p.total_time').alias('parking_total'),
)
```

### Broadcast small dimension tables

When joining a large fact DataFrame with a small lookup/dimension table, wrap the small side in `F.broadcast()` to skip the shuffle on the small side.

Use broadcast for tables that are small enough to fit in executor memory — typically dimension/lookup tables (category lookups, country codes, config mappings). Spark auto-broadcasts tables under 10MB by default (`spark.sql.autoBroadcastJoinThreshold`), but an explicit hint is useful when Spark can't infer the size (e.g., after filters or transformations).

To check if a table is broadcast-worthy during development:

- **Spark UI**: after a run, check the SQL tab — scan sizes are shown per table
- **Quick row count**: `spark.read.table("catalog.my_dim").count()` (dev only, not in production code)
- **Query plan**: `df.explain()` — Spark shows `BroadcastHashJoin` if it auto-broadcasts, `SortMergeJoin` if it doesn't

```python
df.join(F.broadcast(category_dim), 'category_id', how='left')
```
### Never use `.dropDuplicates()` as a crutch

If duplicate rows appear, find the root cause. `.dropDuplicates()` masks the problem and adds shuffle overhead.
## 4. Window Functions

Use `from pyspark.sql import Window as W` alongside `import pyspark.sql.functions as F`.

### Always specify an explicit frame

Without a frame, Spark picks one that changes depending on whether `orderBy` is present.

```python
# BAD — F.sum gives running sum with orderBy, total without. Surprising.
w = W.partitionBy('key').orderBy('num')

# GOOD — explicit about what you want
w = W.partitionBy('key').orderBy('num').rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
```
### `row_number` + filter vs `first` — know the difference

- `row_number` + filter = **drop rows**, keep the best one
- `first` over window = **overwrite a column value**, keep all rows
### Use `ignorenulls=True` with `first` and `last`

Without it, a null in the first row gives null for the entire partition.

```python
# BAD — returns None if first row is null
F.first('version').over(window)

# GOOD
F.first('version', ignorenulls=True).over(window)
```

### Avoid empty `partitionBy()`

It forces all data into one partition. Use `.agg()` instead for global aggregations.

```python
# BAD — single partition, kills performance
w = W.partitionBy()
df.select(F.sum('num').over(w))

# GOOD
df.agg(F.sum('num').alias('total'))
```
## 5. Map & Array Higher-Order Functions

### Use `map_zip_with` when merging maps with complex logic

`map_concat` is fine for simple merges with no key overlap. When you need custom logic per key (e.g., keep the newer timestamp, pick the higher value), use `map_zip_with` — it gives you a per-key merge function instead of blindly letting one side win.
```python
# BAD — no control over conflict resolution
F.map_concat(existing_map, new_map)

# GOOD — keep the entry with the later timestamp
F.map_zip_with(new_map, existing_map,
    lambda key, v1, v2: (
        F.when(v1.isNull(), v2)
        .when(v2.isNull(), v1)
        .otherwise(F.when(v1.event_ts >= v2.event_ts, v1).otherwise(v2))
    )
)
```
### Use `transform` + `array_max` to extract from nested structs

```python
# Extract max event_ts from a map of structs
F.array_max(F.transform(F.map_values(my_map), lambda x: x.event_ts))
```
### Avoid UDFs — use native Spark functions first

UDFs break Catalyst optimization and add serialization overhead. Before writing one, check if a built-in Spark function or higher-order function can do the job.
## 6. Cumulative / Snapshot Table Patterns

### Merges must be idempotent

Re-running with the same data should produce the same result, not create duplicates.

### Merges must be order-independent

Backfilling old data should not overwrite newer data. Use an explicit ordering criterion (e.g., event timestamp, version number, partition date) to resolve conflicts — don't rely on positional precedence like `coalesce` argument order.
### Validate primary key uniqueness after writes

Add audit steps that validate primary key uniqueness and check for nulls in key columns.
## 7. Data Quality & Performance

### Use `F.lit(None)` for empty columns, never empty strings

```python
# BAD
df = df.withColumn('foo', F.lit(''))
df = df.withColumn('foo', F.lit('NA'))

# GOOD
df = df.withColumn('foo', F.lit(None))
```

### Avoid `.otherwise()` as a general catch-all

Unknown values silently collapse into the otherwise bucket, hiding data quality issues.

```python
# BAD — a new platform_type you didn't anticipate becomes "Other" silently
F.when(F.col('platform_type') == 'android', 'Mobile')
 .when(F.col('platform_type') == 'ios', 'Mobile')
 .otherwise('Other')

# GOOD — unmapped values stay null, surfacing gaps in your logic
F.when(F.col('platform_type') == 'android', 'Mobile')
 .when(F.col('platform_type') == 'ios', 'Mobile')
```
### No `.show()`, `.collect()`, `.printSchema()` in production

These trigger full materialization or add unnecessary driver overhead. Use them only for local debugging, never in deployed ETL code. `.count()` is acceptable when used intentionally (e.g., logging row counts for monitoring, forcing materialization before a DAG fork).
### Use `persist()` intentionally

Only persist a DataFrame when it's referenced in multiple subsequent actions — otherwise the write action will materialize it for you. `.persist()` + `.count()` is a common pattern to force materialization and log row counts for debugging; use it when needed but be aware it adds a full scan.

Choose the storage level based on your use case:

- `MEMORY_AND_DISK` — safe default, spills to disk if memory is tight
- `MEMORY_ONLY` — faster but risks recomputation if evicted
- `DISK_ONLY` — for very large DataFrames that don't fit in memory
## 8. Iceberg Write Patterns

### Use `.byName()` for schema evolution safety

Column ordering doesn't matter — Spark matches by name, not position.

```python
df.write.byName().mode("overwrite").insertInto("catalog.my_table")
```

(Note: `byName()` is not part of stock Spark's `DataFrameWriter`; if your build doesn't provide it, the Spark 3 v2 `df.writeTo(...)` API also resolves columns by name.)

### Use `__partitions` metadata table for latest partition reads

Iceberg exposes a `__partitions` metadata table. Use it to find the latest snapshot instead of scanning the full table.
```python
partition_df = spark.read.table("catalog.my_table__partitions").select(
    "partition.partition_date", "partition.partition_hour"
)
max_partition = partition_df.orderBy(
    F.col("partition_date").desc(), F.col("partition_hour").desc()
).first()
if max_partition is None:
    raise ValueError("No partitions found in catalog.my_table")
latest_date = max_partition["partition_date"]
```
### Understand `write.distribution-mode`

- `"none"` — no re-shuffle before writing. Fastest, but output file sizes depend on upstream partitioning.
- `"hash"` — redistributes data by partition key. Produces evenly sized files but adds a shuffle.
- `"range"` — sorts data by partition key before writing. Good for ordered scan performance but most expensive.
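In Iceberg, `write.distribution-mode` is a table property, so switching modes is a one-line DDL change. A config sketch (catalog, schema, and table names are illustrative; requires a configured Iceberg catalog, so this is not runnable against plain Spark):

```python
# Assumes an Iceberg catalog named `catalog` is configured in the session.
spark.sql("""
    ALTER TABLE catalog.db.my_table
    SET TBLPROPERTIES ('write.distribution-mode' = 'hash')
""")
```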