diff --git a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml index ff7480c320af..04535e41acae 100644 --- a/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml +++ b/.github/workflows/beam_Inference_Python_Benchmarks_Dataflow.yml @@ -92,6 +92,7 @@ jobs: ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Streaming_DistilBert_Base_Uncased.txt ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Sentiment_Batch_DistilBert_Base_Uncased.txt ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_VLLM_Gemma_Batch.txt + ${{ github.workspace }}/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Classification_Rightfit.txt # The env variables are created and populated in the test-arguments-action as "_test_arguments_" - name: get current time run: echo "NOW_UTC=$(date '+%m%d%H%M%S' --utc)" >> $GITHUB_ENV @@ -189,4 +190,15 @@ jobs: -Prunner=DataflowRunner \ -PpythonVersion=3.10 \ -PloadTest.requirementsTxtFile=apache_beam/ml/inference/torch_tests_requirements.txt \ - '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_5 }} --job_name=benchmark-tests-pytorch-imagenet-python-gpu-${{env.NOW_UTC}} --output=gs://temp-storage-for-end-to-end-tests/torch/result_resnet152_gpu-${{env.NOW_UTC}}.txt' \ No newline at end of file + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_5 }} --job_name=benchmark-tests-pytorch-imagenet-python-gpu-${{env.NOW_UTC}} --output=gs://temp-storage-for-end-to-end-tests/torch/result_resnet152_gpu-${{env.NOW_UTC}}.txt' + - name: run PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting) + uses: ./.github/actions/gradle-command-self-hosted-action + timeout-minutes: 180 + with: + gradle-command: :sdks:python:apache_beam:testing:load_tests:run + arguments: | + -PloadTest.mainClass=apache_beam.testing.benchmarks.inference.pytorch_imagenet_rightfit_benchmarks \ + -Prunner=DataflowRunner \ + -PpythonVersion=3.10 \ + -PloadTest.requirementsTxtFile=apache_beam/ml/inference/pytorch_rightfit_requirements.txt \ + '-PloadTest.args=${{ env.beam_Inference_Python_Benchmarks_Dataflow_test_arguments_9 }} --mode=streaming --job_name=benchmark-tests-pytorch-imagenet-rightfit-streaming-${{env.NOW_UTC}} --output_table=apache-beam-testing.beam_run_inference.result_torch_inference_imagenet_stream_rightfit' \ diff --git a/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Classification_Rightfit.txt b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Classification_Rightfit.txt new file mode 100644 index 000000000000..0e19440503c5 --- /dev/null +++ b/.github/workflows/load-tests-pipeline-options/beam_Inference_Python_Benchmarks_Dataflow_Pytorch_Image_Classification_Rightfit.txt @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--region=us-central1 +--machine_type=n1-standard-4 +--num_workers=50 +--disk_size_gb=50 +--autoscaling_algorithm=NONE +--staging_location=gs://temp-storage-for-perf-tests/loadtests +--temp_location=gs://temp-storage-for-perf-tests/loadtests +--requirements_file=apache_beam/ml/inference/pytorch_rightfit_requirements.txt +--publish_to_big_query=true +--metrics_dataset=beam_run_inference +--metrics_table=torch_inference_imagenet_results_stream_rightfit +--influx_measurement=torch_inference_imagenet_stream_rightfit +--pretrained_model_name=efficientnet_b0 +--device=GPU +--input_file=gs://apache-beam-ml/testing/inputs/openimage_50k_benchmark.txt +--runner=DataflowRunner +--mode=streaming +--input_mode=gcs_uris +--input_options={} +--pubsub_topic=projects/apache-beam-testing/topics/images_topic +--pubsub_subscription=projects/apache-beam-testing/subscriptions/images_subscription +--model_state_dict_path=gs://apache-beam-ml/models/efficientnet_b0_state_dict.pth +--rate_limit=250 +--image_size=224 +--top_k=5 +--inference_batch_size=auto +--window_sec=60 +--trigger_proc_time_sec=30 +--enable_dedup=false +--experiments=worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx diff --git a/.test-infra/tools/refresh_looker_metrics.py b/.test-infra/tools/refresh_looker_metrics.py index a4c6999be775..d1b03022647c 100644 --- a/.test-infra/tools/refresh_looker_metrics.py +++ b/.test-infra/tools/refresh_looker_metrics.py @@ -43,6 +43,7 @@ ("82", ["263", "264", "265", "266", "267"]), # PyTorch Sentiment Streaming DistilBERT base uncased ("85", ["268", "269", "270", "271", "272"]), # PyTorch Sentiment Batch DistilBERT base uncased ("86", ["284", "285", "286", "287", "288"]), # VLLM Batch Gemma + ("92", ["289", "290", "291", "292", "293"]), # PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting) ] diff --git a/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py b/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py new file mode 100644 index 000000000000..fa380ed8ac71 --- /dev/null +++ b/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py @@ -0,0 +1,522 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This streaming pipeline performs image classification using an open-source +PyTorch EfficientNet-B0 model optimized for T4 GPUs. +It reads image URIs from Pub/Sub, decodes and preprocesses them in parallel, +and runs inference with adaptive batch sizing for optimal GPU utilization. +The pipeline ensures exactly-once semantics via stateful deduplication and +idempotent BigQuery writes, allowing stable and reproducible performance +measurements under continuous load. +Resources like Pub/Sub topic/subscription cleanup is handled programmatically. +""" + +import argparse +import io +import json +import logging +import threading +import time +from typing import Iterable +from typing import Optional +from typing import Tuple + +import torch +import torch.nn.functional as F + +import apache_beam as beam +from apache_beam.coders import BytesCoder +from apache_beam.io.filesystems import FileSystems +from apache_beam.ml.inference.base import KeyedModelHandler +from apache_beam.ml.inference.base import PredictionResult +from apache_beam.ml.inference.base import RunInference +from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.runners.runner import PipelineResult +from apache_beam.transforms import userstate +from apache_beam.transforms import window + +from google.cloud import pubsub_v1 +import PIL.Image as PILImage + +# ============ Utility & Preprocessing ============ + +IMAGENET_MEAN = [0.485, 0.456, 0.406] +IMAGENET_STD = [0.229, 0.224, 0.225] + + +def now_millis() -> int: + return int(time.time() * 1000) + + +def read_gcs_file_lines(gcs_path: str) -> Iterable[str]: + """Reads text lines from a GCS file.""" + with FileSystems.open(gcs_path) as f: + for line in f.read().decode("utf-8").splitlines(): + yield line.strip() + + +def load_image_from_uri(uri: str) -> bytes: + with FileSystems.open(uri) as f: + return f.read() + + +def decode_and_preprocess(image_bytes: bytes, size: int = 224) -> torch.Tensor: + """Decode bytes->RGB PIL->resize/crop->tensor->normalize.""" + with PILImage.open(io.BytesIO(image_bytes)) as img: + img = img.convert("RGB") + img.thumbnail((256, 256)) + w, h = img.size + left = (w - size) // 2 + top = (h - size) // 2 + img = img.crop( + (max(0, left), max(0, top), min(w, left + size), min(h, top + size))) + + # To tensor [0..1] + import numpy as np + arr = np.asarray(img).astype("float32") / 255.0 # H,W,3 + # Normalize + arr = (arr - IMAGENET_MEAN) / IMAGENET_STD + # HWC -> CHW + arr = np.transpose(arr, (2, 0, 1)) + return torch.from_numpy(arr) # float32, shape (3,224,224) + + +class RateLimitDoFn(beam.DoFn): + def __init__(self, rate_per_sec: float): + self.delay = 1.0 / rate_per_sec + + def process(self, element): + time.sleep(self.delay) + yield element + + +class MakeKeyDoFn(beam.DoFn): + """Produce (image_id, payload) stable for dedup & BQ insertId.""" + def __init__(self, input_mode: str): + self.input_mode = input_mode + + def process(self, element: str | bytes): + # Input can be raw bytes from Pub/Sub or a GCS URI string, depends on mode + if self.input_mode == "bytes": + # element is bytes message, assume it includes + # {"image_id": "...", "bytes": base64?} or just raw bytes. + import hashlib + b = element if isinstance(element, (bytes, bytearray)) else bytes(element) + image_id = hashlib.sha1(b).hexdigest() + yield image_id, b + else: + # gcs_uris: element is uri string; image_id = sha1(uri) + import hashlib + uri = element.decode("utf-8") if isinstance( + element, (bytes, bytearray)) else str(element) + image_id = hashlib.sha1(uri.encode("utf-8")).hexdigest() + yield image_id, uri + + +class DedupDoFn(beam.DoFn): + seen = userstate.ReadModifyWriteStateSpec('seen', BytesCoder()) + + def process(self, element, seen=beam.DoFn.StateParam(seen)): + if seen.read() == b'1': + return + seen.write(b'1') + yield element + + +class DecodePreprocessDoFn(beam.DoFn): + """Turn (image_id, bytes|uri) -> (image_id, torch.Tensor)""" + def __init__( + self, input_mode: str, image_size: int = 224, decode_threads: int = 4): + self.input_mode = input_mode + self.image_size = image_size + self.decode_threads = decode_threads + + def process(self, kv: Tuple[str, object]): + image_id, payload = kv + start = now_millis() + + try: + if self.input_mode == "bytes": + b = payload if isinstance(payload, + (bytes, bytearray)) else bytes(payload) + else: + uri = payload if isinstance(payload, str) else payload.decode("utf-8") + b = load_image_from_uri(uri) + + tensor = decode_and_preprocess(b, self.image_size) + preprocess_ms = now_millis() - start + yield image_id, {"tensor": tensor, "preprocess_ms": preprocess_ms} + except Exception as e: + logging.warning("Decode failed for %s: %s", image_id, e) + return + + +class PostProcessDoFn(beam.DoFn): + """PredictionResult -> dict row for BQ.""" + def __init__(self, top_k: int, model_name: str): + self.top_k = top_k + self.model_name = model_name + + def process(self, kv: Tuple[str, PredictionResult]): + image_id, pred = kv + logits = pred.inference[ + "logits"] # torch.Tensor [B, num_classes] or [num_classes] + if isinstance(logits, torch.Tensor) and logits.ndim == 1: + logits = logits.unsqueeze(0) + + probs = F.softmax(logits, dim=-1) # [B, C] + values, indices = torch.topk( + probs, k=min(self.top_k, probs.shape[-1]), dim=-1 + ) + + topk = [{ + "class_id": int(idx.item()), "score": float(val.item()) + } for idx, val in zip(indices[0], values[0])] + + yield { + "image_id": image_id, + "model_name": self.model_name, + "topk": json.dumps(topk), + "infer_ms": now_millis(), + } + + +# ============ Args & Helpers ============ + + +def parse_known_args(argv): + parser = argparse.ArgumentParser() + # I/O & runtime + parser.add_argument( + '--mode', default='streaming', choices=['streaming', 'batch']) + parser.add_argument( + '--output_table', + required=True, + help='BigQuery output table: dataset.table') + parser.add_argument( + '--publish_to_big_query', default='true', choices=['true', 'false']) + parser.add_argument( + '--input_mode', default='gcs_uris', choices=['gcs_uris', 'bytes']) + parser.add_argument( + '--input', + required=True, + help='GCS path to file with URIs (for load) OR unused for bytes') + parser.add_argument( + '--pubsub_topic', + default='projects/apache-beam-testing/topics/images_topic') + parser.add_argument( + '--pubsub_subscription', + default='projects/apache-beam-testing/subscriptions/images_subscription') + parser.add_argument( + '--rate_limit', + type=float, + default=None, + help='Elements per second for load pipeline') + + # Model & inference + parser.add_argument( + '--pretrained_model_name', + default='efficientnet_b0', + help='OSS model name (e.g., efficientnet_b0|mobilenetv3_large_100)') + parser.add_argument( + '--model_state_dict_path', + default=None, + help='Optional state_dict to load') + parser.add_argument('--device', default='GPU', choices=['CPU', 'GPU']) + parser.add_argument('--image_size', type=int, default=224) + parser.add_argument('--top_k', type=int, default=5) + parser.add_argument( + '--inference_batch_size', + default='auto', + help='int or "auto"; auto tries 64→32→16') + + # Windows + parser.add_argument('--window_sec', type=int, default=60) + parser.add_argument('--trigger_proc_time_sec', type=int, default=30) + + # Dedup + parser.add_argument( + '--enable_dedup', default='false', choices=['true', 'false']) + + known_args, pipeline_args = parser.parse_known_args(argv) + return known_args, pipeline_args + + +def ensure_pubsub_resources( + project: str, topic_path: str, subscription_path: str): + publisher = pubsub_v1.PublisherClient() + subscriber = pubsub_v1.SubscriberClient() + + topic_name = topic_path.split("/")[-1] + subscription_name = subscription_path.split("/")[-1] + + full_topic_path = publisher.topic_path(project, topic_name) + full_subscription_path = subscriber.subscription_path( + project, subscription_name) + + try: + publisher.get_topic(request={"topic": full_topic_path}) + except Exception: + publisher.create_topic(name=full_topic_path) + + try: + subscriber.get_subscription( + request={"subscription": full_subscription_path}) + except Exception: + subscriber.create_subscription( + name=full_subscription_path, topic=full_topic_path) + + +def cleanup_pubsub_resources( + project: str, topic_path: str, subscription_path: str): + publisher = pubsub_v1.PublisherClient() + subscriber = pubsub_v1.SubscriberClient() + + topic_name = topic_path.split("/")[-1] + subscription_name = subscription_path.split("/")[-1] + + full_topic_path = publisher.topic_path(project, topic_name) + full_subscription_path = subscriber.subscription_path( + project, subscription_name) + + try: + subscriber.delete_subscription( + request={"subscription": full_subscription_path}) + print(f"Deleted subscription: {subscription_name}") + except Exception as e: + print(f"Failed to delete subscription: {e}") + + try: + publisher.delete_topic(request={"topic": full_topic_path}) + print(f"Deleted topic: {topic_name}") + except Exception as e: + print(f"Failed to delete topic: {e}") + + +def override_or_add(args, flag, value): + if flag in args: + idx = args.index(flag) + args[idx + 1] = str(value) + else: + args.extend([flag, str(value)]) + + +# ============ Model factory (timm) ============ + + +def create_timm_m(model_name: str, num_classes: int = 1000): + import timm + model = timm.create_model( + model_name, pretrained=True, num_classes=num_classes) + model.eval() + return model + + +def pick_batch_size(arg: str) -> Optional[int]: + if isinstance(arg, str) and arg.lower() == 'auto': + return None + try: + return int(arg) + except Exception: + return None + + +# ============ Load pipeline ============ + + +def run_load_pipeline(known_args, pipeline_args): + """Reads GCS file with URIs and publishes them to Pub/Sub (for streaming).""" + # enforce smaller/CPU-only defaults for feeder + override_or_add(pipeline_args, '--device', 'CPU') + override_or_add(pipeline_args, '--num_workers', '5') + override_or_add(pipeline_args, '--max_num_workers', '10') + override_or_add( + pipeline_args, '--job_name', f"images-load-pubsub-{int(time.time())}") + override_or_add(pipeline_args, '--project', known_args.project) + pipeline_args = [ + arg for arg in pipeline_args if not arg.startswith("--experiments") + ] + + pipeline_options = PipelineOptions(pipeline_args) + pipeline = beam.Pipeline(options=pipeline_options) + + lines = ( + pipeline + | + 'ReadGCSFile' >> beam.Create(list(read_gcs_file_lines(known_args.input))) + | 'FilterEmpty' >> beam.Filter(lambda line: line.strip())) + if known_args.rate_limit: + lines = lines | 'RateLimit' >> beam.ParDo( + RateLimitDoFn(rate_per_sec=known_args.rate_limit)) + + _ = ( + lines + | 'ToBytes' >> beam.Map(lambda line: line.encode('utf-8')) + | 'ToPubSub' >> beam.io.WriteToPubSub(topic=known_args.pubsub_topic)) + return pipeline.run() + + +# ============ Main pipeline ============ + + +def run( + argv=None, save_main_session=True, test_pipeline=None) -> PipelineResult: + known_args, pipeline_args = parse_known_args(argv) + + ensure_pubsub_resources( + project=known_args.project, + topic_path=known_args.pubsub_topic, + subscription_path=known_args.pubsub_subscription) + + if known_args.mode == 'streaming': + # Start feeder thread that reads URIs from GCS and fills Pub/Sub. + threading.Thread( + target=lambda: + (time.sleep(900), run_load_pipeline(known_args, pipeline_args)), + daemon=True).start() + + # StandardOptions + pipeline_options = PipelineOptions(pipeline_args) + pipeline_options.view_as(SetupOptions).save_main_session = save_main_session + pipeline_options.view_as(StandardOptions).streaming = ( + known_args.mode == 'streaming') + + # Build model handler with right-fitting batch size + desired_batch = pick_batch_size(known_args.inference_batch_size) + tried = [64, 32, 16] if desired_batch is None else [desired_batch] + + # Device + device = 'GPU' if known_args.device.upper() == 'GPU' else 'CPU' + + bs_ok = None + last_err = None + for bs in tried: + try: + model_handler = PytorchModelHandlerTensor( + model_class=lambda: create_timm_m(known_args.pretrained_model_name), + model_params={}, + state_dict_path=known_args.model_state_dict_path, + device=device, + inference_batch_size=bs + if bs is not None else 64, # start guess for warmup + ) + # quick warmup to validate memory (single dummy tensor) + dummy = torch.zeros((3, known_args.image_size, known_args.image_size), + dtype=torch.float32) + _ = model_handler.load_model() # ensures weights on device + with torch.no_grad(): + mdl = model_handler._model + mdl(torch.unsqueeze(dummy, 0)) + bs_ok = bs if bs is not None else 64 + break + except Exception as e: + last_err = e + logging.warning("Batch size %s failed during warmup: %s", bs, e) + continue + + if bs_ok is None: + logging.warning( + "Falling back to batch_size=8 due to previous errors: %s", last_err) + bs_ok = 8 + model_handler = PytorchModelHandlerTensor( + model_class=lambda: create_timm_m(known_args.pretrained_model_name), + model_params={}, + state_dict_path=known_args.model_state_dict_path, + device=device, + inference_batch_size=bs_ok, + ) + + pipeline = test_pipeline or beam.Pipeline(options=pipeline_options) + + if known_args.mode == 'batch': + pcoll = ( + pipeline + | 'ReadURIsBatch' >> beam.Create( + list(read_gcs_file_lines(known_args.input))) + | 'FilterEmptyBatch' >> beam.Filter(lambda s: s.strip())) + else: + pcoll = ( + pipeline + | 'ReadFromPubSub' >> + beam.io.ReadFromPubSub(subscription=known_args.pubsub_subscription) + | 'DecodeUTF8' >> beam.Map(lambda x: x.decode('utf-8')) + | 'Window' >> beam.WindowInto( + window.FixedWindows(known_args.window_sec), + trigger=beam.trigger.AfterProcessingTime( + known_args.trigger_proc_time_sec), + accumulation_mode=beam.trigger.AccumulationMode.DISCARDING, + allowed_lateness=0)) + + keyed = ( + pcoll + | 'MakeKey' >> beam.ParDo(MakeKeyDoFn(input_mode=known_args.input_mode))) + + if known_args.enable_dedup == 'true': + keyed = keyed | 'Dedup' >> beam.ParDo(DedupDoFn()) + + preprocessed = ( + keyed + | 'DecodePreprocess' >> beam.ParDo( + DecodePreprocessDoFn( + input_mode=known_args.input_mode, + image_size=known_args.image_size))) + + to_infer = ( + preprocessed + | 'ToKeyedTensor' >> beam.Map(lambda kv: (kv[0], kv[1]["tensor"]))) + + predictions = ( + to_infer + | 'RunInference' >> RunInference(KeyedModelHandler(model_handler))) + + results = ( + predictions + | 'PostProcess' >> beam.ParDo( + PostProcessDoFn( + top_k=known_args.top_k, + model_name=known_args.pretrained_model_name))) + + if known_args.publish_to_big_query == 'true': + _ = ( + results + | 'WriteToBigQuery' >> beam.io.WriteToBigQuery( + known_args.output_table, + schema= + 'image_id:STRING, model_name:STRING, topk:STRING, infer_ms:INT64', + write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND, + create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, + method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS)) + + result = pipeline.run() + result.wait_until_finish(duration=1800000) # 30 min + try: + result.cancel() + except Exception: + pass + + cleanup_pubsub_resources( + project=known_args.project, + topic_path=known_args.pubsub_topic, + subscription_path=known_args.pubsub_subscription) + + return result + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + run() diff --git a/sdks/python/apache_beam/ml/inference/pytorch_rightfit_requirements.txt b/sdks/python/apache_beam/ml/inference/pytorch_rightfit_requirements.txt new file mode 100644 index 000000000000..2b2916c577ea --- /dev/null +++ b/sdks/python/apache_beam/ml/inference/pytorch_rightfit_requirements.txt @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +torch>=2.2.0,<2.8.0 +torchvision>=0.17.0,<0.21.0 +timm>=1.0.7 +Pillow>=10.0.0 +numpy>=1.25.0 +google-cloud-pubsub>=2.15.0 +google-cloud-monitoring>=2.27.0 +protobuf>=4.25.1 +requests>=2.31.0 diff --git a/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_imagenet_rightfit_benchmarks.py b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_imagenet_rightfit_benchmarks.py new file mode 100644 index 000000000000..1528711a801b --- /dev/null +++ b/sdks/python/apache_beam/testing/benchmarks/inference/pytorch_imagenet_rightfit_benchmarks.py @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# pytype: skip-file + +import logging + +from apache_beam.examples.inference import pytorch_imagenet_rightfit +from apache_beam.testing.load_tests.dataflow_cost_benchmark import DataflowCostBenchmark + + +class PytorchImagenetRightfitBenchmarkTest(DataflowCostBenchmark): + def __init__(self): + self.metrics_namespace = 'BeamML_PyTorch' + super().__init__( + metrics_namespace=self.metrics_namespace, + pcollection='PostProcess.out0') + + def test(self): + extra_opts = {} + extra_opts['input'] = self.pipeline.get_option('input_file') + self.result = pytorch_imagenet_rightfit.run( + self.pipeline.get_full_options_as_args(**extra_opts), + test_pipeline=self.pipeline) + + +if __name__ == '__main__': + logging.basicConfig(level=logging.INFO) + PytorchImagenetRightfitBenchmarkTest().run() diff --git a/website/www/site/content/en/performance/_index.md b/website/www/site/content/en/performance/_index.md index 17bdc6f3de0a..b9be9b9f4f1c 100644 --- a/website/www/site/content/en/performance/_index.md +++ b/website/www/site/content/en/performance/_index.md @@ -46,6 +46,7 @@ See the following pages for performance measures recorded when running various B ## Streaming - [PyTorch Sentiment Analysis Streaming DistilBERT base](/performance/pytorchbertsentimentstreaming) +- [PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting)](/performance/pytorchimagenetrightfit) ## Batch diff --git a/website/www/site/content/en/performance/pytorchimagenetrightfit/_index.md b/website/www/site/content/en/performance/pytorchimagenetrightfit/_index.md new file mode 100644 index 000000000000..8ed16690a6ab --- /dev/null +++ b/website/www/site/content/en/performance/pytorchimagenetrightfit/_index.md @@ -0,0 +1,44 @@ +--- +title: "PyTorch Language Modeling BERT base Performance" +--- + + + +# PyTorch Image Classification EfficientNet-B0 Streaming (Right-fitting) + +**Model**: PyTorch Image Classification — EfficientNet-B0 (pretrained on ImageNet) +**Accelerator**: Tesla T4 GPU (right-fitted batch size 64 → 32 → 16 → 8) +**Host**: 50 × n1-standard-4 (4 vCPUs, 15 GB RAM) + +This streaming pipeline performs image classification using an open-source PyTorch EfficientNet-B0 model optimized for T4 GPUs. +It reads image URIs from Pub/Sub, decodes and preprocesses them in parallel, and runs inference with adaptive batch sizing for optimal GPU utilization. +The pipeline ensures exactly-once semantics via stateful deduplication and idempotent BigQuery writes, allowing stable and reproducible performance measurements under continuous load. + +The following graphs show various metrics when running PyTorch Sentiment Analysis Streaming using Hugging Face DistilBERT base uncased model pipeline. +See the [glossary](/performance/glossary) for definitions. + +Full pipeline implementation is available [here](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/inference/pytorch_imagenet_rightfit.py). + +## What is the estimated cost to run the pipeline? + +{{< performance_looks io="pytorchimagenetrightfit" read_or_write="write" section="cost" >}} + +## How has various metrics changed when running the pipeline for different Beam SDK versions? + +{{< performance_looks io="pytorchimagenetrightfit" read_or_write="write" section="version" >}} + +## How has various metrics changed over time when running the pipeline? + +{{< performance_looks io="pytorchimagenetrightfit" read_or_write="write" section="date" >}} diff --git a/website/www/site/data/performance.yaml b/website/www/site/data/performance.yaml index 17a6612160c6..d8dbcb822fed 100644 --- a/website/www/site/data/performance.yaml +++ b/website/www/site/data/performance.yaml @@ -250,3 +250,19 @@ looks: title: AvgThroughputBytesPerSec by Version - id: dKyJy5ZKhkBdSTXRY3wZR6fXzptSs2qm title: AvgThroughputElementsPerSec by Version + pytorchimagenetrightfit: + write: + folder: 92 + cost: + - id: zJhxrMmxJ3zVHH5WZnDQqcBHdFDrBhxK + title: RunTime and EstimatedCost + date: + - id: RybzxZdkXJg6PzQZkBcfxTJkByTf3ZV5 + title: AvgThroughputBytesPerSec by Date + - id: xTVYPytQVH7zXYz7SvphnRV4nQcxCddp + title: AvgThroughputElementsPerSec by Date + version: + - id: dGN6Zr6rh7DfnRtTDCN6GHcNfhSkrbCq + title: AvgThroughputBytesPerSec by Version + - id: VJMWrZh3jXk2mqCZk4NQn3tBrHgGWqnC + title: AvgThroughputElementsPerSec by Version