From 0b871e317e32759739627827a25ce5ce69d8a1f0 Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Fri, 5 Dec 2025 14:35:41 -0500
Subject: [PATCH 1/2] Fix too-many-pings on FnAPI runner under grpc mode
 (#37013)

* Fix too-many-pings on FnAPI runner under grpc mode

* Fix lints
---
 .../portability/fn_api_runner/worker_handlers.py   | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
index 338f6ece57c0..d79b381f2d78 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
@@ -465,10 +465,15 @@ def __init__(
     # received or sent over the data plane. The actual buffer size
     # is controlled in a layer above. Also, options to keep the server alive
     # when too many pings are received.
-    options = [("grpc.max_receive_message_length", -1),
-               ("grpc.max_send_message_length", -1),
-               ("grpc.http2.max_pings_without_data", 0),
-               ("grpc.http2.max_ping_strikes", 0)]
+    options = [
+        ("grpc.max_receive_message_length", -1),
+        ("grpc.max_send_message_length", -1),
+        ("grpc.http2.max_pings_without_data", 0),
+        ("grpc.http2.max_ping_strikes", 0),
+        # match `grpc.keepalive_time_ms` defined in the client
+        # (channel_factory.py)
+        ("grpc.http2.min_ping_interval_without_data_ms", 20_000),
+    ]
 
     self.state = state
     self.provision_info = provision_info

From afce87a0048522c7ccdd232b455f7c18c983940a Mon Sep 17 00:00:00 2001
From: Shunping Huang
Date: Mon, 8 Dec 2025 13:57:26 -0500
Subject: [PATCH 2/2] Set keepalive policy for prism grpc server. (#37021)

---
 .../go/pkg/beam/runners/prism/internal/jobservices/server.go | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
index f7d6ba5ad361..2399fd726dae 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
@@ -30,6 +30,7 @@ import (
 	jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
 	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/keepalive"
 )
 
 type Server struct {
@@ -80,6 +81,10 @@ func NewServer(port int, execute func(*Job)) *Server {
 	s.logger.Info("Serving JobManagement", slog.String("endpoint", s.Endpoint()))
 	opts := []grpc.ServerOption{
 		grpc.MaxRecvMsgSize(math.MaxInt32),
+		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
+			MinTime:             20 * time.Second, // Minimum duration a client should wait before sending a keepalive ping
+			PermitWithoutStream: true,             // Allow pings even if there are no active streams
+		}),
 	}
 	s.server = grpc.NewServer(opts...)
 	jobpb.RegisterJobServiceServer(s.server, s)
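
Note on the client/server keepalive pairing (illustrative, not part of the
patches): the server-side option added in PATCH 1/2,
grpc.http2.min_ping_interval_without_data_ms = 20_000, and the
EnforcementPolicy MinTime of 20s in PATCH 2/2 only hold up if the client
pings no more often than once every 20 seconds; a client that pings faster
than a server's enforced minimum is what triggers the GOAWAY with
"too_many_pings" that these patches address. Below is a minimal Python
sketch of the matching client-side channel options, assuming
channel_factory.py sets grpc.keepalive_time_ms to 20_000 as the patch
comment indicates; make_channel and KEEPALIVE_MS are hypothetical names
used for illustration, not Beam APIs.

    import grpc

    # Must match the server's grpc.http2.min_ping_interval_without_data_ms
    # (Python FnAPI runner) and keepalive.EnforcementPolicy.MinTime (prism).
    KEEPALIVE_MS = 20_000

    def make_channel(endpoint):
      # The client sends an HTTP/2 keepalive ping every KEEPALIVE_MS, even
      # when no RPCs are active, so long-lived data-plane connections are
      # not torn down as idle. Keeping this interval at or above the
      # server's enforced minimum avoids ping strikes entirely.
      return grpc.insecure_channel(
          endpoint,
          options=[
              ("grpc.keepalive_time_ms", KEEPALIVE_MS),
              ("grpc.keepalive_permit_without_calls", 1),
          ])

The two numbers form a single invariant spread across processes: lowering
the client's keepalive interval without also lowering the servers' minimum
ping interval would reintroduce the too-many-pings failure.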