
[Python] Optimize BigQuery copy jobs in file loads using multi-source copy #38983

Open
stankiewicz wants to merge 4 commits into apache:master from stankiewicz:improve_copy_table

@stankiewicz stankiewicz commented Jun 16, 2026

Contributor

Fixes: #38982

Description

This PR optimizes BigQuery copy jobs in the Python SDK's file loads path by using BigQuery's multi-source copy feature. Instead of triggering a separate copy job for each temporary table, we now group them and trigger one copy job per batch of up to 1,200 sources (the BigQuery limit).

Key Changes

• Multi-source Copy Support: Updated BigQueryWrapper._insert_copy_job to accept a list of source tables and construct the API request with sourceTables when multiple sources are provided.
• Batch Copying: Re-implemented TriggerCopyJobs to process temporary tables in batches per destination, splitting them into chunks of 1,200 and using multi-source copy.
• Truncate/Empty Handling: Implemented an inline wait for the first chunk's copy job when using WRITE_TRUNCATE or WRITE_EMPTY with multiple chunks. This ensures the destination table is correctly initialized (truncated or verified empty) before subsequent chunks append data to it.
• Grouping Key Fix: Fixed a bug in _load_data where tables with the same name in different datasets were incorrectly grouped together because the grouping key was only tableId. It now uses the full hashable_destination.
• Test Cleanups & Additions:
  • Added test_copy_jobs_splitting to verify the splitting logic and inline wait logic.
  • Updated existing tests to match the new behavior (fewer copy jobs).
  • Fixed credential warnings in TriggerLoadJobs during testing by using the correct mocked client wrapper.
  • Fixed PartitionFiles to avoid generating empty partitions.
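
The batching and inline-wait rules listed above can be sketched in plain Python. This is only an illustration, not the PR's code: `chunk_sources` and `plan_copy_jobs` are hypothetical names, and the choice of WRITE_APPEND for later chunks is an assumption drawn from the phrase "subsequent chunks append data to it".

```python
MAX_SOURCES_PER_COPY_JOB = 1200  # limit quoted in this PR


def chunk_sources(sources, size=MAX_SOURCES_PER_COPY_JOB):
  """Yields consecutive slices of at most `size` source tables."""
  for start in range(0, len(sources), size):
    yield list(sources[start:start + size])


def plan_copy_jobs(sources, write_disposition, size=MAX_SOURCES_PER_COPY_JOB):
  """Returns (chunk, disposition, wait_inline) tuples for one destination."""
  chunks = list(chunk_sources(sources, size))
  plan = []
  for index, chunk in enumerate(chunks):
    first = index == 0
    # Assumption: only the first job truncates or checks for emptiness;
    # every later job appends to the table the first job initialized.
    disposition = write_disposition if first else 'WRITE_APPEND'
    # Wait inline on the first job only when later chunks depend on it.
    wait_inline = (
        first and len(chunks) > 1 and
        write_disposition in ('WRITE_TRUNCATE', 'WRITE_EMPTY'))
    plan.append((chunk, disposition, wait_inline))
  return plan
```

Under these assumptions, 2,500 temporary tables with WRITE_TRUNCATE would produce three jobs of 1,200, 1,200, and 100 sources, and only the first would block.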



stankiewicz and others added 3 commits June 16, 2026 11:24
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
@gemini-code-assist

Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request optimizes BigQuery copy jobs within the Python SDK's file loads path by leveraging BigQuery's multi-source copy feature. By grouping temporary tables into batches of up to 1,200 sources, the change reduces the overhead of triggering individual copy jobs. It also introduces robust handling for write dispositions and fixes a grouping bug to ensure data integrity across different datasets.

Highlights

  • Multi-source Copy Support: Updated BigQueryWrapper._insert_copy_job to support multi-source copy, allowing multiple source tables to be processed in a single BigQuery job.
  • Batch Copying: Re-implemented TriggerCopyJobs to batch temporary tables into chunks of up to 1,200 sources, significantly reducing the number of copy jobs triggered.
  • Truncate/Empty Handling: Implemented an inline wait mechanism for the first chunk when using WRITE_TRUNCATE or WRITE_EMPTY to ensure the destination table is correctly initialized before subsequent chunks append data.
  • Grouping Key Fix: Corrected a bug in _load_data where tables with identical names in different datasets were incorrectly grouped by using the full hashable_destination instead of just the tableId.
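
To make the sourceTable versus sourceTables distinction concrete, here is a minimal sketch that builds the `copy` section of a BigQuery job configuration as plain dictionaries. The field names follow the BigQuery REST API's table copy configuration; the helper names `table_ref` and `build_copy_configuration` are hypothetical, and the real wrapper presumably uses the SDK's generated message classes rather than dicts.

```python
def table_ref(project_id, dataset_id, table_id):
  """Builds a REST-style table reference (illustrative helper)."""
  return {
      'projectId': project_id,
      'datasetId': dataset_id,
      'tableId': table_id,
  }


def build_copy_configuration(
    sources,
    destination,
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND'):
  """Returns a job configuration dict for copying `sources` to `destination`."""
  if not sources:
    raise ValueError('At least one source table is required.')
  copy = {
      'destinationTable': destination,
      'createDisposition': create_disposition,
      'writeDisposition': write_disposition,
  }
  if len(sources) == 1:
    # Single source: the singular field is enough.
    copy['sourceTable'] = sources[0]
  else:
    # Multiple sources: one job copies all of them.
    copy['sourceTables'] = list(sources)
  return {'copy': copy}
```

Keeping the single-source case on `sourceTable` is a design choice of this sketch, matching the description's "when multiple sources are provided"; it is not confirmed to be how the PR handles it.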

    Updates BigQuery file loads in Python SDK to use multi-source copy jobs
    when copying temporary tables to the final destination table.

    * Update BigQueryWrapper._insert_copy_job to support a list of source
      tables, utilizing BigQuery's multi-source copy capability.
    * Update TriggerCopyJobs to process temporary tables in batch, splitting
      them into chunks of 1,200 (BigQuery limit) and triggering
      multi-source copy jobs.
    * Implement inline wait for the first chunk in TriggerCopyJobs when
      write disposition is WRITE_TRUNCATE or WRITE_EMPTY and there are
      multiple chunks. This ensures the destination table is initialized
      by the first job before subsequent chunks append to it.
    * Fix grouping key in _load_data for WRITE_TRUNCATE/WRITE_EMPTY to use
      the full hashable destination instead of just tableId, preventing
      incorrect grouping of tables with the same name in different datasets.
    * Fix TriggerLoadJobs to use bq_wrapper with mock client in tests,
      resolving credential refresh warnings.
    * Fix PartitionFiles to avoid yielding empty partitions when a file
      exceeds limits.

    TAG=agy
    CONV=126370d2-f42e-4132-a237-16bd5ccf72a3
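
The PartitionFiles item in the commit message above can be illustrated with a small greedy partitioner. This is a sketch of one way to avoid empty partitions, not the actual Beam implementation: `partition_files`, `max_files`, and `max_bytes` are hypothetical names, and the real fix may differ.

```python
def partition_files(files, max_files, max_bytes):
  """Groups (path, size) pairs into partitions without emitting empty ones."""
  partitions = []
  current, current_bytes = [], 0
  for path, size in files:
    fits = len(current) < max_files and current_bytes + size <= max_bytes
    # Flush only a non-empty partition. Without the `current` guard, a
    # single file larger than max_bytes would flush an empty partition.
    if not fits and current:
      partitions.append(current)
      current, current_bytes = [], 0
    current.append(path)
    current_bytes += size
  if current:
    partitions.append(current)
  return partitions
```

In this sketch an oversized file ends up alone in its own partition instead of being preceded by an empty one.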

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request introduces support for multi-source BigQuery copy jobs by chunking source tables up to a maximum limit of 1200 and utilizing the sourceTables configuration instead of sourceTable. The review feedback highlights three critical issues: first, the removal of self.bq_io_metadata initialization leads to a loss of lineage tracking and job labels; second, a race condition exists where the pipeline does not wait inline for a single-chunk WRITE_TRUNCATE or WRITE_EMPTY job, potentially causing data loss; and third, TriggerDeleteTempTables must be updated to support sourceTables to prevent temporary tables from leaking.


Comment thread sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
Comment thread sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
Comment thread sdks/python/apache_beam/io/gcp/bigquery_tools.py

@ahmedabu98 ahmedabu98 left a comment

Contributor

Mostly looks good, just left a few comments/nits

"""

TRIGGER_DELETE_TEMP_TABLES = 'TriggerDeleteTempTables'
MAX_SOURCES_PER_COPY_JOB = 1200

Contributor

Add a comment referring to the quota doc https://docs.cloud.google.com/bigquery/quotas#copy_jobs?

Comment on lines +552 to +555
full_table_ref = '%s:%s.%s' % (
    copy_to_reference.projectId,
    copy_to_reference.datasetId,
    copy_to_reference.tableId)

Contributor

nit: maybe use bigquery_tools.get_hashable_destination(copy_to_reference)
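
For context on why the fully qualified form matters (the same concern behind the grouping-key fix in the PR description), here is a small self-contained sketch. `TableRef` is a stand-in for the SDK's table reference type, and `group_by` is a hypothetical helper; the sketch assumes the suggested helper yields the same `project:dataset.table` string as the diff lines above.

```python
from collections import defaultdict, namedtuple

# Stand-in for the SDK's table reference object (illustrative only).
TableRef = namedtuple('TableRef', ['projectId', 'datasetId', 'tableId'])


def full_table_ref(ref):
  """Formats a reference as project:dataset.table, as in the diff above."""
  return '%s:%s.%s' % (ref.projectId, ref.datasetId, ref.tableId)


def group_by(refs, key_fn):
  """Groups references by the key that `key_fn` computes for each one."""
  groups = defaultdict(list)
  for ref in refs:
    groups[key_fn(ref)].append(ref)
  return dict(groups)


refs = [
    TableRef('p', 'sales', 'events'),
    TableRef('p', 'audit', 'events'),
]
# Keyed by tableId alone, the two distinct tables collapse into one group.
by_table_id = group_by(refs, lambda ref: ref.tableId)
# Keyed by the full reference, they stay separate.
by_full_ref = group_by(refs, full_table_ref)
```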

Comment on lines +558 to +565
if is_first_time:
  self._observed_tables.add(full_table_ref)
  if self.bq_io_metadata:
    Lineage.sinks().add(
        'bigquery',
        copy_to_reference.projectId,
        copy_to_reference.datasetId,
        copy_to_reference.tableId)

Contributor

Should we only track it in _observed_tables at the end of the process call, after the copy job completes successfully?
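
One way to read this suggestion, as a rough sketch: record the destination only after the job call returns without raising, so a retried attempt still treats the table as unseen. `CopyTracker` and `run_copy_job` are hypothetical names for illustration and do not appear in the PR.

```python
class CopyTracker:
  """Remembers destinations only after their copy job has succeeded."""
  def __init__(self, run_copy_job):
    # run_copy_job(full_table_ref, is_first_time) is a caller-supplied
    # callable that raises if the copy job fails.
    self._run_copy_job = run_copy_job
    self._observed_tables = set()

  def copy(self, full_table_ref):
    is_first_time = full_table_ref not in self._observed_tables
    self._run_copy_job(full_table_ref, is_first_time)
    # Reached only on success, so a failed first attempt leaves the table
    # unobserved and the retry is again treated as the first write.
    self._observed_tables.add(full_table_ref)
    return is_first_time
```

The trade-off is that anything keyed on `is_first_time`, such as the write disposition, stays correct across retries within the same instance.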

Comment on lines 576 to +580
_bq_uuid(
'%s:%s.%s' % (
copy_from_reference.projectId,
copy_from_reference.datasetId,
copy_from_reference.tableId)))
job_reference = self.bq_wrapper._insert_copy_job(
project_id,
copy_job_name,
copy_from_reference,
copy_to_reference,
create_disposition=self.create_disposition,
write_disposition=write_disposition,
job_labels=self.bq_io_metadata.add_additional_bq_job_labels())

if wait_for_job:
self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10)
self.pending_jobs.append(
GlobalWindows.windowed_value((destination, job_reference)))
copy_to_reference.projectId,
copy_to_reference.datasetId,
copy_to_reference.tableId)))

Contributor

nit: use full_table_ref here

@github-actions

Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers


Development

Successfully merging this pull request may close these issues.

[Feature Request]: Multi-Source BigQuery Copy Jobs for Python SDK

2 participants