Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
## Bugfixes

* Fixed FirestoreV1 Beam connectors allowing inconsistent project/database IDs to be configured between RPC requests and routing headers (Java) ([#36895](https://github.com/apache/beam/issues/36895)).
* Fixed BigQuery client not receiving explicit project ID, causing failures in cross-project and impersonation scenarios (Python) ([#36857](https://github.com/apache/beam/issues/36857)).

## Known Issues

Expand Down
17 changes: 13 additions & 4 deletions sdks/python/apache_beam/io/gcp/bigquery_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
from apache_beam.metrics import monitoring_infos
from apache_beam.metrics.metric import Metrics
from apache_beam.options import value_provider
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import DoFn
from apache_beam.typehints.row_type import RowTypeConstraint
Expand Down Expand Up @@ -360,10 +361,18 @@ class BigQueryWrapper(object):
HISTOGRAM_METRIC_LOGGER = MetricLogger()

def __init__(self, client=None, temp_dataset_id=None, temp_table_ref=None):
self.client = client or BigQueryWrapper._bigquery_client(PipelineOptions())
self.gcp_bq_client = client or gcp_bigquery.Client(
client_info=ClientInfo(
user_agent="apache-beam-%s" % apache_beam.__version__))
if client is not None:
self.client = client
self.gcp_bq_client = client
else:
pipeline_options = PipelineOptions()
self.client = BigQueryWrapper._bigquery_client(pipeline_options)

project = pipeline_options.view_as(GoogleCloudOptions).project
self.gcp_bq_client = gcp_bigquery.Client(
project=project,
client_info=ClientInfo(
user_agent="apache-beam-%s" % apache_beam.__version__))

self._unique_row_id = 0
# For testing scenarios where we pass in a client we do not want a
Expand Down
24 changes: 24 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_tools_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,30 @@ def test_user_agent_insert_all(
call = http_mock.request.mock_calls[-2]
self.assertIn('apache-beam-', call[2]['headers']['User-Agent'])

def test_wrapper_uses_injected_client_for_both_low_and_high_level(self):
  """An injected client must back both client attributes of the wrapper."""
  injected = mock.Mock()
  wrapper = beam.io.gcp.bigquery_tools.BigQueryWrapper(client=injected)

  # Identity, not equality: the wrapper must hold the very object it was
  # given under both the low-level and the high-level attribute.
  for attr_name in ('client', 'gcp_bq_client'):
    self.assertIs(getattr(wrapper, attr_name), injected)

@unittest.skipIf(
    beam.io.gcp.bigquery_tools.gcp_bigquery is None,
    "bigquery library not available in this env")
@mock.patch('apache_beam.io.gcp.bigquery_tools.gcp_bigquery.Client')
def test_gcp_bigquery_client_is_constructed_with_project_kwarg(
    self, mock_bq_client_ctor):
  """The wrapper passes a ``project`` keyword when it builds its own client.

  With no client injected, BigQueryWrapper constructs the (patched)
  ``gcp_bigquery.Client`` itself. Only the presence of the ``project``
  keyword is checked here; its value may legitimately be None.
  """
  beam.io.gcp.bigquery_tools.BigQueryWrapper()

  # call_args is an (args, kwargs) pair; only the keyword part matters.
  ctor_kwargs = mock_bq_client_ctor.call_args[1]
  self.assertIn('project', ctor_kwargs)

# the function create_temporary_dataset() in the wrapper does not call
# google.cloud.bigquery, so it is fine to just mock it
@mock.patch(
Expand Down
Loading