diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go index f7d6ba5ad361..2399fd726dae 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go @@ -30,6 +30,7 @@ import ( jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1" "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker" "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" ) type Server struct { @@ -80,6 +81,10 @@ func NewServer(port int, execute func(*Job)) *Server { s.logger.Info("Serving JobManagement", slog.String("endpoint", s.Endpoint())) opts := []grpc.ServerOption{ grpc.MaxRecvMsgSize(math.MaxInt32), + grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: 20 * time.Second, // Minimum duration a client should wait before sending a keepalive ping + PermitWithoutStream: true, // Allow pings even if there are no active streams + }), } s.server = grpc.NewServer(opts...) jobpb.RegisterJobServiceServer(s.server, s)