diff --git a/CHANGES.md b/CHANGES.md index 07f3b4f5accc..f36e83ff72f6 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -95,6 +95,7 @@ ## Bugfixes * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Fixed TensorRT compatibility with version 10.x by implementing new Tensor API (Python) ([#36306](https://github.com/apache/beam/issues/36306)). * Fixed passing of pipeline options to x-lang transforms when called from the Java SDK (Java) ([#36443](https://github.com/apache/beam/issues/36443)). * PulsarIO has now changed support status from incomplete to experimental. Both read and writes should now minimally function (un-partitioned topics, without schema support, timestamp ordered messages for read) (Java) diff --git a/sdks/python/apache_beam/examples/inference/tensorrt_resnet50_inference.py b/sdks/python/apache_beam/examples/inference/tensorrt_resnet50_inference.py new file mode 100644 index 000000000000..b97dff5f71c1 --- /dev/null +++ b/sdks/python/apache_beam/examples/inference/tensorrt_resnet50_inference.py @@ -0,0 +1,248 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Example pipeline using TensorRT 10.x with ONNX model conversion. + +This example demonstrates the TensorRT 10.x handler's ability to: +1. Load ONNX models directly (no pre-conversion needed) +2. Build TensorRT engines on-worker (avoids environment alignment issues) +3. Use the new Tensor API for inference + +**Key Advantage over Legacy TensorRT:** +The on-worker ONNX-to-engine conversion ensures that the TensorRT engine is +built in the exact same environment where inference runs. This eliminates +compatibility issues that occur when pre-building engines on different +GPU/CUDA/TensorRT versions. 
+ +Example Usage: + # Using ONNX model (builds engine on worker) + python tensorrt_resnet50_inference.py \\ + --onnx_path=gs://my-bucket/resnet50.onnx \\ + --input=gs://my-bucket/images.txt \\ + --output=gs://my-bucket/predictions.txt \\ + --runner=DataflowRunner \\ + --project=my-project \\ + --region=us-central1 \\ + --experiment=worker_accelerator=type:nvidia-tesla-t4;count:1;\ +install-nvidia-driver + + # Using pre-built engine (for optimal performance) + python tensorrt_resnet50_inference.py \\ + --engine_path=gs://my-bucket/resnet50.engine \\ + --input=gs://my-bucket/images.txt \\ + --output=gs://my-bucket/predictions.txt +""" + +import argparse +import io +import logging +import numpy as np +from typing import Iterable + +import apache_beam as beam +from apache_beam.io.filesystems import FileSystems +from apache_beam.ml.inference.base import PredictionResult, RunInference +from apache_beam.ml.inference.trt_handler_numpy_compact import \ + TensorRTEngineHandlerNumPy +from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions + +try: + from PIL import Image +except ImportError: + Image = None + + +def read_image(image_path: str) -> tuple[str, np.ndarray]: + """Read and preprocess image for ResNet-50 inference. + + Args: + image_path: Path to image file (supports GCS, local, etc.) + + Returns: + Tuple of (image_path, preprocessed_array) + """ + if Image is None: + raise ImportError( + "Pillow is required for image processing. " + "Install with: pip install pillow") + + with FileSystems().open(image_path, 'r') as f: + img = Image.open(io.BytesIO(f.read())).convert('RGB') + + # ResNet-50 preprocessing: resize to 224x224, normalize to [0,1] + img = img.resize((224, 224), resample=Image.Resampling.BILINEAR) + arr = np.asarray(img, dtype=np.float32) + + # Convert HWC to CHW and add batch dimension + arr = np.transpose(arr, (2, 0, 1)) # HWC -> CHW + arr = np.expand_dims(arr, axis=0) # CHW -> NCHW + + # Normalize to [0, 1] + arr = arr / 255.0 + + return image_path, arr + + +class FormatPrediction(beam.DoFn): + """Format TensorRT predictions for output.""" + def process(self, element: tuple[str, PredictionResult]) -> Iterable[str]: + """Format prediction result. + + Args: + element: Tuple of (image_path, PredictionResult) + + Yields: + Formatted prediction string + """ + image_path, prediction = element + + # Extract output tensors + outputs = prediction.inference + if not outputs: + yield f"{image_path},ERROR: No outputs" + return + + # For ResNet-50, output[0] is typically (1, 1000) logits + logits = np.asarray(outputs[0]) + + if logits.size == 0: + yield f"{image_path},ERROR: Empty output" + return + + # Get top-5 predictions + flat_logits = logits.flatten() + top5_indices = np.argsort(flat_logits)[-5:][::-1] + top5_scores = flat_logits[top5_indices] + + # Format: image_path,class1:score1,class2:score2,... + predictions_str = ','.join( + f"{idx}:{score:.4f}" for idx, score in zip(top5_indices, top5_scores)) + + yield f"{image_path},{predictions_str}" + + +def parse_known_args(argv): + """Parse command line arguments.""" + parser = argparse.ArgumentParser() + + parser.add_argument( + '--onnx_path', + dest='onnx_path', + help='Path to ONNX model file. Engine will be built on worker. ' + 'Use this for maximum compatibility across environments.') + + parser.add_argument( + '--engine_path', + dest='engine_path', + help='Path to pre-built TensorRT engine (.engine). 
' + 'Provides best performance but requires matching ' + 'GPU/CUDA/TensorRT versions.') + + parser.add_argument( + '--input', + dest='input', + required=True, + help='Path to text file containing image paths (one per line).') + + parser.add_argument( + '--output', + dest='output', + required=True, + help='Path for output predictions.') + + parser.add_argument( + '--min_batch_size', + dest='min_batch_size', + type=int, + default=1, + help='Minimum batch size for inference. Default: 1') + + parser.add_argument( + '--max_batch_size', + dest='max_batch_size', + type=int, + default=4, + help='Maximum batch size for inference. Default: 4') + + parser.add_argument( + '--max_batch_duration_secs', + dest='max_batch_duration_secs', + type=int, + default=1, + help='Maximum seconds to wait for batch to fill. Default: 1') + + return parser.parse_known_args(argv) + + +def run(argv=None, save_main_session=True): + """Run the TensorRT inference pipeline. + + Args: + argv: Command line arguments + save_main_session: Whether to save main session for pickling + """ + known_args, pipeline_args = parse_known_args(argv) + pipeline_options = PipelineOptions(pipeline_args) + pipeline_options.view_as(SetupOptions).save_main_session = save_main_session + + # Validate arguments + if not known_args.onnx_path and not known_args.engine_path: + raise ValueError("Must provide either --onnx_path or --engine_path") + + if known_args.onnx_path and known_args.engine_path: + raise ValueError( + "Provide only one of --onnx_path or --engine_path, not both") + + # Create handler + if known_args.onnx_path: + logging.info( + "Using ONNX model: %s (will build engine on worker)", + known_args.onnx_path) + handler = TensorRTEngineHandlerNumPy( + min_batch_size=known_args.min_batch_size, + max_batch_size=known_args.max_batch_size, + max_batch_duration_secs=known_args.max_batch_duration_secs, + onnx_path=known_args.onnx_path, + build_on_worker=True, # Key feature: builds in worker environment! 
+ ) + else: + logging.info("Using pre-built engine: %s", known_args.engine_path) + handler = TensorRTEngineHandlerNumPy( + min_batch_size=known_args.min_batch_size, + max_batch_size=known_args.max_batch_size, + max_batch_duration_secs=known_args.max_batch_duration_secs, + engine_path=known_args.engine_path, + ) + + with beam.Pipeline(options=pipeline_options) as p: + predictions = ( + p + | 'ReadImagePaths' >> beam.io.ReadFromText(known_args.input) + | 'LoadAndPreprocess' >> beam.Map(read_image) + | 'ExtractArrays' >> beam.MapTuple(lambda path, arr: (path, arr)) + | 'RunTensorRT' >> RunInference(handler) + | 'FormatOutput' >> beam.ParDo(FormatPrediction())) + + _ = predictions | 'WriteResults' >> beam.io.WriteToText( + known_args.output, + shard_name_template='', + append_trailing_newlines=True) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + run() diff --git a/sdks/python/apache_beam/ml/inference/tensorrt_inference.py b/sdks/python/apache_beam/ml/inference/tensorrt_inference.py index 0f49489a437a..edc782dbdbeb 100644 --- a/sdks/python/apache_beam/ml/inference/tensorrt_inference.py +++ b/sdks/python/apache_beam/ml/inference/tensorrt_inference.py @@ -25,7 +25,10 @@ from collections.abc import Iterable from collections.abc import Sequence from typing import Any +from typing import Dict +from typing import List from typing import Optional +from typing import Tuple import numpy as np @@ -58,6 +61,42 @@ def _load_engine(engine_path): return engine +def _tensorrt_supports_tensor_api() -> bool: + try: + import tensorrt as trt # noqa: F401 + except Exception: + return False + + engine_reqs = ("num_io_tensors", "get_tensor_name") + ctx_reqs = ("execute_async_v3", "set_input_shape", "set_tensor_address") + return all(hasattr(trt.ICudaEngine, m) for m in engine_reqs) and all( + hasattr(trt.IExecutionContext, m) for m in ctx_reqs) + + +def _require_tensorrt_10() -> None: + if not _tensorrt_supports_tensor_api(): + raise RuntimeError( + "TensorRT 10.x+ required for Tensor API execution on this worker.") + + +def _load_engine_trt10(engine_path): + _require_tensorrt_10() + import tensorrt as trt + + with FileSystems.open(engine_path, 'rb') as f: + blob = f.read() + + logger = trt.Logger(trt.Logger.INFO) + trt.init_libnvinfer_plugins(logger, "") + rt = trt.Runtime(logger) + eng = rt.deserialize_cuda_engine(blob) + if eng is None: + raise RuntimeError( + "Failed to deserialize TensorRT engine. 
" + "The plan may be corrupt or built with an incompatible TRT.") + return eng + + def _load_onnx(onnx_path): import tensorrt as trt builder = trt.Builder(TRT_LOGGER) @@ -83,6 +122,87 @@ def _build_engine(network, builder): return engine +def _apply_builder_config_args(trt, config, builder_config_args): + if not builder_config_args: + return + memory_pool_limit = builder_config_args.get("memory_pool_limit", None) + if memory_pool_limit is not None: + config.set_memory_pool_limit( + trt.MemoryPoolType.WORKSPACE, int(memory_pool_limit)) + for flag in builder_config_args.get("builder_flags", []): + config.set_flag(flag) + + +def _load_onnx_build_engine_trt10(onnx_path, + builder_config_args: Optional[Dict[str, + Any]]): + if onnx_path.lower().endswith(".engine"): + raise ValueError( + "Provided onnx_path points to .engine; pass it as engine_path instead.") + + _require_tensorrt_10() + import tensorrt as trt + + logger = trt.Logger(trt.Logger.INFO) + trt.init_libnvinfer_plugins(logger, "") + + builder = trt.Builder(logger) + flags = 1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH) + network = builder.create_network(flags) + parser = trt.OnnxParser(network, logger) + + with FileSystems.open(onnx_path, 'rb') as f: + data = f.read() + + if not parser.parse(data): + LOGGER.error("Failed to parse ONNX: %s", onnx_path) + for i in range(parser.num_errors): + LOGGER.error(parser.get_error(i)) + raise ValueError(f"Failed to parse ONNX: {onnx_path}") + + config = builder.create_builder_config() + config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, 1 << 30) + _apply_builder_config_args(trt, config, builder_config_args) + + if getattr(builder, "platform_has_fast_fp16", False): + config.set_flag(trt.BuilderFlag.FP16) + + if network.num_inputs > 0: + prof = builder.create_optimization_profile() + for i in range(network.num_inputs): + inp = network.get_input(i) + shp = list(inp.shape) + + def _d(v: int, default: int) -> int: + return default if v < 0 else v + + if len(shp) == 4: + min_shape: Tuple[int, ...] = ( + _d(shp[0], 1), _d(shp[1], 3), _d(shp[2], 224), _d(shp[3], 224)) + opt_shape: Tuple[int, ...] = ( + _d(shp[0], 4), _d(shp[1], 3), _d(shp[2], 224), _d(shp[3], 224)) + max_shape: Tuple[int, ...] = ( + _d(shp[0], 8), _d(shp[1], 3), _d(shp[2], 224), _d(shp[3], 224)) + else: + min_shape = tuple(_d(x, 1) for x in shp) + opt_shape = tuple(_d(x, 4 if j == 0 else 1) for j, x in enumerate(shp)) + max_shape = tuple(_d(x, 8 if j == 0 else 1) for j, x in enumerate(shp)) + + prof.set_shape(inp.name, min=min_shape, opt=opt_shape, max=max_shape) + config.add_optimization_profile(prof) + + plan = builder.build_serialized_network(network, config) + if plan is None: + raise RuntimeError( + "build_serialized_network() returned None; check ONNX and profiles.") + + rt = trt.Runtime(logger) + eng = rt.deserialize_cuda_engine(bytes(plan)) + if eng is None: + raise RuntimeError("Failed to deserialize engine after build.") + return eng + + def _assign_or_fail(args): """CUDA error checking.""" from cuda import cuda @@ -98,6 +218,38 @@ def _assign_or_fail(args): return ret +def _resolve_output_shape(shape: Sequence[int] | None, + batch_size: int) -> Tuple[int, ...] 
| None: + if shape is None: + return None + shp = list(shape) + if shp and shp[0] < 0: + shp[0] = int(batch_size) + if any(d < 0 for d in shp[1:]): + raise RuntimeError(f"Unresolved non-batch dims in output shape: {shape}") + return tuple(shp) + + +def _to_contiguous_batch(x: Sequence[np.ndarray] | np.ndarray) -> np.ndarray: + if isinstance(x, np.ndarray): + return np.ascontiguousarray(x) + if isinstance(x, (list, tuple)): + if len(x) == 1 and isinstance(x[0], np.ndarray): + return np.ascontiguousarray(x[0]) + if all(isinstance(a, np.ndarray) for a in x): + first = x[0].shape + for a in x[1:]: + if len(a.shape) != len(first) or any( + sa != sb for sa, sb in zip(a.shape[1:], first[1:])): + raise ValueError( + "Inconsistent element shapes for concatenation: " + f"{first} vs {a.shape}") + return np.ascontiguousarray(np.concatenate(x, axis=0)) + raise ValueError( + "Batch must be ndarray or sequence of ndarrays of same " + "rank/shape (except batch).") + + class TensorRTEngine: def __init__(self, engine: trt.ICudaEngine): """Implementation of the TensorRTEngine class which handles @@ -172,12 +324,146 @@ def get_engine_attrs(self): self.stream) +class TensorRTEngineTensorApi: + """TRT 10.x engine wrapper using the Tensor API.""" + def __init__(self, engine: Any): + import tensorrt as trt + + self.engine = engine + self.context = engine.create_execution_context() + self.context_lock = threading.RLock() + + self.input_names: List[str] = [] + self.output_names: List[str] = [] + self.dtypes: Dict[str, np.dtype] = {} + self.profile_index = 0 + self._stream = None + self._device_ptrs: Dict[str, int] = {} + self._host_out: Dict[str, np.ndarray] = {} + self._in_nbytes: Optional[int] = None + + for i in range(engine.num_io_tensors): + name = engine.get_tensor_name(i) + mode = engine.get_tensor_mode(name) + if mode == trt.TensorIOMode.INPUT: + self.input_names.append(name) + else: + self.output_names.append(name) + self.dtypes[name] = np.dtype(trt.nptype(engine.get_tensor_dtype(name))) + + @property + def is_trt10(self) -> bool: + return True + + def _ensure_stream(self) -> None: + if self._stream is None: + from cuda import cuda + self._stream = _assign_or_fail(cuda.cuStreamCreate(0)) + + def _free_ptr(self, ptr: int) -> None: + if ptr is None: + return + from cuda import cuda + try: + _assign_or_fail(cuda.cuMemFree(ptr)) + except RuntimeError as exc: + LOGGER.warning('Failed to free CUDA memory pointer: %s', exc) + + def _select_profile(self) -> None: + if hasattr(self.context, "set_optimization_profile_async"): + self._ensure_stream() + self.context.set_optimization_profile_async( + self.profile_index, self._stream) + elif hasattr(self.context, "set_optimization_profile"): + self.context.set_optimization_profile(self.profile_index) + + def _check_shape_in_profile(self, name: str, shape: Sequence[int]) -> None: + mi, _, ma = self.engine.get_tensor_profile_shape(name, self.profile_index) + + def ok(dim: int, lo: int, hi: int) -> bool: + if lo < 0: + lo = dim + if hi < 0: + hi = dim + return lo <= dim <= hi + + if len(shape) != len(mi): + raise RuntimeError( + f"Input '{name}' rank mismatch: given {tuple(shape)}, " + f"profile[{self.profile_index}] min={tuple(mi)} max={tuple(ma)}") + for i, dim in enumerate(shape): + if not ok(dim, mi[i], ma[i]): + raise RuntimeError( + f"Input '{name}' dim {i}={dim} outside " + f"profile[{self.profile_index}] bounds " + f"[min={mi[i]}, max={ma[i]}]. 
Given={tuple(shape)}, " + f"min={tuple(mi)}, max={tuple(ma)}") + + def ensure_buffers( + self, + batch: np.ndarray, + input_shapes: Optional[Dict[str, Sequence[int]]] = None, + ) -> None: + from cuda import cuda + + self._select_profile() + + shapes: Dict[str, List[int]] = {} + if len(self.input_names) == 1: + shapes[self.input_names[0]] = list(batch.shape) + else: + if not input_shapes: + raise RuntimeError( + f"Engine expects multiple inputs {self.input_names}; " + "provide shapes via " + "inference_args={'input_shapes': {name: shape, ...}}") + for name in self.input_names: + if name not in input_shapes: + raise RuntimeError(f"Missing shape for input tensor '{name}'") + shapes[name] = list(map(int, input_shapes[name])) + + for name, shp in shapes.items(): + self._check_shape_in_profile(name, shp) + self.context.set_input_shape(name, shp) + + in_name = self.input_names[0] + in_dtype = self.dtypes[in_name] + in_nbytes = int(np.prod(shapes[in_name])) * in_dtype.itemsize + if self._device_ptrs.get(in_name) is None or self._in_nbytes != in_nbytes: + if self._device_ptrs.get(in_name) is not None: + self._free_ptr(self._device_ptrs[in_name]) + self._device_ptrs[in_name] = _assign_or_fail(cuda.cuMemAlloc(in_nbytes)) + self._in_nbytes = in_nbytes + + batch_size = shapes[in_name][0] + for name in self.output_names: + dtype = self.dtypes[name] + raw_shape = list(self.context.get_tensor_shape(name)) + shape = _resolve_output_shape(raw_shape, batch_size) + if shape is None: + raise RuntimeError(f"Context returned None shape for output '{name}'") + nbytes = int(np.prod(shape)) * dtype.itemsize + need_new = ( + self._device_ptrs.get(name) is None or + self._host_out.get(name) is None or + self._host_out[name].nbytes != nbytes) + if need_new: + if self._device_ptrs.get(name) is not None: + self._free_ptr(self._device_ptrs[name]) + self._device_ptrs[name] = _assign_or_fail(cuda.cuMemAlloc(nbytes)) + self._host_out[name] = np.empty(shape, dtype=dtype) + + self.context.set_tensor_address(in_name, int(self._device_ptrs[in_name])) + for name in self.output_names: + self.context.set_tensor_address(name, int(self._device_ptrs[name])) + + TensorRTInferenceFn = Callable[ - [Sequence[np.ndarray], TensorRTEngine, Optional[dict[str, Any]]], + [Sequence[np.ndarray], Any, Optional[dict[str, Any]]], Iterable[PredictionResult]] -def _default_tensorRT_inference_fn( +def _legacy_tensorRT_inference_fn( batch: Sequence[np.ndarray], engine: TensorRTEngine, inference_args: Optional[dict[str, @@ -218,9 +504,76 @@ def _default_tensorRT_inference_fn( return utils._convert_to_result(batch, predictions) +def _trt10_inference_fn( + batch: Sequence[np.ndarray] | np.ndarray, + engine_obj: TensorRTEngineTensorApi, + inference_args: Optional[dict[str, Any]] = None, +) -> Iterable[PredictionResult]: + from cuda import cuda + + batch_arr = _to_contiguous_batch(batch) + + input_shapes = None + if inference_args: + if "profile_index" in inference_args: + engine_obj.profile_index = int(inference_args["profile_index"]) + input_shapes = inference_args.get("input_shapes", None) + + ctx = engine_obj.context + with engine_obj.context_lock: + engine_obj._ensure_stream() + engine_obj.ensure_buffers(batch_arr, input_shapes) + + in_name = engine_obj.input_names[0] + _assign_or_fail( + cuda.cuMemcpyHtoDAsync( + engine_obj._device_ptrs[in_name], + batch_arr.ctypes.data, + batch_arr.nbytes, + engine_obj._stream, + )) + + ok = ctx.execute_async_v3(engine_obj._stream) + if not ok: + eng = engine_obj.engine + mi, oi, ma = eng.get_tensor_profile_shape( + 
in_name, engine_obj.profile_index) + raise RuntimeError( + "TensorRT execute_async_v3 failed. " + f"Batch shape={tuple(batch_arr.shape)}; " + f"profile[{engine_obj.profile_index}] {in_name} " + f"min={tuple(mi)} opt={tuple(oi)} max={tuple(ma)}") + + for name in engine_obj.output_names: + host = engine_obj._host_out[name] + _assign_or_fail( + cuda.cuMemcpyDtoHAsync( + host.ctypes.data, + engine_obj._device_ptrs[name], + host.nbytes, + engine_obj._stream, + )) + _assign_or_fail(cuda.cuStreamSynchronize(engine_obj._stream)) + + outs = [engine_obj._host_out[name] for name in engine_obj.output_names] + + per_item = [[o[i] for o in outs] for i in range(batch_arr.shape[0])] + return utils._convert_to_result(batch_arr, per_item) + + +def _default_tensorRT_inference_fn( + batch: Sequence[np.ndarray], + engine: Any, + inference_args: Optional[dict[str, Any]] = None +) -> Iterable[PredictionResult]: + if isinstance(engine, TensorRTEngineTensorApi): + return _trt10_inference_fn(batch, engine, inference_args) + return _legacy_tensorRT_inference_fn(batch, engine, inference_args) + + class TensorRTEngineHandlerNumPy(ModelHandler[np.ndarray, PredictionResult, - TensorRTEngine]): + Any]): def __init__( self, min_batch_size: int, @@ -230,6 +583,8 @@ def __init__( large_model: bool = False, model_copies: Optional[int] = None, max_batch_duration_secs: Optional[int] = None, + build_on_worker: bool = False, + onnx_builder_config_args: Optional[Dict[str, Any]] = None, **kwargs): """Implementation of the ModelHandler interface for TensorRT. @@ -258,6 +613,10 @@ def __init__( GPU capacity and want to maximize resource utilization. max_batch_duration_secs: the maximum amount of time to buffer a batch before emitting; used in streaming contexts. + build_on_worker: if True and onnx_path is supplied, build a TensorRT + engine on the worker and use it for RunInference. + onnx_builder_config_args: optional configuration overrides applied to the + TensorRT builder config when building from ONNX on worker. kwargs: Additional arguments like 'engine_path' and 'onnx_path' are currently supported. 'env_vars' can be used to set environment variables before loading the model. 
@@ -269,6 +628,8 @@ def __init__( self.max_batch_size = max_batch_size self.max_batch_duration_secs = max_batch_duration_secs self.inference_fn = inference_fn + self.build_on_worker = build_on_worker + self.onnx_builder_config_args = onnx_builder_config_args or {} if 'engine_path' in kwargs: self.engine_path = kwargs.get('engine_path') elif 'onnx_path' in kwargs: @@ -285,10 +646,30 @@ def batch_elements_kwargs(self): 'max_batch_duration_secs': self.max_batch_duration_secs } - def load_model(self) -> TensorRTEngine: + def load_model(self) -> Any: """Loads and initializes a TensorRT engine for processing.""" - engine = _load_engine(self.engine_path) - return TensorRTEngine(engine) + if hasattr(self, 'engine_path'): + if _tensorrt_supports_tensor_api(): + engine = _load_engine_trt10(self.engine_path) + if hasattr(engine, "num_io_tensors"): + return TensorRTEngineTensorApi(engine) + engine = _load_engine(self.engine_path) + return TensorRTEngine(engine) + + if hasattr(self, 'onnx_path'): + if not self.build_on_worker: + raise ValueError( + "onnx_path provided but build_on_worker=False; " + "use load_onnx/build_engine directly or set build_on_worker=True.") + if _tensorrt_supports_tensor_api(): + engine = _load_onnx_build_engine_trt10( + self.onnx_path, self.onnx_builder_config_args) + return TensorRTEngineTensorApi(engine) + network, builder = _load_onnx(self.onnx_path) + engine = _build_engine(network, builder) + return TensorRTEngine(engine) + + raise ValueError("Expected engine_path or onnx_path to load TensorRT model.") def load_onnx(self) -> tuple[trt.INetworkDefinition, trt.Builder]: """Loads and parses an onnx model for processing.""" @@ -296,9 +677,11 @@ def load_onnx(self) -> tuple[trt.INetworkDefinition, trt.Builder]: def build_engine( self, network: trt.INetworkDefinition, - builder: trt.Builder) -> TensorRTEngine: + builder: trt.Builder) -> Any: """Build an engine according to parsed/created network.""" engine = _build_engine(network, builder) + if _tensorrt_supports_tensor_api() and hasattr(engine, "num_io_tensors"): + return TensorRTEngineTensorApi(engine) return TensorRTEngine(engine) def run_inference( diff --git a/sdks/python/apache_beam/ml/inference/tensorrt_inference_test.py b/sdks/python/apache_beam/ml/inference/tensorrt_inference_test.py index cb010e82cfca..d4972cf42eed 100644 --- a/sdks/python/apache_beam/ml/inference/tensorrt_inference_test.py +++ b/sdks/python/apache_beam/ml/inference/tensorrt_inference_test.py @@ -35,7 +35,7 @@ from apache_beam.ml.inference import utils from apache_beam.ml.inference.base import PredictionResult, RunInference from apache_beam.ml.inference.tensorrt_inference import \ - TensorRTEngineHandlerNumPy + TensorRTEngineHandlerNumPy, TensorRTEngineTensorApi except ImportError: raise unittest.SkipTest('TensorRT dependencies are not installed') @@ -83,6 +83,47 @@ ] +def _tensorrt_has_tensor_api() -> bool: + engine_reqs = ("num_io_tensors", "get_tensor_name") + ctx_reqs = ("execute_async_v3", "set_input_shape", "set_tensor_address") + return all(hasattr(trt.ICudaEngine, m) for m in engine_reqs) and all( + hasattr(trt.IExecutionContext, m) for m in ctx_reqs) + + +def _build_simple_onnx_model(input_size=1, output_path=None): + """Build a simple ONNX model for testing: y = 2x + 0.5.""" + try: + from onnx import TensorProto, helper + except ImportError: + raise unittest.SkipTest('ONNX dependencies are not installed') + + input_tensor = helper.make_tensor_value_info( + 'input', TensorProto.FLOAT, [None, input_size]) + output_tensor = 
helper.make_tensor_value_info( + 'output', TensorProto.FLOAT, [None, input_size]) + + weight_init = helper.make_tensor( + 'weight', + TensorProto.FLOAT, [input_size, input_size], + [2.0] * (input_size * input_size)) + bias_init = helper.make_tensor( + 'bias', TensorProto.FLOAT, [input_size], [0.5] * input_size) + + matmul_node = helper.make_node('MatMul', ['input', 'weight'], ['matmul_out']) + add_node = helper.make_node('Add', ['matmul_out', 'bias'], ['output']) + + graph = helper.make_graph([matmul_node, add_node], + 'simple_linear', [input_tensor], [output_tensor], + [weight_init, bias_init]) + model = helper.make_model(graph, producer_name='trt_test') + model.opset_import[0].version = 13 + + if output_path: + with open(output_path, 'wb') as f: + f.write(model.SerializeToString()) + return model + + def _compare_prediction_result(a, b): return ((a.example == b.example).all() and all( np.array_equal(actual, expected) @@ -287,6 +328,31 @@ def test_inference_multiple_tensor_features(self): for actual, expected in zip(predictions, TWO_FEATURES_PREDICTIONS): self.assertTrue(_compare_prediction_result(actual, expected)) + @unittest.skipIf( + not _tensorrt_has_tensor_api(), 'TensorRT 10.x Tensor API not available') + def test_build_on_worker_onnx_trt10(self): + """Test TRT10 auto-selection with build_on_worker using local ONNX.""" + import tempfile + + with tempfile.TemporaryDirectory() as tmpdir: + onnx_path = os.path.join(tmpdir, 'test_model.onnx') + _build_simple_onnx_model(input_size=1, output_path=onnx_path) + + inference_runner = TensorRTEngineHandlerNumPy( + min_batch_size=1, + max_batch_size=4, + onnx_path=onnx_path, + build_on_worker=True) + engine = inference_runner.load_model() + self.assertIsInstance(engine, TensorRTEngineTensorApi) + + batch = np.array([[1.0], [2.0], [3.0], [4.0]], dtype=np.float32) + predictions = list(inference_runner.run_inference(batch, engine)) + self.assertEqual(len(predictions), 4) + for i, pred in enumerate(predictions): + expected = batch[i][0] * 2.0 + 0.5 + np.testing.assert_allclose(pred.inference[0], [expected], rtol=1e-5) + @unittest.skipIf(GCSFileSystem is None, 'GCP dependencies are not installed') def test_inference_single_tensor_feature_built_engine(self): """ diff --git a/sdks/python/apache_beam/ml/inference/trt_handler_numpy_compact.py b/sdks/python/apache_beam/ml/inference/trt_handler_numpy_compact.py new file mode 100644 index 000000000000..3e433f0cd200 --- /dev/null +++ b/sdks/python/apache_beam/ml/inference/trt_handler_numpy_compact.py @@ -0,0 +1,532 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import annotations + +import logging +import numpy as np +import threading +from typing import Any, Dict, Iterable, List, Optional, Sequence, Tuple + +from apache_beam.io.filesystems import FileSystems +from apache_beam.ml.inference import utils +from apache_beam.ml.inference.base import ModelHandler, PredictionResult + +LOGGER = logging.getLogger("TensorRTEngineHandlerNumPy_TRT10") + +__all__ = [ + "TensorRTEngine", + "TensorRTEngineHandlerNumPy", +] + + +# --------------------------------------------------------------------- +# CUDA / TensorRT helpers +# --------------------------------------------------------------------- +def _assign_or_fail(args): + """CUDA error checking for cuda-python (Driver API).""" + from cuda import cuda # lazy import to avoid submit-time dependency + + err, *ret = args + if isinstance(err, cuda.CUresult): + if err != cuda.CUresult.CUDA_SUCCESS: + raise RuntimeError(f"CUDA error: {err}") + else: + raise RuntimeError(f"Unknown CUDA error type: {err}") + return ret[0] if len(ret) == 1 else tuple(ret) + + +def _require_tensorrt_10() -> None: + """Assert that TensorRT 10.x Tensor API is available on this worker.""" + try: + import tensorrt as trt # noqa: F401 + except Exception as e: # pragma: no cover + raise RuntimeError("TensorRT is not installed on this worker.") from e + + # TRT 10.x indicators: + # - Engine exposes the Tensor API (num_io_tensors / get_tensor_name) + # - ExecutionContext exposes execute_async_v3 / set_input_shape / + # set_tensor_address + engine_reqs = ("num_io_tensors", "get_tensor_name") + ctx_reqs = ("execute_async_v3", "set_input_shape", "set_tensor_address") + + missing_engine = [m for m in engine_reqs if not hasattr(trt.ICudaEngine, m)] + missing_ctx = [m for m in ctx_reqs if not hasattr(trt.IExecutionContext, m)] + + if missing_engine or missing_ctx: + raise RuntimeError( + "This handler requires TensorRT 10.x+. " + f"Missing on ICudaEngine: {missing_engine or 'OK'}, " + f"Missing on IExecutionContext: {missing_ctx or 'OK'}") + + +# --------------------------------------------------------------------- +# Engine load / build (TRT 10) +# --------------------------------------------------------------------- +def _load_engine(engine_path: str): + """Deserialize a .engine (plan) from FileSystems into a TRT engine.""" + _require_tensorrt_10() + + import tensorrt as trt + + with FileSystems.open(engine_path, "rb") as f: + blob = f.read() + + logger = trt.Logger(trt.Logger.INFO) + trt.init_libnvinfer_plugins(logger, "") + rt = trt.Runtime(logger) + eng = rt.deserialize_cuda_engine(blob) + if eng is None: + raise RuntimeError( + "Failed to deserialize TensorRT engine. 
" + "The plan may be corrupt or built with an incompatible TRT.") + return eng + + +def _load_onnx_build_engine(onnx_path: str): + """Parse ONNX and build a TRT engine immediately (Tensor API).""" + if onnx_path.lower().endswith(".engine"): + raise ValueError( + "Provided onnx_path points to .engine; pass it as engine_path instead.") + + _require_tensorrt_10() + import tensorrt as trt + + logger = trt.Logger(trt.Logger.INFO) + trt.init_libnvinfer_plugins(logger, "") + + builder = trt.Builder(logger) + flags = 1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH) + network = builder.create_network(flags) + parser = trt.OnnxParser(network, logger) + + with FileSystems.open(onnx_path, "rb") as f: + data = f.read() + + if not parser.parse(data): + LOGGER.error("Failed to parse ONNX: %s", onnx_path) + for i in range(parser.num_errors): + LOGGER.error(parser.get_error(i)) + raise ValueError(f"Failed to parse ONNX: {onnx_path}") + + config = builder.create_builder_config() + # Workbench: ~1GiB workspace (tune for your model/infra) + config.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, 1 << 30) + + if getattr(builder, "platform_has_fast_fp16", False): + config.set_flag(trt.BuilderFlag.FP16) + + # Generic optimization profile for dynamic inputs. + if network.num_inputs > 0: + prof = builder.create_optimization_profile() + for i in range(network.num_inputs): + inp = network.get_input(i) + shp = list(inp.shape) + + def _d(v: int, default: int) -> int: + return default if v < 0 else v + + if len(shp) == 4: + # Assume NCHW; supply defaults where dims are dynamic. + min_shape: Tuple[int, ...] = ( + _d(shp[0], 1), _d(shp[1], 3), _d(shp[2], 224), _d(shp[3], 224)) + opt_shape: Tuple[int, ...] = ( + _d(shp[0], 4), _d(shp[1], 3), _d(shp[2], 224), _d(shp[3], 224)) + max_shape: Tuple[int, ...] = ( + _d(shp[0], 8), _d(shp[1], 3), _d(shp[2], 224), _d(shp[3], 224)) + else: + # Fallback: make batch dynamic, keep others as-is or 1. + min_shape = tuple(_d(x, 1) for x in shp) + opt_shape = tuple(_d(x, 4 if j == 0 else 1) for j, x in enumerate(shp)) + max_shape = tuple(_d(x, 8 if j == 0 else 1) for j, x in enumerate(shp)) + + prof.set_shape(inp.name, min=min_shape, opt=opt_shape, max=max_shape) + + config.add_optimization_profile(prof) + + plan = builder.build_serialized_network(network, config) + if plan is None: + raise RuntimeError( + "build_serialized_network() returned None; check ONNX and profiles.") + + rt = trt.Runtime(logger) + eng = rt.deserialize_cuda_engine(bytes(plan)) + if eng is None: + raise RuntimeError("Failed to deserialize engine after build.") + return eng + + +# --------------------------------------------------------------------- +# Shape & batch helpers +# --------------------------------------------------------------------- +def _resolve_output_shape(shape: Sequence[int] | None, + batch_size: int) -> Tuple[int, ...] | None: + """Replace a leading -1 (batch) with batch_size; any other -1 is an error.""" + if shape is None: + return None + shp = list(shape) + if shp and shp[0] < 0: + shp[0] = int(batch_size) + if any(d < 0 for d in shp[1:]): + raise RuntimeError(f"Unresolved non-batch dims in output shape: {shape}") + return tuple(shp) + + +def _to_contiguous_batch(x: Sequence[np.ndarray] | np.ndarray) -> np.ndarray: + """Accept either an ndarray (already a batch) or a list of ndarrays. + + Concatenates on axis 0. This avoids accidental rank-5 shapes from + upstream batching. 
+ """ + if isinstance(x, np.ndarray): + return np.ascontiguousarray(x) + + if isinstance(x, (list, tuple)): + if len(x) == 1 and isinstance(x[0], np.ndarray): + return np.ascontiguousarray(x[0]) + + if all(isinstance(a, np.ndarray) for a in x): + first = x[0].shape + for a in x[1:]: + if len(a.shape) != len(first) or any( + sa != sb for sa, sb in zip(a.shape[1:], first[1:])): + raise ValueError( + f"Inconsistent element shapes for concatenation: " + f"{first} vs {a.shape}") + return np.ascontiguousarray(np.concatenate(x, axis=0)) + + raise ValueError( + "Batch must be ndarray or sequence of ndarrays of same " + "rank/shape (except batch).") + + +# --------------------------------------------------------------------- +# TRT 10.x engine wrapper (Tensor API only) +# --------------------------------------------------------------------- +class TensorRTEngine: + """TRT 10.x engine wrapper using the Tensor API.""" + def __init__(self, engine: Any): + import tensorrt as trt + + self.engine = engine + self.context = engine.create_execution_context() + self.context_lock = threading.RLock() + + # Tensor API enumeration + self.input_names: List[str] = [] + self.output_names: List[str] = [] + self.dtypes: Dict[str, np.dtype] = {} + + for i in range(engine.num_io_tensors): + name = engine.get_tensor_name(i) + mode = engine.get_tensor_mode(name) + if mode == trt.TensorIOMode.INPUT: + self.input_names.append(name) + else: + self.output_names.append(name) + self.dtypes[name] = np.dtype(trt.nptype(engine.get_tensor_dtype(name))) + + # Lazy allocations + self._device_ptrs: Dict[str, int] = {} # tensor name -> CUdeviceptr + self._host_out: Dict[str, np.ndarray] = {} + self._in_nbytes: int = 0 + self._stream: Optional[int] = None + self.profile_index: int = 0 + + def _ensure_stream(self) -> None: + if self._stream is None: + from cuda import cuda + self._stream = _assign_or_fail(cuda.cuStreamCreate(0)) + + def _free_ptr(self, ptr: Optional[int]) -> None: + if not ptr: + return + from cuda import cuda + try: + _assign_or_fail(cuda.cuMemFree(ptr)) + except Exception: + pass + + def _select_profile(self) -> None: + # Pick optimization profile (sync or async depending on TRT) + if hasattr(self.context, "set_optimization_profile_async"): + self._ensure_stream() + self.context.set_optimization_profile_async( + self.profile_index, self._stream) + elif hasattr(self.context, "set_optimization_profile"): + self.context.set_optimization_profile(self.profile_index) + + def _check_shape_in_profile(self, name: str, shape: Sequence[int]) -> None: + mi, _, ma = self.engine.get_tensor_profile_shape(name, self.profile_index) + + def ok(dim: int, lo: int, hi: int) -> bool: + if lo < 0: + lo = dim + if hi < 0: + hi = dim + return lo <= dim <= hi + + if len(shape) != len(mi): + raise RuntimeError( + f"Input '{name}' rank mismatch: given {tuple(shape)}, " + f"profile[{self.profile_index}] min={tuple(mi)} max={tuple(ma)}") + for i, dim in enumerate(shape): + if not ok(dim, mi[i], ma[i]): + raise RuntimeError( + f"Input '{name}' dim {i}={dim} outside " + f"profile[{self.profile_index}] bounds " + f"[min={mi[i]}, max={ma[i]}]. Given={tuple(shape)}, " + f"min={tuple(mi)}, max={tuple(ma)}") + + def ensure_buffers( + self, + batch: np.ndarray, + input_shapes: Optional[Dict[str, Sequence[int]]] = None, + ) -> None: + """ + Validate shapes, set input shapes, (re)allocate device + host buffers, + and set tensor addresses for Tensor API execution. 
+ """ + from cuda import cuda + + self._select_profile() + + # Derive shapes for inputs + shapes: Dict[str, List[int]] = {} + if len(self.input_names) == 1: + shapes[self.input_names[0]] = list(batch.shape) + else: + if not input_shapes: + raise RuntimeError( + f"Engine expects multiple inputs {self.input_names}; " + "provide shapes via " + "inference_args={'input_shapes': {name: shape, ...}}") + for name in self.input_names: + if name not in input_shapes: + raise RuntimeError(f"Missing shape for input tensor '{name}'") + shapes[name] = list(map(int, input_shapes[name])) + + # Validate and set shapes + for name, shp in shapes.items(): + self._check_shape_in_profile(name, shp) + self.context.set_input_shape(name, shp) + + # Allocate first input device buffer (we copy only this from 'batch') + in_name = self.input_names[0] + in_dtype = self.dtypes[in_name] + in_nbytes = int(np.prod(shapes[in_name])) * in_dtype.itemsize + if self._device_ptrs.get(in_name) is None or self._in_nbytes != in_nbytes: + if self._device_ptrs.get(in_name) is not None: + self._free_ptr(self._device_ptrs[in_name]) + self._device_ptrs[in_name] = _assign_or_fail(cuda.cuMemAlloc(in_nbytes)) + self._in_nbytes = in_nbytes + + # Outputs + batch_size = shapes[in_name][0] + for name in self.output_names: + dtype = self.dtypes[name] + raw_shape = list(self.context.get_tensor_shape(name)) + shape = _resolve_output_shape(raw_shape, batch_size) + if shape is None: + raise RuntimeError(f"Context returned None shape for output '{name}'") + nbytes = int(np.prod(shape)) * dtype.itemsize + need_new = ( + self._device_ptrs.get(name) is None or + self._host_out.get(name) is None or + self._host_out[name].nbytes != nbytes) + if need_new: + if self._device_ptrs.get(name) is not None: + self._free_ptr(self._device_ptrs[name]) + self._device_ptrs[name] = _assign_or_fail(cuda.cuMemAlloc(nbytes)) + self._host_out[name] = np.empty(shape, dtype=dtype) + + # Set tensor addresses + self.context.set_tensor_address(in_name, int(self._device_ptrs[in_name])) + for name in self.output_names: + self.context.set_tensor_address(name, int(self._device_ptrs[name])) + + +# --------------------------------------------------------------------- +# Inference function (TRT 10) +# --------------------------------------------------------------------- +def _trt10_inference_fn( + batch: Sequence[np.ndarray] | np.ndarray, + engine_obj: TensorRTEngine, + inference_args: Optional[dict[str, Any]] = None, +) -> Iterable[PredictionResult]: + """Default inference fn using TensorRT 10 Tensor API.""" + from cuda import cuda + + # Normalize batch to contiguous ndarray (NCHW or whatever the model expects) + batch_arr = _to_contiguous_batch(batch) + + # Optional args + input_shapes = None + if inference_args: + if "profile_index" in inference_args: + engine_obj.profile_index = int(inference_args["profile_index"]) + input_shapes = inference_args.get("input_shapes", None) + + ctx = engine_obj.context + with engine_obj.context_lock: + engine_obj._ensure_stream() + engine_obj.ensure_buffers(batch_arr, input_shapes) + + # HtoD: first input buffer + in_name = engine_obj.input_names[0] + _assign_or_fail( + cuda.cuMemcpyHtoDAsync( + engine_obj._device_ptrs[in_name], + batch_arr.ctypes.data, + batch_arr.nbytes, + engine_obj._stream, + )) + + ok = ctx.execute_async_v3(engine_obj._stream) + if not ok: + eng = engine_obj.engine + mi, oi, ma = eng.get_tensor_profile_shape( + in_name, engine_obj.profile_index) + raise RuntimeError( + "TensorRT execute_async_v3 failed. 
" + f"Batch shape={tuple(batch_arr.shape)}; " + f"profile[{engine_obj.profile_index}] {in_name} " + f"min={tuple(mi)} opt={tuple(oi)} max={tuple(ma)}") + + # DtoH outputs + for name in engine_obj.output_names: + host = engine_obj._host_out[name] + _assign_or_fail( + cuda.cuMemcpyDtoHAsync( + host.ctypes.data, + engine_obj._device_ptrs[name], + host.nbytes, + engine_obj._stream, + )) + _assign_or_fail(cuda.cuStreamSynchronize(engine_obj._stream)) + + outs = [engine_obj._host_out[name] for name in engine_obj.output_names] + + # One PredictionResult per item + per_item = [[o[i] for o in outs] for i in range(batch_arr.shape[0])] + return utils._convert_to_result(batch_arr, per_item) + + +# --------------------------------------------------------------------- +# Beam ModelHandler (TRT 10 only) +# --------------------------------------------------------------------- +class TensorRTEngineHandlerNumPy(ModelHandler[np.ndarray, + PredictionResult, + TensorRTEngine]): + """Beam ModelHandler pinned to TensorRT 10.x Tensor API. + + Provide exactly one of: + - engine_path: path to a serialized TensorRT plan (.engine) + - onnx_path: path to an ONNX file (requires build_on_worker=True) + """ + def __init__( + self, + min_batch_size: int, + max_batch_size: int, + *, + engine_path: Optional[str] = None, + onnx_path: Optional[str] = None, + build_on_worker: bool = False, # only used if onnx_path is given + inference_fn=_trt10_inference_fn, + large_model: bool = False, + model_copies: Optional[int] = None, + max_batch_duration_secs: Optional[int] = None, + env_vars: Optional[Dict[str, str]] = None, + ): + if engine_path and onnx_path: + raise ValueError( + "Provide only one of engine_path or onnx_path, not both.") + if not engine_path and not onnx_path: + raise ValueError("Provide engine_path (.engine) or onnx_path (.onnx).") + if engine_path and not engine_path.lower().endswith(".engine"): + raise ValueError(f"engine_path must end with .engine, got: {engine_path}") + if onnx_path and onnx_path.lower().endswith(".engine"): + raise ValueError( + f"onnx_path points to .engine: {onnx_path}. Use engine_path instead.") + + self.min_batch_size = int(min_batch_size) + self.max_batch_size = int(max_batch_size) + self.max_batch_duration_secs = max_batch_duration_secs + self.inference_fn = inference_fn + + self.engine_path = engine_path + self.onnx_path = onnx_path + self.build_on_worker = bool(build_on_worker) + self._env_vars = env_vars or {} + + self._share_across_processes = bool( + large_model or (model_copies is not None)) + self._model_copies = int(model_copies or 1) + + # --- ModelHandler API ------------------------------------------------- + + def batch_elements_kwargs(self) -> Dict[str, Any]: + return { + "min_batch_size": self.min_batch_size, + "max_batch_size": self.max_batch_size, + "max_batch_duration_secs": self.max_batch_duration_secs, + } + + def load_model(self) -> TensorRTEngine: + # Ensure environment variables are set before touching TRT + import os + + for k, v in self._env_vars.items(): + os.environ[str(k)] = str(v) + + if self.engine_path: + eng = _load_engine(self.engine_path) + return TensorRTEngine(eng) + + if not self.build_on_worker: + raise RuntimeError( + "onnx_path provided but build_on_worker=False. 
" + "Enable build_on_worker=True to compile ONNX on workers, " + "or prebuild an engine.") + + eng = _load_onnx_build_engine(self.onnx_path) # type: ignore[arg-type] + return TensorRTEngine(eng) + + def run_inference( + self, + batch: Sequence[np.ndarray] | np.ndarray, + model: TensorRTEngine, + inference_args: Optional[dict[str, Any]] = None, + ) -> Iterable[PredictionResult]: + return self.inference_fn(batch, model, inference_args) + + def get_num_bytes(self, batch: Sequence[np.ndarray] | np.ndarray) -> int: + if isinstance(batch, np.ndarray): + return int(batch.nbytes) + if isinstance(batch, (list, tuple)) and all(isinstance(a, np.ndarray) + for a in batch): + return int(sum(a.nbytes for a in batch)) + arr = np.asarray(batch) + return int(arr.nbytes) + + def get_metrics_namespace(self) -> str: + return "BeamML_TensorRT10" + + def share_model_across_processes(self) -> bool: + return self._share_across_processes + + def model_copies(self) -> int: + return self._model_copies diff --git a/sdks/python/apache_beam/ml/inference/trt_handler_numpy_compact_test.py b/sdks/python/apache_beam/ml/inference/trt_handler_numpy_compact_test.py new file mode 100644 index 000000000000..839807c040de --- /dev/null +++ b/sdks/python/apache_beam/ml/inference/trt_handler_numpy_compact_test.py @@ -0,0 +1,332 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# pytype: skip-file + +import os +import tempfile +import unittest + +import numpy as np +import pytest + +import apache_beam as beam +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that + +# Protect against environments where TensorRT python library is not available. 
+# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports +try: + from apache_beam.ml.inference.base import PredictionResult, RunInference + from apache_beam.ml.inference.trt_handler_numpy_compact import ( + TensorRTEngine, TensorRTEngineHandlerNumPy) +except ImportError: + raise unittest.SkipTest('TensorRT 10.x dependencies are not installed') + + +# Check if TensorRT is actually available (not just importable) +def _is_tensorrt_available(): + """Check if TensorRT can be imported and used.""" + try: + import tensorrt as trt # noqa: F401 pylint: disable=unused-import + return True + except (ImportError, ModuleNotFoundError): + return False + + +TENSORRT_AVAILABLE = _is_tensorrt_available() + +# Test data +SINGLE_FEATURE_EXAMPLES = np.array([[1.0], [5.0], [-3.0], [10.0]], + dtype=np.float32) + +SINGLE_FEATURE_PREDICTIONS = [ + PredictionResult( + SINGLE_FEATURE_EXAMPLES[i], [ + np.array([SINGLE_FEATURE_EXAMPLES[i][0] * 2.0 + 0.5], + dtype=np.float32) + ]) for i in range(len(SINGLE_FEATURE_EXAMPLES)) +] + +TWO_FEATURES_EXAMPLES = np.array([[1, 5], [3, 10], [-14, 0], [0.5, 0.5]], + dtype=np.float32) + +TWO_FEATURES_PREDICTIONS = [ + PredictionResult( + TWO_FEATURES_EXAMPLES[i], + [ + np.array([ + TWO_FEATURES_EXAMPLES[i][0] * 2.0 + + TWO_FEATURES_EXAMPLES[i][1] * 3 + 0.5 + ], + dtype=np.float32) + ]) for i in range(len(TWO_FEATURES_EXAMPLES)) +] + + +def _compare_prediction_result(a, b): + """Compare two PredictionResult objects.""" + return ((a.example == b.example).all() and all( + np.array_equal(actual, expected) + for actual, expected in zip(a.inference, b.inference))) + + +def _build_simple_onnx_model(input_size=1, output_path=None): + """Build a simple ONNX model for testing: y = 2x + 0.5""" + try: + from onnx import TensorProto, helper + except ImportError: + raise unittest.SkipTest('ONNX dependencies are not installed') + + # Create a simple linear model: y = 2*x + 0.5 + input_tensor = helper.make_tensor_value_info( + 'input', TensorProto.FLOAT, [None, input_size]) + output_tensor = helper.make_tensor_value_info( + 'output', TensorProto.FLOAT, [None, input_size]) + + # Weight tensor: 2.0 + weight_init = helper.make_tensor( + 'weight', + TensorProto.FLOAT, [input_size, input_size], + [2.0] * (input_size * input_size)) + # Bias tensor: 0.5 + bias_init = helper.make_tensor( + 'bias', TensorProto.FLOAT, [input_size], [0.5] * input_size) + + # MatMul node + matmul_node = helper.make_node('MatMul', ['input', 'weight'], ['matmul_out']) + # Add node + add_node = helper.make_node('Add', ['matmul_out', 'bias'], ['output']) + + # Create graph + graph = helper.make_graph([matmul_node, add_node], + 'simple_linear', [input_tensor], [output_tensor], + [weight_init, bias_init]) + + # Create model + model = helper.make_model(graph, producer_name='trt_test') + model.opset_import[0].version = 13 + + if output_path: + with open(output_path, 'wb') as f: + f.write(model.SerializeToString()) + + return model + + +@pytest.mark.uses_tensorrt +class TensorRTEngineHandlerNumPyTest(unittest.TestCase): + """Tests for TensorRTEngineHandlerNumPy with TensorRT 10.x Tensor API.""" + def test_handler_initialization(self): + """Test that handler initializes correctly with required parameters.""" + handler = TensorRTEngineHandlerNumPy( + min_batch_size=1, max_batch_size=4, engine_path='/tmp/test.engine') + self.assertEqual(handler.min_batch_size, 1) + self.assertEqual(handler.max_batch_size, 4) + self.assertEqual(handler.engine_path, '/tmp/test.engine') + + def 
test_handler_initialization_both_paths_error(self): + """Test that providing both engine_path and onnx_path raises an error.""" + with self.assertRaises(ValueError): + TensorRTEngineHandlerNumPy( + min_batch_size=1, + max_batch_size=4, + engine_path='/tmp/test.engine', + onnx_path='/tmp/test.onnx') + + def test_handler_initialization_no_path_error(self): + """Test that providing neither engine_path nor onnx_path raises an error.""" + with self.assertRaises(ValueError): + TensorRTEngineHandlerNumPy(min_batch_size=1, max_batch_size=4) + + def test_handler_initialization_invalid_engine_path(self): + """Test that providing a non-.engine path raises an error.""" + with self.assertRaises(ValueError): + TensorRTEngineHandlerNumPy( + min_batch_size=1, max_batch_size=4, engine_path='/tmp/test.onnx') + + def test_handler_initialization_invalid_onnx_path(self): + """Test that providing a .engine path as onnx_path raises an error.""" + with self.assertRaises(ValueError): + TensorRTEngineHandlerNumPy( + min_batch_size=1, max_batch_size=4, onnx_path='/tmp/test.engine') + + def test_batch_elements_kwargs(self): + """Test that batch_elements_kwargs returns correct values.""" + handler = TensorRTEngineHandlerNumPy( + min_batch_size=2, + max_batch_size=8, + max_batch_duration_secs=10, + engine_path='/tmp/test.engine') + kwargs = handler.batch_elements_kwargs() + self.assertEqual(kwargs['min_batch_size'], 2) + self.assertEqual(kwargs['max_batch_size'], 8) + self.assertEqual(kwargs['max_batch_duration_secs'], 10) + + def test_get_num_bytes_ndarray(self): + """Test get_num_bytes with numpy ndarray.""" + handler = TensorRTEngineHandlerNumPy( + min_batch_size=1, max_batch_size=4, engine_path='/tmp/test.engine') + batch = np.array([[1, 2], [3, 4]], dtype=np.float32) + num_bytes = handler.get_num_bytes(batch) + self.assertEqual(num_bytes, batch.nbytes) + + def test_get_num_bytes_list(self): + """Test get_num_bytes with list of ndarrays.""" + handler = TensorRTEngineHandlerNumPy( + min_batch_size=1, max_batch_size=4, engine_path='/tmp/test.engine') + batch = [ + np.array([1, 2], dtype=np.float32), np.array([3, 4], dtype=np.float32) + ] + num_bytes = handler.get_num_bytes(batch) + expected = sum(a.nbytes for a in batch) + self.assertEqual(num_bytes, expected) + + def test_get_metrics_namespace(self): + """Test that metrics namespace is correct.""" + handler = TensorRTEngineHandlerNumPy( + min_batch_size=1, max_batch_size=4, engine_path='/tmp/test.engine') + self.assertEqual(handler.get_metrics_namespace(), 'BeamML_TensorRT10') + + def test_share_model_across_processes(self): + """Test share_model_across_processes flag.""" + handler = TensorRTEngineHandlerNumPy( + min_batch_size=1, + max_batch_size=4, + engine_path='/tmp/test.engine', + large_model=True) + self.assertTrue(handler.share_model_across_processes()) + + def test_model_copies(self): + """Test model_copies parameter.""" + handler = TensorRTEngineHandlerNumPy( + min_batch_size=1, + max_batch_size=4, + engine_path='/tmp/test.engine', + model_copies=3) + self.assertEqual(handler.model_copies(), 3) + + @unittest.skipIf(not TENSORRT_AVAILABLE, 'TensorRT 10.x is not installed') + def test_inference_with_onnx_build_on_worker(self): + """Test loading ONNX and building engine on worker.""" + with tempfile.TemporaryDirectory() as tmpdir: + onnx_path = os.path.join(tmpdir, 'test_model.onnx') + _build_simple_onnx_model(input_size=1, output_path=onnx_path) + + handler = TensorRTEngineHandlerNumPy( + min_batch_size=1, + max_batch_size=4, + onnx_path=onnx_path, + 
build_on_worker=True) + + # Load model + engine = handler.load_model() + self.assertIsInstance(engine, TensorRTEngine) + + # Run inference + batch = np.array([[1.0], [2.0], [3.0], [4.0]], dtype=np.float32) + predictions = list(handler.run_inference(batch, engine)) + + # Verify predictions + self.assertEqual(len(predictions), 4) + for i, pred in enumerate(predictions): + expected = batch[i][0] * 2.0 + 0.5 + np.testing.assert_allclose(pred.inference[0], [expected], rtol=1e-5) + + def test_env_vars_setting(self): + """Test that environment variables are set correctly.""" + handler = TensorRTEngineHandlerNumPy( + min_batch_size=1, + max_batch_size=4, + engine_path='/tmp/test.engine', + env_vars={'TEST_VAR': 'test_value'}) + + # Remove the variable if it exists + os.environ.pop('TEST_VAR', None) + self.assertFalse('TEST_VAR' in os.environ) + + # This would normally be tested during load_model, but we'll just verify + # the env_vars are stored + self.assertEqual(handler._env_vars, {'TEST_VAR': 'test_value'}) + + +@pytest.mark.uses_tensorrt +class TensorRTEngineTest(unittest.TestCase): + """Tests for TensorRTEngine wrapper class.""" + @unittest.skipIf(not TENSORRT_AVAILABLE, 'TensorRT 10.x is not installed') + def test_engine_initialization(self): + """Test that TensorRTEngine initializes correctly.""" + with tempfile.TemporaryDirectory() as tmpdir: + onnx_path = os.path.join(tmpdir, 'test_model.onnx') + _build_simple_onnx_model(input_size=1, output_path=onnx_path) + + handler = TensorRTEngineHandlerNumPy( + min_batch_size=1, + max_batch_size=4, + onnx_path=onnx_path, + build_on_worker=True) + + engine = handler.load_model() + + # Verify engine attributes + self.assertIsNotNone(engine.engine) + self.assertIsNotNone(engine.context) + self.assertIsNotNone(engine.context_lock) + self.assertIsInstance(engine.input_names, list) + self.assertIsInstance(engine.output_names, list) + self.assertGreater(len(engine.input_names), 0) + self.assertGreater(len(engine.output_names), 0) + + +@pytest.mark.uses_tensorrt +class TensorRTRunInferencePipelineTest(unittest.TestCase): + """Integration tests for TensorRT handler in Beam pipelines.""" + @unittest.skipIf(not TENSORRT_AVAILABLE, 'TensorRT 10.x is not installed') + def test_pipeline_with_onnx_model(self): + """Test full pipeline with ONNX model built on worker.""" + with tempfile.TemporaryDirectory() as tmpdir: + onnx_path = os.path.join(tmpdir, 'test_model.onnx') + _build_simple_onnx_model(input_size=1, output_path=onnx_path) + + with TestPipeline() as pipeline: + handler = TensorRTEngineHandlerNumPy( + min_batch_size=4, + max_batch_size=4, + onnx_path=onnx_path, + build_on_worker=True) + + examples = [ + np.array([[1.0]], dtype=np.float32), + np.array([[2.0]], dtype=np.float32), + np.array([[3.0]], dtype=np.float32), + np.array([[4.0]], dtype=np.float32), + ] + + pcoll = pipeline | 'Create' >> beam.Create(examples) + predictions = pcoll | 'RunInference' >> RunInference(handler) + + def check_predictions(predictions): + self.assertEqual(len(predictions), 4) + for i, pred in enumerate(predictions): + expected = examples[i][0][0] * 2.0 + 0.5 + np.testing.assert_allclose(pred.inference[0], [expected], rtol=1e-5) + + assert_that(predictions, check_predictions) + + +if __name__ == '__main__': + unittest.main() diff --git a/sdks/python/apache_beam/ml/inference/trt_handler_tests_requirements.txt b/sdks/python/apache_beam/ml/inference/trt_handler_tests_requirements.txt new file mode 100644 index 000000000000..4c6438f0eec5 --- /dev/null +++ 
b/sdks/python/apache_beam/ml/inference/trt_handler_tests_requirements.txt
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+tensorrt>=10.0.0
+cuda-python>=12.0.0
+numpy>=1.20.0
+onnx>=1.12.0
diff --git a/sdks/python/pytest.ini b/sdks/python/pytest.ini
index 3eee1a5c0e80..cc1c829623fa 100644
--- a/sdks/python/pytest.ini
+++ b/sdks/python/pytest.ini
@@ -60,6 +60,7 @@ markers =
   uses_pytorch: tests that utilize pytorch in some way.
   uses_sklearn: tests that utilize scikit-learn in some way.
   uses_tensorflow: tests that utilize tensorflow in some way.
+  uses_tensorrt: tests that utilize TensorRT in some way.
   uses_tft: tests that utilizes tensorflow transforms in some way.
   uses_xgboost: tests that utilize xgboost in some way.
   uses_onnx: tests that utilizes onnx in some way.
diff --git a/sdks/python/test-suites/containers/tensorrt_10x_runinference/README.md b/sdks/python/test-suites/containers/tensorrt_10x_runinference/README.md
new file mode 100644
index 000000000000..4de3d25297db
--- /dev/null
+++ b/sdks/python/test-suites/containers/tensorrt_10x_runinference/README.md
@@ -0,0 +1,58 @@
+
+# TensorRT 10.x RunInference Container
+
+This Docker container provides a GPU-enabled environment for running Apache Beam
+pipelines with TensorRT 10.x inference using the new Tensor API.
+
+## Base Image
+
+- **Base**: `nvcr.io/nvidia/tensorrt:25.01-py3`
+- **Python**: 3.10
+- **TensorRT**: 10.x (included in base image)
+- **CUDA**: 12.x (included in base image)
+
+## Dependencies
+
+The container includes:
+- Apache Beam 2.67.0 with GCP support
+- TensorRT 10.x (from base image)
+- CUDA Python 12.8
+- PyTorch and TorchVision
+- Transformers 4.18.0+
+- OpenCV, Pillow, PyMuPDF for image/document processing
+- NumPy 2.0.1
+
+## Usage
+
+This container is designed for Dataflow jobs that require GPU acceleration with
+TensorRT 10.x. It supports the new `TensorRTEngineHandlerNumPy` handler from
+`apache_beam.ml.inference.trt_handler_numpy_compact`.
+
+## Building
+
+```bash
+docker build -f tensor_rt_10x.dockerfile -t tensorrt-10x-beam .
+```
+
+## GPU Requirements
+
+- NVIDIA GPU with CUDA support
+- Compatible with Dataflow GPU workers (e.g., `nvidia-tesla-t4`)
diff --git a/sdks/python/test-suites/containers/tensorrt_10x_runinference/tensor_rt_10x.dockerfile b/sdks/python/test-suites/containers/tensorrt_10x_runinference/tensor_rt_10x.dockerfile
new file mode 100644
index 000000000000..107041e93ae8
--- /dev/null
+++ b/sdks/python/test-suites/containers/tensorrt_10x_runinference/tensor_rt_10x.dockerfile
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+ARG BUILD_IMAGE=nvcr.io/nvidia/tensorrt:25.01-py3
+
+FROM ${BUILD_IMAGE}
+
+ENV PATH="/usr/src/tensorrt/bin:${PATH}"
+
+WORKDIR /workspace
+
+RUN apt-get update -y && \
+    apt-get install -y \
+    python3-venv \
+    libx11-6 \
+    libsm6 \
+    libxext6 \
+    libxrender-dev \
+    libgl1
+
+RUN pip install --no-cache-dir apache-beam[gcp]==2.67.0
+
+COPY --from=apache/beam_python3.10_sdk:2.67.0 /opt/apache/beam /opt/apache/beam
+
+RUN pip install --upgrade pip && \
+    pip install torch && \
+    pip install torchvision && \
+    pip install "pillow>=8.0.0" && \
+    pip install "transformers>=4.18.0" && \
+    pip install cuda-python==12.8 && \
+    pip install opencv-python==4.7.0.72 && \
+    pip install PyMuPDF==1.22.5 && \
+    pip install requests && \
+    pip install numpy==2.0.1
+
+ENTRYPOINT ["/opt/apache/beam/boot"]
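Usage note (not part of the patch): a minimal sketch of how the new knobs added above fit together — `build_on_worker` and `onnx_builder_config_args` on `tensorrt_inference.TensorRTEngineHandlerNumPy`, plus the `profile_index`/`input_shapes` keys that the TRT 10 inference path reads from `inference_args`. The model path and workspace size below are placeholders, not values from this change.

```python
import apache_beam as beam
import numpy as np

from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.tensorrt_inference import TensorRTEngineHandlerNumPy

# Build the engine from ONNX on the worker; cap the builder workspace at 2 GiB.
# 'memory_pool_limit' and 'builder_flags' are the keys consumed by
# _apply_builder_config_args when the engine is built on the worker.
handler = TensorRTEngineHandlerNumPy(
    min_batch_size=1,
    max_batch_size=4,
    onnx_path='gs://my-bucket/models/model.onnx',  # placeholder path
    build_on_worker=True,
    onnx_builder_config_args={'memory_pool_limit': 2 << 30},
)

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([np.ones((1, 3, 224, 224), dtype=np.float32)])
      # profile_index defaults to 0 and only matters for engines built with
      # several optimization profiles; input_shapes is required when the
      # engine has more than one input tensor.
      | RunInference(handler, inference_args={'profile_index': 0}))
```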