From f70b0f2eaabab90798083192a48d6495299df1f3 Mon Sep 17 00:00:00 2001
From: userzhy <48518279+userzhy@users.noreply.github.com>
Date: Wed, 24 Dec 2025 05:49:18 +0000
Subject: [PATCH] Fix PubsubUnboundedSink not enforcing the 1000-message
 per-batch limit

This commit fixes an issue where PubsubUnboundedSink's WriterFn did not
enforce the message count limit per publish batch. The publishBatchSize
parameter was passed to the WriterFn constructor but never stored or
consulted in the processElement method, so only the byte-size limit was
applied when deciding whether to split a batch.

Changes:
- Add a publishBatchSize field to the WriterFn class
- Store publishBatchSize in both WriterFn constructors
- Add a message count check in processElement alongside the existing
  byte-size check
- Update both PubsubSinkDynamicDestinations and PubsubSink to pass
  publishBatchSize when creating WriterFn instances

The fix ensures that batches are split when they reach either the message
count limit or the byte-size limit, preventing Pubsub from rejecting
publish requests that exceed the 1000 messages per request limit.

Fixes #36885
---
 .../sdk/io/gcp/pubsub/PubsubUnboundedSink.java | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 3fe7d51aec1e..4514770fc421 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -248,6 +248,7 @@ private static class OutgoingData {
     private final @Nullable ValueProvider<TopicPath> topic;
     private final String timestampAttribute;
     private final String idAttribute;
+    private final int publishBatchSize;
     private final int publishBatchBytes;
     private final String pubsubRootUrl;
 
@@ -270,6 +271,7 @@ private static class OutgoingData {
       this.topic = topic;
       this.timestampAttribute = timestampAttribute;
       this.idAttribute = idAttribute;
+      this.publishBatchSize = publishBatchSize;
       this.publishBatchBytes = publishBatchBytes;
       this.pubsubRootUrl = null;
     }
@@ -279,12 +281,14 @@ private static class OutgoingData {
         @Nullable ValueProvider<TopicPath> topic,
         String timestampAttribute,
         String idAttribute,
+        int publishBatchSize,
         int publishBatchBytes,
         String pubsubRootUrl) {
       this.pubsubFactory = pubsubFactory;
       this.topic = topic;
       this.timestampAttribute = timestampAttribute;
       this.idAttribute = idAttribute;
+      this.publishBatchSize = publishBatchSize;
       this.publishBatchBytes = publishBatchBytes;
       this.pubsubRootUrl = pubsubRootUrl;
     }
@@ -354,13 +358,14 @@ public void processElement(ProcessContext c) throws Exception {
       if (currentBatch == null) {
         currentBatch = new OutgoingData();
         orderingKeyBatches.put(currentOrderingKey, currentBatch);
-      } else if (currentBatch.bytes + message.getMessage().getData().size() > publishBatchBytes) {
+      } else if (currentBatch.messages.size() >= publishBatchSize
+          || currentBatch.bytes + message.getMessage().getData().size() > publishBatchBytes) {
         // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
-        // Break large (in bytes) batches into smaller.
-        // (We've already broken by batch size using the trigger below, though that may
-        // run slightly over the actual PUBLISH_BATCH_SIZE. We'll consider that ok since
-        // the hard limit from Pubsub is by bytes rather than number of messages.)
+        // Break large batches into smaller ones by message count or byte size.
+        // The trigger below breaks by batch size, but may run slightly over.
+        // Pubsub has hard limits on both bytes and number of messages per batch.
+        // See https://cloud.google.com/pubsub/quotas#resource_limits for details.
         // BLOCKS until published.
         publishBatch(currentBatch.messages, currentBatch.bytes);
         currentBatch.messages.clear();
         currentBatch.bytes = 0;
@@ -659,6 +664,7 @@ public PDone expand(PCollection> input) {
                     outer.topic,
                     outer.timestampAttribute,
                     outer.idAttribute,
+                    outer.publishBatchSize,
                     outer.publishBatchBytes,
                     outer.pubsubRootUrl)));
       return PDone.in(input.getPipeline());
@@ -711,6 +717,7 @@ public PDone expand(PCollection input) {
                     outer.topic,
                     outer.timestampAttribute,
                     outer.idAttribute,
+                    outer.publishBatchSize,
                     outer.publishBatchBytes,
                     outer.pubsubRootUrl)));
       return PDone.in(input.getPipeline());
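
Note for reviewers: the batching decision this patch implements can be
summarized with the standalone sketch below. It is illustrative only; the
class name, constants, and the flush() stand-in are hypothetical and not
the actual Beam WriterFn code. It shows why the message-count check must
accompany the existing byte-size check.

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative model of the count-or-bytes batching decision.
    public class BatchSplitSketch {
      // Pubsub publish limits: at most 1000 messages and 10 MB per request.
      private static final int MAX_BATCH_MESSAGES = 1000;
      private static final int MAX_BATCH_BYTES = 10 * 1024 * 1024;

      private final List<byte[]> messages = new ArrayList<>();
      private int bytes = 0;

      // Adds a message, flushing the current batch first if accepting it
      // would exceed either the message-count or the byte-size limit.
      public void add(byte[] payload) {
        if (!messages.isEmpty()
            && (messages.size() >= MAX_BATCH_MESSAGES
                || bytes + payload.length > MAX_BATCH_BYTES)) {
          flush(); // mirrors the new condition in processElement
        }
        messages.add(payload);
        bytes += payload.length;
      }

      // Stand-in for publishBatch(); a real sink would block until the
      // batch is published, then reset the accumulator.
      private void flush() {
        messages.clear();
        bytes = 0;
      }
    }

With only the byte-size check, a stream of small messages can accumulate
well past 1000 entries before the byte limit is reached, and Pubsub then
rejects the entire publish request.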