[Python] Optimize BigQuery copy jobs in file loads using multi-source copy#38983
[Python] Optimize BigQuery copy jobs in file loads using multi-source copy#38983stankiewicz wants to merge 4 commits into
Conversation
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request optimizes BigQuery copy jobs within the Python SDK's file loads path by leveraging BigQuery's multi-source copy feature. By grouping temporary tables into batches of up to 1,200 sources, the change reduces the overhead of triggering individual copy jobs. It also introduces robust handling for write dispositions and fixes a grouping bug to ensure data integrity across different datasets. Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
Updates BigQuery file loads in Python SDK to use multi-source copy jobs
when copying temporary tables to the final destination table.
* Update BigQueryWrapper._insert_copy_job to support a list of source
tables, utilizing BigQuery's multi-source copy capability.
* Update TriggerCopyJobs to process temporary tables in batch, splitting
them into chunks of 1,200 (BigQuery limit) and triggering
multi-source copy jobs.
* Implement inline wait for the first chunk in TriggerCopyJobs when
write disposition is WRITE_TRUNCATE or WRITE_EMPTY and there are
multiple chunks. This ensures the destination table is initialized
by the first job before subsequent chunks append to it.
* Fix grouping key in _load_data for WRITE_TRUNCATE/WRITE_EMPTY to use
the full hashable destination instead of just tableId, preventing
incorrect grouping of tables with the same name in different datasets.
* Fix TriggerLoadJobs to use bq_wrapper with mock client in tests,
resolving credential refresh warnings.
* Fix PartitionFiles to avoid yielding empty partitions when a file
exceeds limits.
TAG=agy
CONV=126370d2-f42e-4132-a237-16bd5ccf72a3
898dca4 to
af3b026
Compare
There was a problem hiding this comment.
Code Review
This pull request introduces support for multi-source BigQuery copy jobs by chunking source tables up to a maximum limit of 1200 and utilizing the sourceTables configuration instead of sourceTable. The review feedback highlights three critical issues: first, the removal of self.bq_io_metadata initialization leads to a loss of lineage tracking and job labels; second, a race condition exists where the pipeline does not wait inline for a single-chunk WRITE_TRUNCATE or WRITE_EMPTY job, potentially causing data loss; and third, TriggerDeleteTempTables must be updated to support sourceTables to prevent temporary tables from leaking.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
ahmedabu98
left a comment
There was a problem hiding this comment.
Mostly looks good, just left a few comments/nits
| """ | ||
|
|
||
| TRIGGER_DELETE_TEMP_TABLES = 'TriggerDeleteTempTables' | ||
| MAX_SOURCES_PER_COPY_JOB = 1200 |
There was a problem hiding this comment.
Add a comment referring to the quota doc https://docs.cloud.google.com/bigquery/quotas#copy_jobs?
| full_table_ref = '%s:%s.%s' % ( | ||
| copy_to_reference.projectId, | ||
| copy_to_reference.datasetId, | ||
| copy_to_reference.tableId) |
There was a problem hiding this comment.
nit: maybe use bigquery_tools.get_hashable_destination(copy_to_reference)
| if is_first_time: | ||
| self._observed_tables.add(full_table_ref) | ||
| if self.bq_io_metadata: | ||
| Lineage.sinks().add( | ||
| 'bigquery', | ||
| copy_to_reference.projectId, | ||
| copy_to_reference.datasetId, | ||
| copy_to_reference.tableId) |
There was a problem hiding this comment.
should we only track it in _observed_tables it at the end of the process call, after the copy job completes successfully?
| _bq_uuid( | ||
| '%s:%s.%s' % ( | ||
| copy_from_reference.projectId, | ||
| copy_from_reference.datasetId, | ||
| copy_from_reference.tableId))) | ||
| job_reference = self.bq_wrapper._insert_copy_job( | ||
| project_id, | ||
| copy_job_name, | ||
| copy_from_reference, | ||
| copy_to_reference, | ||
| create_disposition=self.create_disposition, | ||
| write_disposition=write_disposition, | ||
| job_labels=self.bq_io_metadata.add_additional_bq_job_labels()) | ||
|
|
||
| if wait_for_job: | ||
| self.bq_wrapper.wait_for_bq_job(job_reference, sleep_duration_sec=10) | ||
| self.pending_jobs.append( | ||
| GlobalWindows.windowed_value((destination, job_reference))) | ||
| copy_to_reference.projectId, | ||
| copy_to_reference.datasetId, | ||
| copy_to_reference.tableId))) |
There was a problem hiding this comment.
nit: use full_table_ref here
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
Fixes: #38982
Description
This PR optimizes BigQuery copy jobs in the Python SDK's file loads path by utilizing BigQuery's multi-source copy feature. Instead of triggering a separate copy job for each temporary table, we now group
them and trigger a single copy job with up to 1,200 sources (BigQuery limit).
Key Changes
• Multi-source Copy Support: Updated
BigQueryWrapper._insert_copy_jobto accept a list of source tables and construct the API request with sourceTables when multiple sources are provided.• Batch Copying: Re-implemented
TriggerCopyJobsto process temporary tables in batch per destination, splitting them into chunks of 1,200 and using multi-source copy.• Truncate/Empty Handling: Implemented inline wait for the first chunk copy job when using
WRITE_TRUNCATEorWRITE_EMPTYwith multiple chunks. This ensures the destination table is correctlyinitialized (truncated/verified empty) before subsequent chunks append data to it.
• Grouping Key Fix: Fixed a bug in
_load_datawhere tables with the same name in different datasets would be incorrectly grouped together because the grouping key was onlytableId. It now uses the fullhashable_destination.• Test Cleanups & Additions:
• Added
test_copy_jobs_splittingto verify the splitting logic and inline wait logic.• Updated existing tests to match the new behavior (fewer copy jobs).
• Fixed credential warnings in
TriggerLoadJobsduring testing by using the correct mocked client wrapper.• Fixed
PartitionFilesto avoid generating empty partitions.Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.