diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiter.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiter.java new file mode 100644 index 000000000000..8c02654b3964 --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import java.io.IOException; +import java.io.Serializable; + +/** + * A RateLimiter allows to fetch permits from a rate limiter service and blocks execution when the + * rate limit is exceeded. + * + *
Implementations must be {@link Serializable} as they are passed to workers. + */ +public interface RateLimiter extends Serializable, AutoCloseable { + + /** + * Blocks until the specified number of permits are acquired and returns true if the request was + * allowed or false if the request was rejected. + * + * @param permits Number of permits to acquire. + * @return true if the request was allowed, false if it was rejected (and retries exceeded). + * @throws IOException if there is an error communicating with the rate limiter service. + * @throws InterruptedException if the thread is interrupted while waiting. + */ + boolean allow(int permits) throws IOException, InterruptedException; +} diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterContext.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterContext.java new file mode 100644 index 000000000000..6387bf5789e4 --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterContext.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import java.io.Serializable; + +/** + * A marker interface for context data required to check ratelimit. + * + *
Implementations must be {@link Serializable}. + */ +public interface RateLimiterContext extends Serializable {} diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java new file mode 100644 index 000000000000..b4330cd53db5 --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/RateLimiterFactory.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.components.ratelimiter; + +import java.io.IOException; +import java.io.Serializable; + +/** + * A factory that manages connections to rate limit service and creates lightweight handles. + * + *
Implementations must be {@link Serializable} as they are passed to workers. The factory + * typically manages the heavy connection (e.g. gRPC stub) and is thread-safe. + */ +public interface RateLimiterFactory extends Serializable, AutoCloseable { + + /** + * Creates a lightweight ratelimiter handle bound to a specific context. + * + *
Use this when passing ratelimiter to IO components, which doesn't need to know about the + * configuration or the underlying ratelimiter service details. This is also useful in DoFns when + * you want to use the ratelimiter in a static way based on the compile time context. + * + * @param context The context for the ratelimit. + * @return A {@link RateLimiter} handle. + */ + RateLimiter getLimiter(RateLimiterContext context); + + /** + * Blocks until the specified number of permits are acquired and returns true if the request was + * allowed or false if the request was rejected. + * + *
Use this for when the ratelimit namespace or descriptors are not known at compile time. + * allows you to use the ratelimiter in a dynamic way based on the runtime data. + * + * @param context The context for the ratelimit. + * @param permits Number of permits to acquire. + * @return true if the request is allowed, false if rejected. + * @throws IOException if there is an error communicating with the ratelimiter service. + * @throws InterruptedException if the thread is interrupted while waiting. + */ + boolean allow(RateLimiterContext context, int permits) throws IOException, InterruptedException; +} diff --git a/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/package-info.java b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/package-info.java new file mode 100644 index 000000000000..556447ad11c2 --- /dev/null +++ b/sdks/java/io/components/src/main/java/org/apache/beam/sdk/io/components/ratelimiter/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Defines ratelimiter utilities for Beam DoFn and IO components. */ +package org.apache.beam.sdk.io.components.ratelimiter;