From f9502cb58272f85396ac81cac8b25601e18e0301 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Sun, 8 Feb 2026 19:41:29 -0800 Subject: [PATCH 1/3] Add RateLimiter Abstractions --- .../components/ratelimiter/RateLimiter.java | 41 +++++++++++++ .../ratelimiter/RateLimiterContext.java | 27 +++++++++ .../ratelimiter/RateLimiterFactory.java | 57 +++++++++++++++++++ settings.gradle.kts | 1 + 4 files changed, 126 insertions(+) create mode 100644 sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiter.java create mode 100644 sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterContext.java create mode 100644 sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiter.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiter.java new file mode 100644 index 000000000000..5286127ecf6c --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import java.io.IOException; +import java.io.Serializable; + +/** + * A RateLimiter allows to fetch permits from a rate limiter service and blocks execution when the + * rate limit is exceeded. + * + *

Implementations must be {@link Serializable} as they are passed to workers. + */ +public interface RateLimiter extends Serializable, AutoCloseable { + + /** + * Blocks until the specified number of permits are acquired and returns true if the request was + * allowed or false if the request was rejected. + * + * @param permits Number of permits to acquire. + * @return true if the request was allowed, false if it was rejected (and retries exceeded). + * @throws IOException if there is an error communicating with the rate limiter service. + * @throws InterruptedException if the thread is interrupted while waiting. + */ + boolean allow(int permits) throws IOException, InterruptedException; +} \ No newline at end of file diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterContext.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterContext.java new file mode 100644 index 000000000000..552cde850023 --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterContext.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import java.io.Serializable; + +/** + * A marker interface for context data required to check ratelimit. + * + *

Implementations must be {@link Serializable}. + */ +public interface RateLimiterContext extends Serializable {} \ No newline at end of file diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java new file mode 100644 index 000000000000..acdf3af89b3b --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import java.io.IOException; +import java.io.Serializable; + +/** + * A factory that manages connections to rate limit service and creates lightweight handles. + * + *

Implementations must be {@link Serializable} as they are passed to workers. The factory + * typically manages the heavy connection (e.g. gRPC stub) and is thread-safe. + */ +public interface RateLimiterFactory extends Serializable, AutoCloseable { + + /** + * Creates a lightweight ratelimiter handle bound to a specific context. + * + *

Use this when passing ratelimiter to IO components, which doesn't need to know about the + * configuration or the underlying ratelimiter service details. This is also useful in DoFns when + * you want to use the ratelimiter in a static way based on the compile time context. + * + * @param context The context for the ratelimit. + * @return A {@link RateLimiter} handle. + */ + RateLimiter getLimiter(RateLimiterContext context); + + /** + * Blocks until the specified number of permits are acquired and returns true if the request was + * allowed or false if the request was rejected. + * + *

Use this for when the ratelimit namespace or descriptors are not known at compile time. + * allows you to use the ratelimiter in a dynamic way based on the runtime data. + * + * @param context The context for the ratelimit. + * @param permits Number of permits to acquire. + * @return true if the request is allowed, false if rejected. + * @throws IOException if there is an error communicating with the ratelimiter service. + * @throws InterruptedException if the thread is interrupted while waiting. + */ + boolean allow(RateLimiterContext context, int permits) throws IOException, InterruptedException; +} \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index 1b53cb151a69..add4a9ad216a 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -212,6 +212,7 @@ include(":sdks:java:io:azure-cosmos") include(":sdks:java:io:cassandra") include(":sdks:java:io:clickhouse") include(":sdks:java:io:common") +include(":sdks:java:io:components") include(":sdks:java:io:contextualtextio") include(":sdks:java:io:debezium") include(":sdks:java:io:debezium:expansion-service") From 517291afbbbed1b5b8c4d1d1fd867f5ac2e6e386 Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Sun, 8 Feb 2026 19:58:44 -0800 Subject: [PATCH 2/3] fix spotless --- .../apache/beam/sdk/io/components/ratelimiter/RateLimiter.java | 2 +- .../beam/sdk/io/components/ratelimiter/RateLimiterContext.java | 2 +- .../beam/sdk/io/components/ratelimiter/RateLimiterFactory.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiter.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiter.java index 5286127ecf6c..8c02654b3964 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiter.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiter.java @@ -38,4 +38,4 @@ public interface RateLimiter extends Serializable, AutoCloseable { * @throws InterruptedException if the thread is interrupted while waiting. */ boolean allow(int permits) throws IOException, InterruptedException; -} \ No newline at end of file +} diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterContext.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterContext.java index 552cde850023..6387bf5789e4 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterContext.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterContext.java @@ -24,4 +24,4 @@ * *

Implementations must be {@link Serializable}. */ -public interface RateLimiterContext extends Serializable {} \ No newline at end of file +public interface RateLimiterContext extends Serializable {} diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java index acdf3af89b3b..b4330cd53db5 100644 --- a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java @@ -54,4 +54,4 @@ public interface RateLimiterFactory extends Serializable, AutoCloseable { * @throws InterruptedException if the thread is interrupted while waiting. */ boolean allow(RateLimiterContext context, int permits) throws IOException, InterruptedException; -} \ No newline at end of file +} From 04b5fa940bd8ea132fd23be74161f6716509995f Mon Sep 17 00:00:00 2001 From: Tarun Annapareddy Date: Mon, 9 Feb 2026 11:38:49 -0800 Subject: [PATCH 3/3] add package info --- .../components/ratelimiter/package-info.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) create mode 100644 sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/package-info.java diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/package-info.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/package-info.java new file mode 100644 index 000000000000..556447ad11c2 --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Defines ratelimiter utilities for Beam DoFn and IO components. */ +package org.apache.beam.sdk.io.components.ratelimiter;