From cdcb384cde5d1b741f9c44889bc4ae6085d9bd3b Mon Sep 17 00:00:00 2001
From: Radek Stankiewicz
Date: Fri, 6 Feb 2026 11:46:21 +0100
Subject: [PATCH 1/2] Fix temp table cleanup for continuous read in BigQueryIO

---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java  | 239 ++++++++++++++++--
 .../bigquery/BigQueryStorageStreamSource.java |  21 +-
 .../bigquery/CleanupTempTableDoFnTest.java    | 103 ++++++++
 3 files changed, 343 insertions(+), 20 deletions(-)
 create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/CleanupTempTableDoFnTest.java

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 7c0ab785ae7e..6fb115577368 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -21,6 +21,7 @@
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
 import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.BAD_RECORD_TAG;
 import static org.apache.beam.sdk.transforms.errorhandling.BadRecordRouter.RECORDING_ROUTER;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
@@ -114,9 +115,14 @@
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
 import org.apache.beam.sdk.schemas.ProjectionProducer;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
+import org.apache.beam.sdk.transforms.DoFn.StateId;
+import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -138,6 +144,7 @@
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.Row;
@@ -932,11 +939,18 @@ abstract Builder<T> setBadRecordErrorHandler(
     DynamicRead() {}
 
     class CreateBoundedSourceForTable
-        extends DoFn<KV<String, BigQueryDynamicReadDescriptor>, BigQueryStorageStreamSource<T>> {
+        extends DoFn<
+            KV<String, BigQueryDynamicReadDescriptor>, KV<String, BigQueryStorageStreamSource<T>>> {
+
+      private final TupleTag<KV<String, CleanupOperationMessage>> cleanupInfoTag;
+
+      CreateBoundedSourceForTable(TupleTag<KV<String, CleanupOperationMessage>> cleanupInfoTag) {
+        this.cleanupInfoTag = cleanupInfoTag;
+      }
 
       @ProcessElement
       public void processElement(
-          OutputReceiver<BigQueryStorageStreamSource<T>> receiver,
+          MultiOutputReceiver receiver,
           @Element KV<String, BigQueryDynamicReadDescriptor> kv,
           PipelineOptions options)
           throws Exception {
@@ -961,7 +975,14 @@ public void processElement(
           // shards
           long desiredChunkSize = getDesiredChunkSize(options, output);
           List<BigQueryStorageStreamSource<T>> split = output.split(desiredChunkSize, options);
-          split.stream().forEach(source -> receiver.output(source));
+          split.stream()
+              .forEach(
+                  source ->
+                      receiver
+                          .get(
+                              new TupleTag<KV<String, BigQueryStorageStreamSource<T>>>(
+                                  "mainOutput"))
+                          .output(KV.of(kv.getKey(), source)));
         } else {
           // run query
           BigQueryStorageQuerySource<T> querySource =
@@ -997,7 +1018,21 @@ public void processElement(
           // shards
           long desiredChunkSize = getDesiredChunkSize(options, output);
           List<BigQueryStorageStreamSource<T>> split = output.split(desiredChunkSize, options);
-          split.stream().forEach(source -> receiver.output(source));
+          split.stream()
+              .forEach(
+                  source ->
+                      receiver
+                          .get(
+                              new TupleTag<KV<String, BigQueryStorageStreamSource<T>>>(
+                                  "mainOutput"))
+                          .output(KV.of(kv.getKey(), source.withFromQuery())));
+          boolean datasetCreatedByBeam = getQueryTempDataset() == null;
+          CleanupInfo cleanupInfo =
+              new CleanupInfo(
+                  queryResultTable.getTableReference(), datasetCreatedByBeam, split.size());
+          receiver
+              .get(cleanupInfoTag)
+              .output(KV.of(kv.getKey(), CleanupOperationMessage.initialize(cleanupInfo)));
         }
       }
 
@@ -1010,6 +1045,9 @@ private long getDesiredChunkSize(
     @Override
     public PCollection<T> expand(PCollection<BigQueryDynamicReadDescriptor> input) {
       TupleTag<T> rowTag = new TupleTag<>();
+      TupleTag<KV<String, BigQueryStorageStreamSource<T>>> streamTag = new TupleTag<>("mainOutput");
+      TupleTag<KV<String, CleanupOperationMessage>> cleanupInfoTag = new TupleTag<>();
+
       PCollection<KV<String, BigQueryDynamicReadDescriptor>> addJobId =
           input
               .apply(
@@ -1024,25 +1062,194 @@ public String apply(BigQueryDynamicReadDescriptor input) {
               .apply("Checkpoint", Redistribute.byKey());
 
       PCollectionTuple resultTuple =
-          addJobId
-              .apply("Create streams", ParDo.of(new CreateBoundedSourceForTable()))
+          addJobId.apply(
+              "Create streams",
+              ParDo.of(new CreateBoundedSourceForTable(cleanupInfoTag))
+                  .withOutputTags(streamTag, TupleTagList.of(cleanupInfoTag)));
+
+      PCollection<KV<String, BigQueryStorageStreamSource<T>>> streams =
+          resultTuple
+              .get(streamTag)
               .setCoder(
-                  SerializableCoder.of(new TypeDescriptor<BigQueryStorageStreamSource<T>>() {}))
-              .apply("Redistribute", Redistribute.arbitrarily())
-              .apply(
-                  "Read Streams with storage read api",
-                  ParDo.of(
-                          new TypedRead.ReadTableSource<T>(
-                              rowTag, getParseFn(), getBadRecordRouter()))
-                      .withOutputTags(rowTag, TupleTagList.of(BAD_RECORD_TAG)));
+                  KvCoder.of(
+                      StringUtf8Coder.of(),
+                      SerializableCoder.of(
+                          new TypeDescriptor<BigQueryStorageStreamSource<T>>() {})))
+              .apply("Redistribute", Redistribute.arbitrarily());
+
+      PCollectionTuple readResultTuple =
+          streams.apply(
+              "Read Streams with storage read api",
+              ParDo.of(
+                      new ReadDynamicStreamSource<T>(
+                          rowTag, getParseFn(), getBadRecordRouter(), cleanupInfoTag))
+                  .withOutputTags(rowTag, TupleTagList.of(BAD_RECORD_TAG).and(cleanupInfoTag)));
+
+      PCollectionList.of(resultTuple.get(cleanupInfoTag))
+          .and(readResultTuple.get(cleanupInfoTag))
+          .apply(Flatten.pCollections())
+          .apply("CleanupTempTables", ParDo.of(new CleanupTempTableDoFn(getBigQueryServices())));
+
       getBadRecordErrorHandler()
           .addErrorCollection(
-              resultTuple.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline())));
-      return resultTuple.get(rowTag).setCoder(getOutputCoder());
+              readResultTuple
+                  .get(BAD_RECORD_TAG)
+                  .setCoder(BadRecord.getCoder(input.getPipeline())));
+      return readResultTuple.get(rowTag).setCoder(getOutputCoder());
     }
   }
 
   /** Implementation of {@link BigQueryIO#read()}. */
+  static class CleanupInfo implements Serializable {
+    private final String projectId;
+    private final String datasetId;
+    private final String tableId;
+    private final boolean datasetCreatedByBeam;
+    private final int totalStreams;
+
+    public CleanupInfo(TableReference tableRef, boolean datasetCreatedByBeam, int totalStreams) {
+      if (tableRef != null) {
+        this.projectId = tableRef.getProjectId();
+        this.datasetId = tableRef.getDatasetId();
+        this.tableId = tableRef.getTableId();
+      } else {
+        this.projectId = null;
+        this.datasetId = null;
+        this.tableId = null;
+      }
+      this.datasetCreatedByBeam = datasetCreatedByBeam;
+      this.totalStreams = totalStreams;
+    }
+
+    public TableReference getTableReference() {
+      if (projectId == null || datasetId == null || tableId == null) {
+        return null;
+      }
+      return new TableReference()
+          .setProjectId(projectId)
+          .setDatasetId(datasetId)
+          .setTableId(tableId);
+    }
+
+    public boolean isDatasetCreatedByBeam() {
+      return datasetCreatedByBeam;
+    }
+
+    public int getTotalStreams() {
+      return totalStreams;
+    }
+  }
+
+  static class CleanupOperationMessage implements Serializable {
+    private final @Nullable CleanupInfo cleanupInfo;
+    private final boolean isStreamCompletion;
+
+    private CleanupOperationMessage(@Nullable CleanupInfo cleanupInfo, boolean isStreamCompletion) {
+      this.cleanupInfo = cleanupInfo;
+      this.isStreamCompletion = isStreamCompletion;
+    }
+
+    public static CleanupOperationMessage streamComplete() {
+      return new CleanupOperationMessage(null, true);
+    }
+
+    public static CleanupOperationMessage initialize(CleanupInfo cleanupInfo) {
+      return new CleanupOperationMessage(cleanupInfo, false);
+    }
+
+    public @Nullable CleanupInfo getCleanupInfo() {
+      return cleanupInfo;
+    }
+
+    public boolean isStreamCompletion() {
+      return isStreamCompletion;
+    }
+  }
+
+  static class CleanupTempTableDoFn extends DoFn<KV<String, CleanupOperationMessage>, Void> {
+    private final BigQueryServices bqServices;
+    private static final Logger LOG = LoggerFactory.getLogger(CleanupTempTableDoFn.class);
+
+    @StateId("cleanupInfo")
+    private final StateSpec<ValueState<CleanupInfo>> cleanupInfoSpec = StateSpecs.value();
+
+    @StateId("completedStreams")
+    private final StateSpec<ValueState<Integer>> completedStreamsSpec = StateSpecs.value();
+
+    CleanupTempTableDoFn(BigQueryServices bqServices) {
+      this.bqServices = bqServices;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<String, CleanupOperationMessage> element,
+        @StateId("cleanupInfo") ValueState<CleanupInfo> cleanupInfoState,
+        @StateId("completedStreams") ValueState<Integer> completedStreamsState,
+        PipelineOptions options)
+        throws Exception {
+
+      CleanupOperationMessage msg = element.getValue();
+      CleanupInfo cleanupInfo = cleanupInfoState.read();
+      int completed = firstNonNull(completedStreamsState.read(), 0);
+
+      if (msg.isStreamCompletion()) {
+        completed += 1;
+        completedStreamsState.write(completed);
+      } else {
+        cleanupInfoState.write(msg.getCleanupInfo());
+        cleanupInfo = msg.getCleanupInfo();
+      }
+
+      if (cleanupInfo != null
+          && cleanupInfo.getTotalStreams() > 0
+          && completed == cleanupInfo.getTotalStreams()) {
+        TableReference tempTable = cleanupInfo.getTableReference();
+        try (DatasetService datasetService =
+            bqServices.getDatasetService(options.as(BigQueryOptions.class))) {
+          LOG.info("Deleting temporary table with query results {}", tempTable);
+          datasetService.deleteTable(tempTable);
+          if (cleanupInfo.isDatasetCreatedByBeam()) {
+            LOG.info("Deleting temporary dataset with query results {}", tempTable.getDatasetId());
+            datasetService.deleteDataset(tempTable.getProjectId(), tempTable.getDatasetId());
+          }
+        } catch (Exception e) {
+          LOG.warn("Failed to delete temporary BigQuery table {}", tempTable, e);
+        }
+        cleanupInfoState.clear();
+        completedStreamsState.clear();
+      }
+    }
+  }
+
+  private static class ReadDynamicStreamSource<T>
+      extends DoFn<KV<String, BigQueryStorageStreamSource<T>>, T> {
+    private final TypedRead.ReadTableSource<T> readTableSource;
+    private final TupleTag<KV<String, CleanupOperationMessage>> streamCompletionTag;
+
+    ReadDynamicStreamSource(
+        TupleTag<T> rowTag,
+        SerializableFunction<SchemaAndRecord, T> parseFn,
+        BadRecordRouter badRecordRouter,
+        TupleTag<KV<String, CleanupOperationMessage>> streamCompletionTag) {
+      this.readTableSource = new TypedRead.ReadTableSource<>(rowTag, parseFn, badRecordRouter);
+      this.streamCompletionTag = streamCompletionTag;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<String, BigQueryStorageStreamSource<T>> element,
+        MultiOutputReceiver receiver,
+        PipelineOptions options)
+        throws Exception {
+      readTableSource.processElement(element.getValue(), receiver, options);
+      if (element.getValue().getFromQuery()) {
+        receiver
+            .get(streamCompletionTag)
+            .output(KV.of(element.getKey(), CleanupOperationMessage.streamComplete()));
+      }
+    }
+  }
+
   public static class Read extends PTransform<PBegin, PCollection<TableRow>> {
     private final TypedRead<TableRow> inner;
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
index 124a708eed6b..6c722b9f43f2 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
@@ -77,7 +77,13 @@ public static <T> BigQueryStorageStreamSource<T> create(
         toJsonString(Preconditions.checkArgumentNotNull(tableSchema, "tableSchema")),
         parseFn,
         outputCoder,
-        bqServices);
+        bqServices,
+        false);
+  }
+
+  public BigQueryStorageStreamSource<T> withFromQuery() {
+    return new BigQueryStorageStreamSource<>(
+        readSession, readStream, jsonTableSchema, parseFn, outputCoder, bqServices, true);
   }
 
   @Override
@@ -106,13 +112,13 @@ public int hashCode() {
    */
   public BigQueryStorageStreamSource<T> fromExisting(ReadStream newReadStream) {
     return new BigQueryStorageStreamSource<>(
-        readSession, newReadStream, jsonTableSchema, parseFn, outputCoder, bqServices);
+        readSession, newReadStream, jsonTableSchema, parseFn, outputCoder, bqServices, fromQuery);
   }
 
   public <V> BigQueryStorageStreamSource<V> fromExisting(
       SerializableFunction<SchemaAndRecord, V> parseFn) {
     return new BigQueryStorageStreamSource<>(
-        readSession, readStream, jsonTableSchema, parseFn, outputCoder, bqServices);
+        readSession, readStream, jsonTableSchema, parseFn, outputCoder, bqServices, fromQuery);
   }
 
   private final ReadSession readSession;
