From 8b1dd74bcbe2929f267222f36d70d1d6ae17ebba Mon Sep 17 00:00:00 2001
From: GnanasundaramSampath
Date: Fri, 15 May 2026 14:10:44 +0530
Subject: [PATCH] [azure-ai-ml] Add hdfs input mode mapping for job input
 serialization

---
 .../azure/ai/ml/constants/_common.py          |  2 ++
 .../ml/entities/_job/_input_output_helpers.py |  8 ++++++--
 .../unittests/test_pipeline_job_schema.py     | 20 +++++++++++++++++++
 3 files changed, 28 insertions(+), 2 deletions(-)

diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/constants/_common.py b/sdk/ml/azure-ai-ml/azure/ai/ml/constants/_common.py
index 2e12d7f41207..55df8f3823e8 100644
--- a/sdk/ml/azure-ai-ml/azure/ai/ml/constants/_common.py
+++ b/sdk/ml/azure-ai-ml/azure/ai/ml/constants/_common.py
@@ -836,6 +836,8 @@ class InputOutputModes:
     """Evaluation download asset type."""
     DIRECT = "direct"
     """Direct asset type."""
+    HDFS = "hdfs"
+    """HDFS asset type."""
 
 
 class ConnectionTypes:
diff --git a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py
index 1a13ab414414..dd86a46703ce 100644
--- a/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py
+++ b/sdk/ml/azure-ai-ml/azure/ai/ml/entities/_job/_input_output_helpers.py
@@ -69,6 +69,8 @@
     InputDeliveryMode.DIRECT: InputOutputModes.DIRECT,
     InputDeliveryMode.EVAL_MOUNT: InputOutputModes.EVAL_MOUNT,
     InputDeliveryMode.EVAL_DOWNLOAD: InputOutputModes.EVAL_DOWNLOAD,
+    "Hdfs": InputOutputModes.HDFS,
+    "hdfs": InputOutputModes.HDFS,
 }
 
 INPUT_MOUNT_MAPPING_TO_REST = {
@@ -79,6 +81,7 @@
     InputOutputModes.EVAL_MOUNT: InputDeliveryMode.EVAL_MOUNT,
     InputOutputModes.EVAL_DOWNLOAD: InputDeliveryMode.EVAL_DOWNLOAD,
     InputOutputModes.DIRECT: InputDeliveryMode.DIRECT,
+    InputOutputModes.HDFS: "Hdfs",
 }
 
 
@@ -249,7 +252,7 @@ def to_rest_dataset_literal_inputs(
             input_data = LiteralJobInput(value=input_value.path)
             # set mode attribute manually for binding job input
             if input_value.mode:
-                input_data.mode = INPUT_MOUNT_MAPPING_TO_REST[input_value.mode]
+                input_data.mode = INPUT_MOUNT_MAPPING_TO_REST[input_value.mode.lower()]
             if getattr(input_value, "path_on_compute", None) is not None:
                 input_data.pathOnCompute = input_value.path_on_compute
             input_data.job_input_type = JobInputType.LITERAL
@@ -279,7 +282,8 @@ def to_rest_dataset_literal_inputs(
             input_data = LiteralJobInput(value=str(input_value["value"]))
             # set mode attribute manually for binding job input
             if "mode" in input_value:
-                input_data.mode = input_value["mode"]
+                input_mode = str(input_value["mode"])
+                input_data.mode = INPUT_MOUNT_MAPPING_TO_REST.get(input_mode.lower(), input_mode)
         else:
             input_data = LiteralJobInput(value=str(input_value))
         input_data.job_input_type = JobInputType.LITERAL
diff --git a/sdk/ml/azure-ai-ml/tests/pipeline_job/unittests/test_pipeline_job_schema.py b/sdk/ml/azure-ai-ml/tests/pipeline_job/unittests/test_pipeline_job_schema.py
index 4e924a4e99ef..a45ab63f0758 100644
--- a/sdk/ml/azure-ai-ml/tests/pipeline_job/unittests/test_pipeline_job_schema.py
+++ b/sdk/ml/azure-ai-ml/tests/pipeline_job/unittests/test_pipeline_job_schema.py
@@ -32,6 +32,8 @@
 from azure.ai.ml.entities._inputs_outputs import Input, Output
 from azure.ai.ml.entities._job._input_output_helpers import (
     INPUT_MOUNT_MAPPING_FROM_REST,
+    from_rest_inputs_to_dataset_literal,
+    to_rest_dataset_literal_inputs,
     validate_pipeline_input_key_characters,
 )
 from azure.ai.ml.entities._job.automl.search_space_utils import _convert_sweep_dist_dict_to_str_dict
@@ -53,6 +55,24 @@
 @pytest.mark.unittest
 @pytest.mark.pipeline_test
 class TestPipelineJobSchema:
+    def test_hdfs_input_mode_round_trip(self):
+        rest_inputs = to_rest_dataset_literal_inputs(
+            {
+                "hdfs_input": Input(
+                    type=AssetTypes.URI_FOLDER,
+                    path="azureml://datastores/workspaceblobstore/paths/data/",
+                    mode=InputOutputModes.HDFS,
+                )
+            },
+            job_type=None,
+        )
+
+        assert rest_inputs["hdfs_input"].mode == "Hdfs"
+
+        sdk_inputs = from_rest_inputs_to_dataset_literal(rest_inputs)
+        assert isinstance(sdk_inputs["hdfs_input"], Input)
+        assert sdk_inputs["hdfs_input"].mode == InputOutputModes.HDFS
+
     def test_validate_pipeline_job_keys(self):
         def validator(key, assert_valid=True):
             if assert_valid: