@@ -248,6 +248,7 @@ private static class OutgoingData {
     private final @Nullable ValueProvider<TopicPath> topic;
     private final String timestampAttribute;
     private final String idAttribute;
+    private final int publishBatchSize;
     private final int publishBatchBytes;

     private final String pubsubRootUrl;
@@ -270,6 +271,7 @@ private static class OutgoingData {
       this.topic = topic;
       this.timestampAttribute = timestampAttribute;
       this.idAttribute = idAttribute;
+      this.publishBatchSize = publishBatchSize;
       this.publishBatchBytes = publishBatchBytes;
       this.pubsubRootUrl = null;
     }
@@ -279,12 +281,14 @@ private static class OutgoingData {
         @Nullable ValueProvider<TopicPath> topic,
         String timestampAttribute,
         String idAttribute,
+        int publishBatchSize,
         int publishBatchBytes,
         String pubsubRootUrl) {
       this.pubsubFactory = pubsubFactory;
       this.topic = topic;
       this.timestampAttribute = timestampAttribute;
       this.idAttribute = idAttribute;
+      this.publishBatchSize = publishBatchSize;
       this.publishBatchBytes = publishBatchBytes;
       this.pubsubRootUrl = pubsubRootUrl;
     }
@@ -354,13 +358,14 @@ public void processElement(ProcessContext c) throws Exception {
       if (currentBatch == null) {
         currentBatch = new OutgoingData();
         orderingKeyBatches.put(currentOrderingKey, currentBatch);
-      } else if (currentBatch.bytes + message.getMessage().getData().size() > publishBatchBytes) {
+      } else if (currentBatch.messages.size() >= publishBatchSize
+          || currentBatch.bytes + message.getMessage().getData().size() > publishBatchBytes) {
         // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800

-        // Break large (in bytes) batches into smaller.
-        // (We've already broken by batch size using the trigger below, though that may
-        // run slightly over the actual PUBLISH_BATCH_SIZE. We'll consider that ok since
-        // the hard limit from Pubsub is by bytes rather than number of messages.)
+        // Break large batches into smaller by message count or byte size.
+        // The trigger below breaks by batch size, but may run slightly over.
+        // Pubsub has hard limits on both bytes and number of messages per batch.
+        // See https://cloud.google.com/pubsub/quotas#resource_limits for details.
         // BLOCKS until published.
         publishBatch(currentBatch.messages, currentBatch.bytes);
         currentBatch.messages.clear();
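For readers skimming the diff: the new condition flushes a batch as soon as either limit would be exceeded, not just the byte limit as before. Below is a minimal standalone sketch of that dual-limit flush logic; the `BatchingPublisher` name and its methods are hypothetical stand-ins, and it omits Beam's per-ordering-key bookkeeping.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; names and structure are hypothetical, not Beam's.
class BatchingPublisher {
  private final int publishBatchSize;   // max messages per publish request
  private final int publishBatchBytes;  // max payload bytes per publish request
  private final List<byte[]> messages = new ArrayList<>();
  private int bytes = 0;

  BatchingPublisher(int publishBatchSize, int publishBatchBytes) {
    this.publishBatchSize = publishBatchSize;
    this.publishBatchBytes = publishBatchBytes;
  }

  void add(byte[] payload) {
    // Mirrors the condition in the hunk above: flush when the batch is
    // already full by message count, OR when adding this payload would
    // push it over the byte limit.
    if (messages.size() >= publishBatchSize
        || bytes + payload.length > publishBatchBytes) {
      flush();
    }
    messages.add(payload);
    bytes += payload.length;
  }

  void flush() {
    if (messages.isEmpty()) {
      return;
    }
    // The real sink blocks here until the batch is published.
    messages.clear();
    bytes = 0;
  }
}
```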
@@ -659,6 +664,7 @@ public PDone expand(PCollection<KV<String, byte[]>> input) {
                   outer.topic,
                   outer.timestampAttribute,
                   outer.idAttribute,
+                  outer.publishBatchSize,
                   outer.publishBatchBytes,
                   outer.pubsubRootUrl)));
       return PDone.in(input.getPipeline());
@@ -711,6 +717,7 @@ public PDone expand(PCollection<byte[]> input) {
                   outer.topic,
                   outer.timestampAttribute,
                   outer.idAttribute,
+                  outer.publishBatchSize,
                   outer.publishBatchBytes,
                   outer.pubsubRootUrl)));
       return PDone.in(input.getPipeline());
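The quotas page cited in the new comment documents hard per-request limits of 1,000 messages and 10 MB per publish request. As a hedged illustration of how caller-supplied limits could be checked against those ceilings, here is a small guard; the class, constants, and method are assumptions for this sketch, not part of Beam's API.

```java
// Hypothetical guard, not Beam code; limit values taken from
// https://cloud.google.com/pubsub/quotas#resource_limits
// (1,000 messages and 10 MB per publish request).
final class PubsubBatchLimits {
  static final int MAX_PUBLISH_BATCH_SIZE = 1000;
  static final int MAX_PUBLISH_BATCH_BYTES = 10 * 1000 * 1000; // 10 MB

  static void validate(int publishBatchSize, int publishBatchBytes) {
    if (publishBatchSize < 1 || publishBatchSize > MAX_PUBLISH_BATCH_SIZE) {
      throw new IllegalArgumentException(
          "publishBatchSize out of range: " + publishBatchSize);
    }
    if (publishBatchBytes < 1 || publishBatchBytes > MAX_PUBLISH_BATCH_BYTES) {
      throw new IllegalArgumentException(
          "publishBatchBytes out of range: " + publishBatchBytes);
    }
  }

  private PubsubBatchLimits() {}
}
```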