@@ -121,6 +127,7 @@ public <V> BigQueryStorageStreamSource<V> fromExisting(
   private final SerializableFunction<SchemaAndRecord, T> parseFn;
   private final Coder<T> outputCoder;
   private final BigQueryServices bqServices;
+  private final boolean fromQuery;
 
   private BigQueryStorageStreamSource(
       ReadSession readSession,
@@ -128,13 +135,19 @@ private BigQueryStorageStreamSource(
       String jsonTableSchema,
       SerializableFunction<SchemaAndRecord, T> parseFn,
       Coder<T> outputCoder,
-      BigQueryServices bqServices) {
+      BigQueryServices bqServices,
+      boolean fromQuery) {
     this.readSession = Preconditions.checkArgumentNotNull(readSession, "readSession");
     this.readStream = Preconditions.checkArgumentNotNull(readStream, "stream");
     this.jsonTableSchema = Preconditions.checkArgumentNotNull(jsonTableSchema, "jsonTableSchema");
     this.parseFn = Preconditions.checkArgumentNotNull(parseFn, "parseFn");
     this.outputCoder = Preconditions.checkArgumentNotNull(outputCoder, "outputCoder");
     this.bqServices = Preconditions.checkArgumentNotNull(bqServices, "bqServices");
+    this.fromQuery = fromQuery;
+  }
+
+  public boolean getFromQuery() {
+    return fromQuery;
   }
 
   @Override
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/CleanupTempTableDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/CleanupTempTableDoFnTest.java
new file mode 100644
index 000000000000..5b0537363c74
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/CleanupTempTableDoFnTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.api.services.bigquery.model.TableReference;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.CleanupInfo;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.CleanupOperationMessage;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.CleanupTempTableDoFn;
+import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
+import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class CleanupTempTableDoFnTest {
+
+  @Rule public transient TestPipeline p = TestPipeline.create();
+
+  @Before
+  public void setUp() {
+    FakeDatasetService.setUp();
+  }
+
+  @Test
+  public void testCleanupTempTableDoFn() throws Exception {
+    FakeDatasetService fakeDatasetService = new FakeDatasetService();
+    FakeBigQueryServices fakeBqServices =
+        new FakeBigQueryServices().withDatasetService(fakeDatasetService);
+
+    TableReference tableRef =
+        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+
+    fakeDatasetService.createDataset(
+        tableRef.getProjectId(), tableRef.getDatasetId(), "", "", null);
+    fakeDatasetService.createTable(
+        new com.google.api.services.bigquery.model.Table().setTableReference(tableRef));
+
+    assertTrue(
+        fakeDatasetService.getDataset(tableRef.getProjectId(), tableRef.getDatasetId()) != null);
+    assertTrue(fakeDatasetService.getTable(tableRef) != null);
+
+    CleanupInfo cleanupInfo = new CleanupInfo(tableRef, true, 2);
+
+    PCollection<KV<String, CleanupOperationMessage>> input =
+        p.apply(
+            Create.of(
+                KV.of("job1", CleanupOperationMessage.initialize(cleanupInfo)),
+                KV.of("job1", CleanupOperationMessage.streamComplete()),
+                KV.of("job1", CleanupOperationMessage.streamComplete())));
+
+    input.apply(ParDo.of(new CleanupTempTableDoFn(fakeBqServices)));
+
+    p.run().waitUntilFinish();
+
+    // The dataset is deleted, so getDataset and getTable should throw a 404 Exception or return
+    // null
+    try {
+      fakeDatasetService.getDataset(tableRef.getProjectId(), tableRef.getDatasetId());
+      fail("Dataset should have been deleted");
+    } catch (Exception e) {
+      assertTrue(
+          e.getMessage().contains("Tried to get a dataset")
+              || e.getMessage().contains("Not Found"));
+    }
+
+    try {
+      TableReference deletedRef =
+          new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+      Object table = fakeDatasetService.getTable(deletedRef);
+      assertTrue(table == null);
+    } catch (Exception e) {
+      assertTrue(
+          e.getMessage().contains("Tried to get a dataset")
+              || e.getMessage().contains("Not Found"));
+    }
+  }
+}

From 1b1096de0f45f3b32f91cf985dba62175c72483f Mon Sep 17 00:00:00 2001
From: Radek Stankiewicz
Date: Fri, 6 Feb 2026 14:15:24 +0100
Subject: [PATCH 2/2] Fix missing coder for CleanupOperationMessage side outputs

---
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 6fb115577368..90ee614f07d0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1085,8 +1085,22 @@ public String apply(BigQueryDynamicReadDescriptor input) {
                           rowTag, getParseFn(), getBadRecordRouter(), cleanupInfoTag))
                   .withOutputTags(rowTag, TupleTagList.of(BAD_RECORD_TAG).and(cleanupInfoTag)));
 
-      PCollectionList.of(resultTuple.get(cleanupInfoTag))
-          .and(readResultTuple.get(cleanupInfoTag))
+      PCollection<KV<String, CleanupOperationMessage>> cleanupMessages1 =
+          resultTuple
+              .get(cleanupInfoTag)
+              .setCoder(
+                  KvCoder.of(
+                      StringUtf8Coder.of(), SerializableCoder.of(CleanupOperationMessage.class)));
+
+      PCollection<KV<String, CleanupOperationMessage>> cleanupMessages2 =
+          readResultTuple
+              .get(cleanupInfoTag)
+              .setCoder(
+                  KvCoder.of(
+                      StringUtf8Coder.of(), SerializableCoder.of(CleanupOperationMessage.class)));
+
+      PCollectionList.of(cleanupMessages1)
+          .and(cleanupMessages2)
           .apply(Flatten.pCollections())
           .apply("CleanupTempTables", ParDo.of(new CleanupTempTableDoFn(getBigQueryServices())));