diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
index 3fe7d51aec1e..4514770fc421 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java
@@ -248,6 +248,7 @@ private static class OutgoingData {
     private final @Nullable ValueProvider topic;
     private final String timestampAttribute;
     private final String idAttribute;
+    private final int publishBatchSize;
     private final int publishBatchBytes;
     private final String pubsubRootUrl;
 
@@ -270,6 +271,7 @@ private static class OutgoingData {
       this.topic = topic;
       this.timestampAttribute = timestampAttribute;
       this.idAttribute = idAttribute;
+      this.publishBatchSize = publishBatchSize;
       this.publishBatchBytes = publishBatchBytes;
       this.pubsubRootUrl = null;
     }
@@ -279,12 +281,14 @@ private static class OutgoingData {
         @Nullable ValueProvider topic,
         String timestampAttribute,
         String idAttribute,
+        int publishBatchSize,
         int publishBatchBytes,
         String pubsubRootUrl) {
       this.pubsubFactory = pubsubFactory;
       this.topic = topic;
       this.timestampAttribute = timestampAttribute;
       this.idAttribute = idAttribute;
+      this.publishBatchSize = publishBatchSize;
       this.publishBatchBytes = publishBatchBytes;
       this.pubsubRootUrl = pubsubRootUrl;
     }
@@ -354,13 +358,14 @@ public void processElement(ProcessContext c) throws Exception {
       if (currentBatch == null) {
         currentBatch = new OutgoingData();
         orderingKeyBatches.put(currentOrderingKey, currentBatch);
-      } else if (currentBatch.bytes + message.getMessage().getData().size() > publishBatchBytes) {
+      } else if (currentBatch.messages.size() >= publishBatchSize
+          || currentBatch.bytes + message.getMessage().getData().size() > publishBatchBytes) {
         // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
-        // Break large (in bytes) batches into smaller.
-        // (We've already broken by batch size using the trigger below, though that may
-        // run slightly over the actual PUBLISH_BATCH_SIZE. We'll consider that ok since
-        // the hard limit from Pubsub is by bytes rather than number of messages.)
+        // Break large batches into smaller by message count or byte size.
+        // The trigger below breaks by batch size, but may run slightly over.
+        // Pubsub has hard limits on both bytes and number of messages per batch.
+        // See https://cloud.google.com/pubsub/quotas#resource_limits for details.
         // BLOCKS until published.
         publishBatch(currentBatch.messages, currentBatch.bytes);
         currentBatch.messages.clear();
@@ -659,6 +664,7 @@ public PDone expand(PCollection> input) {
                       outer.topic,
                       outer.timestampAttribute,
                       outer.idAttribute,
+                      outer.publishBatchSize,
                       outer.publishBatchBytes,
                       outer.pubsubRootUrl)));
       return PDone.in(input.getPipeline());
@@ -711,6 +717,7 @@ public PDone expand(PCollection input) {
                       outer.topic,
                       outer.timestampAttribute,
                       outer.idAttribute,
+                      outer.publishBatchSize,
                       outer.publishBatchBytes,
                       outer.pubsubRootUrl)));
       return PDone.in(input.getPipeline());
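
For reviewers, here is a minimal, self-contained sketch of the dual-limit flush rule this diff introduces: a batch is flushed before appending a message once it has reached the message-count cap or would exceed the byte cap. The names (`BatchFlushSketch`, `Batch`, `shouldFlush`, `append`, `publish`, `MAX_BATCH_MESSAGES`, `MAX_BATCH_BYTES`) are illustrative stand-ins, not Beam's API; the real logic lives in `WriterFn.processElement` above. The caps mirror Pub/Sub's documented hard limits of 1,000 messages and 10 MB per publish request.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchFlushSketch {
  // Illustrative caps mirroring Pub/Sub's hard limits per publish request.
  static final int MAX_BATCH_MESSAGES = 1000;
  static final int MAX_BATCH_BYTES = 10 * 1024 * 1024;

  // Stand-in for the OutgoingData accumulator in the diff.
  static class Batch {
    final List<byte[]> messages = new ArrayList<>();
    long bytes = 0;
  }

  // True if appending a message of nextSize bytes should flush the batch first,
  // i.e. the count cap is already reached or the byte cap would be exceeded.
  static boolean shouldFlush(Batch batch, int nextSize) {
    return batch.messages.size() >= MAX_BATCH_MESSAGES
        || batch.bytes + nextSize > MAX_BATCH_BYTES;
  }

  static void append(Batch batch, byte[] payload) {
    if (shouldFlush(batch, payload.length)) {
      publish(batch); // the diff's publishBatch() blocks until published
      batch.messages.clear();
      batch.bytes = 0;
    }
    batch.messages.add(payload);
    batch.bytes += payload.length;
  }

  static void publish(Batch batch) {
    System.out.printf("publishing %d messages, %d bytes%n", batch.messages.size(), batch.bytes);
  }

  public static void main(String[] args) {
    Batch batch = new Batch();
    // 2,500 small messages trip the 1,000-message cap twice despite tiny byte totals.
    for (int i = 0; i < 2500; i++) {
      append(batch, new byte[100]);
    }
    publish(batch); // flush the remainder (500 messages)
  }
}
```

Checking before the append keeps each published request at or under both caps, which matters because, per the diff's own comment, the trigger-based batching upstream may run slightly over `publishBatchSize`.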