
[SPARK-38101] Make metadata fetch failure repeat the task, not restart stages#55347

Open
EnricoMi wants to merge 2 commits intoapache:masterfrom
G-Research-Forks:metadata-fetch-failure

Conversation

EnricoMi (Contributor) commented Apr 15, 2026

What changes were proposed in this pull request?

Changes the handling of MetadataFetchFailedException from an immediate stage retry to an immediate task retry, so that transient task metadata failures can be recovered. The task is retried at most spark.task.maxFailures (default: 4) times before the stage and the job are aborted.

This fix handles INTERNAL_ERROR_BROADCAST errors that occur while fetching task metadata more gracefully:

  1. failing to retrieve map statuses via a broadcast variable already throws MetadataFetchFailedException: [SPARK-38101] execuors fail fetching map statuses with INTERNAL_ERROR_BROADCAST #54723
  2. failing to retrieve the task binary via a broadcast variable now also throws MetadataFetchFailedException.
  3. such a MetadataFetchFailedException now retries the task, not the full stage.

Adds the config option spark.task.maxFailures.countsMetadataFetchFailures (default: true). If false, task re-attempts due to MetadataFetchFailedException do not count towards spark.task.maxFailures and therefore cannot cause stage failures. Tasks failing with MetadataFetchFailedException are then retried until they succeed or fail for a different reason.

Fixes #54723.
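The accounting behind the new option can be illustrated with a small, self-contained sketch (names and logic are illustrative assumptions, not Spark's actual scheduler code):

```scala
// Illustrative sketch, NOT Spark's scheduler implementation: whether a failed
// task attempt counts towards spark.task.maxFailures under the proposed
// spark.task.maxFailures.countsMetadataFetchFailures option.
def countsTowardsMaxFailures(
    isMetadataFetchFailure: Boolean,
    countsMetadataFetchFailures: Boolean): Boolean = {
  // Ordinary failures always count; metadata fetch failures only count
  // when the option is left at its default (true).
  !isMetadataFetchFailure || countsMetadataFetchFailures
}
```

With the option set to false, a metadata fetch failure never pushes a task over the spark.task.maxFailures threshold, matching the description above.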

Why are the changes needed?

Currently, a MetadataFetchFailedException is handled like a FetchFailedException, causing an immediate retry of the affected stage and its mapping stage (the stage that produced the shuffle input). After spark.stage.maxConsecutiveAttempts (default: 4) attempts, the respective job is aborted. This is expensive, while an immediate retry of the failed task would fix the problem.
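As a rough model of the proposed task-level retry (an illustrative sketch, not the DAGScheduler implementation):

```scala
// Illustrative model, not Spark code: a task is retried on metadata fetch
// failures, and the stage is aborted only after maxFailures failed attempts.
sealed trait Attempt
case object Succeeded extends Attempt
case object MetadataFetchFailed extends Attempt

// Returns true if the task eventually succeeds within maxFailures failures.
def runWithTaskRetries(attempts: Iterator[Attempt], maxFailures: Int): Boolean = {
  var failures = 0
  while (failures < maxFailures && attempts.hasNext) {
    attempts.next() match {
      case Succeeded => return true
      case MetadataFetchFailed => failures += 1 // retry the task, not the stage
    }
  }
  false // maxFailures reached: stage and job are aborted
}
```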

The MetadataFetchFailedException is thrown when

  1. map statuses cannot be retrieved from the driver: [SPARK-38101] execuors fail fetching map statuses with INTERNAL_ERROR_BROADCAST #54723
  2. the task binary cannot be retrieved from the driver
  3. an output location is not known:
    def validateStatus(status: ShuffleOutputStatus, shuffleId: Int, partition: Int): Unit = {
      if (status == null) {
        // scalastyle:off line.size.limit
        val errorMessage = log"Missing an output location for shuffle ${MDC(SHUFFLE_ID, shuffleId)} partition ${MDC(PARTITION_ID, partition)}"
        // scalastyle:on
        logError(errorMessage)
        throw new MetadataFetchFailedException(shuffleId, partition, errorMessage.message)
      }
    }

While 1. and 2. can be recovered from by simply retrying the failed task, 3. indicates an issue with the mapping stage (the stage that produced the shuffle input). Therefore, 3. should throw a FetchFailedException to repeat the mapping stage, as it currently does.
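The distinction between the three cases can be sketched as follows (illustrative names, not the PR's actual code paths):

```scala
// Illustrative classification, not the PR's actual code: cases 1 and 2 are
// transient driver-side fetch problems and should only retry the task;
// case 3 means shuffle output is genuinely missing and must re-run the
// mapping stage (i.e. behave like a FetchFailedException).
sealed trait MetadataFailureCause
case object MapStatusesUnreachable extends MetadataFailureCause // case 1
case object TaskBinaryUnreachable  extends MetadataFailureCause // case 2
case object MissingOutputLocation  extends MetadataFailureCause // case 3

def retriesTaskOnly(cause: MetadataFailureCause): Boolean = cause match {
  case MapStatusesUnreachable | TaskBinaryUnreachable => true
  case MissingOutputLocation => false // re-run the mapping stage instead
}
```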

Does this PR introduce any user-facing change?

Adds the config option spark.task.maxFailures.countsMetadataFetchFailures.
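If this PR is merged, the new option could be set alongside the existing spark.task.maxFailures, for example (a config fragment; the option name is taken from this PR):

```scala
// Config fragment: opt out of counting metadata fetch failures towards
// the task failure limit (option proposed by this PR, default: true).
val conf = new org.apache.spark.SparkConf()
  .set("spark.task.maxFailures", "4")
  .set("spark.task.maxFailures.countsMetadataFetchFailures", "false")
```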

How was this patch tested?

Was this patch authored or co-authored using generative AI tooling?

No.
