From cbf45e70bf029a9988cbc623b6ca61f6b4c664cc Mon Sep 17 00:00:00 2001 From: XQ Hu Date: Sat, 20 Dec 2025 17:48:47 -0500 Subject: [PATCH 1/4] feat: Add support for custom prediction routes in Vertex AI inference using the `invoke_route` parameter and custom response parsing. --- .../ml/inference/vertex_ai_inference.py | 71 ++++++++++++++++++- .../ml/inference/vertex_ai_inference_test.py | 65 +++++++++++++++++ sdks/python/apache_beam/yaml/yaml_ml.py | 8 +++ 3 files changed, 141 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/vertex_ai_inference.py b/sdks/python/apache_beam/ml/inference/vertex_ai_inference.py index 9858b59039c7..4b2753a3033d 100644 --- a/sdks/python/apache_beam/ml/inference/vertex_ai_inference.py +++ b/sdks/python/apache_beam/ml/inference/vertex_ai_inference.py @@ -15,6 +15,7 @@ # limitations under the License. # +import json import logging from collections.abc import Iterable from collections.abc import Mapping @@ -63,6 +64,7 @@ def __init__( experiment: Optional[str] = None, network: Optional[str] = None, private: bool = False, + invoke_route: Optional[str] = None, *, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, @@ -95,6 +97,12 @@ def __init__( private: optional. if the deployed Vertex AI endpoint is private, set to true. Requires a network to be provided as well. + invoke_route: optional. the custom route path to use when invoking + endpoints with arbitrary prediction routes. When specified, uses + `Endpoint.invoke()` instead of `Endpoint.predict()`. The route + should start with a forward slash, e.g., "/predict/v1". + See https://cloud.google.com/vertex-ai/docs/predictions/use-arbitrary-custom-routes + for more information. min_batch_size: optional. the minimum batch size to use when batching inputs. max_batch_size: optional. the maximum batch size to use when batching @@ -104,6 +112,7 @@ def __init__( """ self._batching_kwargs = {} self._env_vars = kwargs.get('env_vars', {}) + self._invoke_route = invoke_route if min_batch_size is not None: self._batching_kwargs["min_batch_size"] = min_batch_size if max_batch_size is not None: @@ -203,9 +212,65 @@ def request( Returns: An iterable of Predictions. """ - prediction = model.predict(instances=list(batch), parameters=inference_args) - return utils._convert_to_result( - batch, prediction.predictions, prediction.deployed_model_id) + if self._invoke_route: + # Use invoke() for endpoints with custom prediction routes + request_body = {"instances": list(batch)} + if inference_args: + request_body["parameters"] = inference_args + response = model.invoke( + request_path=self._invoke_route, + body=json.dumps(request_body).encode("utf-8"), + headers={"Content-Type": "application/json"}) + return self._parse_invoke_response(batch, response) + else: + prediction = model.predict( + instances=list(batch), parameters=inference_args) + return utils._convert_to_result( + batch, prediction.predictions, prediction.deployed_model_id) + + def _parse_invoke_response( + self, batch: Sequence[Any], + response: bytes) -> Iterable[PredictionResult]: + """Parses the response from Endpoint.invoke() into PredictionResults. + + Args: + batch: the original batch of inputs. + response: the raw bytes response from invoke(). + + Returns: + An iterable of PredictionResults. 
+ """ + try: + response_json = json.loads(response.decode("utf-8")) + except (json.JSONDecodeError, UnicodeDecodeError) as e: + LOGGER.warning( + "Failed to decode invoke response as JSON, returning raw bytes: %s", + e) + # Return raw response for each batch item + return [ + PredictionResult(example=example, inference=response) + for example in batch + ] + + # Handle standard Vertex AI response format with "predictions" key + if isinstance(response_json, dict) and "predictions" in response_json: + predictions = response_json["predictions"] + model_id = response_json.get("deployedModelId") + return utils._convert_to_result(batch, predictions, model_id) + + # Handle response as a list of predictions (one per input) + if isinstance(response_json, list) and len(response_json) == len(batch): + return utils._convert_to_result(batch, response_json, None) + + # Handle single prediction response + if len(batch) == 1: + return [PredictionResult(example=batch[0], inference=response_json)] + + # Fallback: return the full response for each batch item + return [ + PredictionResult(example=example, inference=response_json) + for example in batch + ] def batch_elements_kwargs(self) -> Mapping[str, Any]: return self._batching_kwargs diff --git a/sdks/python/apache_beam/ml/inference/vertex_ai_inference_test.py b/sdks/python/apache_beam/ml/inference/vertex_ai_inference_test.py index 91a3b82cf762..3682adedb9dc 100644 --- a/sdks/python/apache_beam/ml/inference/vertex_ai_inference_test.py +++ b/sdks/python/apache_beam/ml/inference/vertex_ai_inference_test.py @@ -48,5 +48,70 @@ def test_exception_on_private_without_network(self): private=True) +class ParseInvokeResponseTest(unittest.TestCase): + """Tests for _parse_invoke_response method.""" + + def _create_handler_with_invoke_route(self, invoke_route="/test"): + """Creates a mock handler with invoke_route for testing.""" + import unittest.mock as mock + with mock.patch.object( + VertexAIModelHandlerJSON, '_retrieve_endpoint', return_value=None): + handler = VertexAIModelHandlerJSON( + endpoint_id="1", + project="testproject", + location="us-central1", + invoke_route=invoke_route) + return handler + + def test_parse_invoke_response_with_predictions_key(self): + """Test parsing response with standard 'predictions' key.""" + handler = self._create_handler_with_invoke_route() + batch = [{"input": "test1"}, {"input": "test2"}] + response = b'{"predictions": ["result1", "result2"], "deployedModelId": "model123"}' + + results = list(handler._parse_invoke_response(batch, response)) + + self.assertEqual(len(results), 2) + self.assertEqual(results[0].example, {"input": "test1"}) + self.assertEqual(results[0].inference, "result1") + self.assertEqual(results[1].example, {"input": "test2"}) + self.assertEqual(results[1].inference, "result2") + + def test_parse_invoke_response_list_format(self): + """Test parsing response as a list of predictions.""" + handler = self._create_handler_with_invoke_route() + batch = [{"input": "test1"}, {"input": "test2"}] + response = b'["result1", "result2"]' + + results = list(handler._parse_invoke_response(batch, response)) + + self.assertEqual(len(results), 2) + self.assertEqual(results[0].inference, "result1") + self.assertEqual(results[1].inference, "result2") + + def test_parse_invoke_response_single_prediction(self): + """Test parsing response with a single prediction.""" + handler = self._create_handler_with_invoke_route() + batch = [{"input": "test1"}] + response = b'{"output": "single result"}' + + results = 
list(handler._parse_invoke_response(batch, response)) + + self.assertEqual(len(results), 1) + self.assertEqual(results[0].inference, {"output": "single result"}) + + def test_parse_invoke_response_non_json(self): + """Test handling non-JSON response.""" + handler = self._create_handler_with_invoke_route() + batch = [{"input": "test1"}] + response = b'not valid json' + + results = list(handler._parse_invoke_response(batch, response)) + + self.assertEqual(len(results), 1) + self.assertEqual(results[0].inference, response) + + if __name__ == '__main__': unittest.main() + diff --git a/sdks/python/apache_beam/yaml/yaml_ml.py b/sdks/python/apache_beam/yaml/yaml_ml.py index e5a88f54eba7..39098ad7174c 100644 --- a/sdks/python/apache_beam/yaml/yaml_ml.py +++ b/sdks/python/apache_beam/yaml/yaml_ml.py @@ -168,6 +168,7 @@ def __init__( experiment: Optional[str] = None, network: Optional[str] = None, private: bool = False, + invoke_route: Optional[str] = None, min_batch_size: Optional[int] = None, max_batch_size: Optional[int] = None, max_batch_duration_secs: Optional[int] = None): @@ -236,6 +237,12 @@ def __init__( private: If the deployed Vertex AI endpoint is private, set to true. Requires a network to be provided as well. + invoke_route: The custom route path to use when invoking + endpoints with arbitrary prediction routes. When specified, uses + `Endpoint.invoke()` instead of `Endpoint.predict()`. The route + should start with a forward slash, e.g., "/predict/v1". + See https://cloud.google.com/vertex-ai/docs/predictions/use-arbitrary-custom-routes + for more information. min_batch_size: The minimum batch size to use when batching inputs. max_batch_size: The maximum batch size to use when batching @@ -258,6 +265,7 @@ def __init__( experiment=experiment, network=network, private=private, + invoke_route=invoke_route, min_batch_size=min_batch_size, max_batch_size=max_batch_size, max_batch_duration_secs=max_batch_duration_secs) From 33d9f71b74fa1fe5fc24c2eef3ad0cf5b50d4046 Mon Sep 17 00:00:00 2001 From: XQ Hu Date: Sat, 20 Dec 2025 19:11:20 -0500 Subject: [PATCH 2/4] lint --- .../ml/inference/vertex_ai_inference_test.py | 11 ++++++----- sdks/python/apache_beam/yaml/yaml_ml.py | 3 ++- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/vertex_ai_inference_test.py b/sdks/python/apache_beam/ml/inference/vertex_ai_inference_test.py index 3682adedb9dc..488ee57e1a49 100644 --- a/sdks/python/apache_beam/ml/inference/vertex_ai_inference_test.py +++ b/sdks/python/apache_beam/ml/inference/vertex_ai_inference_test.py @@ -50,12 +50,12 @@ def test_exception_on_private_without_network(self): class ParseInvokeResponseTest(unittest.TestCase): """Tests for _parse_invoke_response method.""" - def _create_handler_with_invoke_route(self, invoke_route="/test"): """Creates a mock handler with invoke_route for testing.""" import unittest.mock as mock - with mock.patch.object( - VertexAIModelHandlerJSON, '_retrieve_endpoint', return_value=None): + with mock.patch.object(VertexAIModelHandlerJSON, + '_retrieve_endpoint', + return_value=None): handler = VertexAIModelHandlerJSON( endpoint_id="1", project="testproject", @@ -67,7 +67,9 @@ def test_parse_invoke_response_with_predictions_key(self): """Test parsing response with standard 'predictions' key.""" handler = self._create_handler_with_invoke_route() batch = [{"input": "test1"}, {"input": "test2"}] - response = b'{"predictions": ["result1", "result2"], "deployedModelId": "model123"}' + response = ( + b'{"predictions": 
["result1", "result2"], ' + b'"deployedModelId": "model123"}') results = list(handler._parse_invoke_response(batch, response)) @@ -114,4 +116,3 @@ def test_parse_invoke_response_non_json(self): if __name__ == '__main__': unittest.main() - diff --git a/sdks/python/apache_beam/yaml/yaml_ml.py b/sdks/python/apache_beam/yaml/yaml_ml.py index 39098ad7174c..4e750b79ce30 100644 --- a/sdks/python/apache_beam/yaml/yaml_ml.py +++ b/sdks/python/apache_beam/yaml/yaml_ml.py @@ -241,7 +241,8 @@ def __init__( endpoints with arbitrary prediction routes. When specified, uses `Endpoint.invoke()` instead of `Endpoint.predict()`. The route should start with a forward slash, e.g., "/predict/v1". - See https://cloud.google.com/vertex-ai/docs/predictions/use-arbitrary-custom-routes + See + https://cloud.google.com/vertex-ai/docs/predictions/use-arbitrary-custom-routes for more information. min_batch_size: The minimum batch size to use when batching inputs. From b9ae2d82615c59f66e02f645b24b7b94f12d25d2 Mon Sep 17 00:00:00 2001 From: XQ Hu Date: Sat, 20 Dec 2025 19:37:16 -0500 Subject: [PATCH 3/4] lint 2 --- sdks/python/apache_beam/ml/inference/vertex_ai_inference.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/vertex_ai_inference.py b/sdks/python/apache_beam/ml/inference/vertex_ai_inference.py index 4b2753a3033d..468ccd7b04a1 100644 --- a/sdks/python/apache_beam/ml/inference/vertex_ai_inference.py +++ b/sdks/python/apache_beam/ml/inference/vertex_ai_inference.py @@ -228,9 +228,8 @@ def request( return utils._convert_to_result( batch, prediction.predictions, prediction.deployed_model_id) - def _parse_invoke_response( - self, batch: Sequence[Any], - response: bytes) -> Iterable[PredictionResult]: + def _parse_invoke_response(self, batch: Sequence[Any], + response: bytes) -> Iterable[PredictionResult]: """Parses the response from Endpoint.invoke() into PredictionResults. Args: From 39c9bd51f38684800b4906bb72ef01975e752c10 Mon Sep 17 00:00:00 2001 From: XQ Hu Date: Sat, 20 Dec 2025 20:24:22 -0500 Subject: [PATCH 4/4] fix: ensure invoke response is bytes and add type hint for request_body --- sdks/python/apache_beam/ml/inference/vertex_ai_inference.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/ml/inference/vertex_ai_inference.py b/sdks/python/apache_beam/ml/inference/vertex_ai_inference.py index 468ccd7b04a1..b28fa8507775 100644 --- a/sdks/python/apache_beam/ml/inference/vertex_ai_inference.py +++ b/sdks/python/apache_beam/ml/inference/vertex_ai_inference.py @@ -214,14 +214,14 @@ def request( """ if self._invoke_route: # Use invoke() for endpoints with custom prediction routes - request_body = {"instances": list(batch)} + request_body: dict[str, Any] = {"instances": list(batch)} if inference_args: request_body["parameters"] = inference_args response = model.invoke( request_path=self._invoke_route, body=json.dumps(request_body).encode("utf-8"), headers={"Content-Type": "application/json"}) - return self._parse_invoke_response(batch, response) + return self._parse_invoke_response(batch, bytes(response)) else: prediction = model.predict( instances=list(batch), parameters=inference_args)