@@ -195,12 +195,12 @@ class PyTorchJobConfig(BaseModel):
default=None,
description="Limit for the amount of memory in GiB",
)
- efa_interfaces: Optional[int] = Field(
+ efa: Optional[int] = Field(
default=None,
description="Number of EFA interfaces for the instance",
description="Number of EFA interfaces",
ge=0
)
- efa_interfaces_limit: Optional[int] = Field(
+ efa_limit: Optional[int] = Field(
default=None,
description="Limit for the number of EFA interfaces",
ge=0
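The change is a straight field rename in the Pydantic model. A hedged usage sketch of the new names follows; which other fields exist or are required is an assumption drawn from the tests further down.

```python
# Hedged sketch: constructing the v1_1 config with the renamed EFA fields.
# job_name / instance_type / accelerators mirror the test fixtures below and
# may not be the complete set of required fields.
from hyperpod_pytorch_job_template.v1_1.model import PyTorchJobConfig

config = PyTorchJobConfig(
    job_name="demo-efa",
    instance_type="ml.p4d.24xlarge",
    accelerators=8,
    efa=4,        # previously efa_interfaces
    efa_limit=4,  # previously efa_interfaces_limit
)
```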
@@ -464,26 +464,26 @@ def build_dict(**kwargs):
**{partition_resource_key: str(self.accelerator_partition_count)} if self.accelerator_partition_count else {},
vcpu=str(self.vcpu) if self.vcpu else None,
memory=str(self.memory) if self.memory else None,
**{"vpc.amazonaws.com/efa": str(self.efa_interfaces)} if self.efa_interfaces else {},
**{"vpc.amazonaws.com/efa": str(self.efa)} if self.efa else {},
)
limits_value = build_dict(
**{partition_resource_key: str(self.accelerator_partition_limit)} if self.accelerator_partition_limit else {},
vcpu=str(self.vcpu_limit) if self.vcpu_limit else None,
memory=str(self.memory_limit) if self.memory_limit else None,
**{"vpc.amazonaws.com/efa": str(self.efa_interfaces_limit)} if self.efa_interfaces_limit else {},
**{"vpc.amazonaws.com/efa": str(self.efa_limit)} if self.efa_limit else {},
)
else:
requests_value = build_dict(
accelerators=str(self.accelerators) if self.accelerators else None,
vcpu=str(self.vcpu) if self.vcpu else None,
memory=str(self.memory) if self.memory else None,
**{"vpc.amazonaws.com/efa": str(self.efa_interfaces)} if self.efa_interfaces else {},
**{"vpc.amazonaws.com/efa": str(self.efa)} if self.efa else {},
)
limits_value = build_dict(
accelerators=str(self.accelerators_limit) if self.accelerators_limit else None,
vcpu=str(self.vcpu_limit) if self.vcpu_limit else None,
memory=str(self.memory_limit) if self.memory_limit else None,
**{"vpc.amazonaws.com/efa": str(self.efa_interfaces_limit)} if self.efa_interfaces_limit else {},
**{"vpc.amazonaws.com/efa": str(self.efa_limit)} if self.efa_limit else {},
)

# Build container
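As an aside, the conditional-key pattern used for the EFA entry can be shown on its own; the body of build_dict is an assumption here, since the diff only shows its call sites.

```python
# Standalone sketch (assumed build_dict behavior): a resource key is only added
# when its value is set, so unset resources never appear in requests/limits.
def build_dict(**kwargs):
    return {k: v for k, v in kwargs.items() if v is not None}

efa = 4  # e.g. self.efa
requests_value = build_dict(
    vcpu="40",
    memory=None,  # dropped
    **({"vpc.amazonaws.com/efa": str(efa)} if efa else {}),
)
print(requests_value)  # {'vcpu': '40', 'vpc.amazonaws.com/efa': '4'}
```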
@@ -305,12 +305,12 @@
"minimum": 0,
"description": "Limit for the amount of memory in GiB"
},
"efa_interfaces": {
"efa": {
"type": "integer",
"minimum": 0,
"description": "Number of EFA interfaces for the instance"
"description": "Number of EFA interfaces"
},
"efa_interfaces_limit": {
"efa_limit": {
"type": "integer",
"minimum": 0,
"description": "Limit for the number of EFA interfaces"
test/unit_tests/training/test_pytorch_job_template_model.py (85 changes: 82 additions & 3 deletions)
@@ -1,6 +1,8 @@
import unittest
from hyperpod_pytorch_job_template.v1_1.model import PyTorchJobConfig
from hyperpod_pytorch_job_template.v1_0.model import PyTorchJobConfig as PyTorchJobConfigV1_0
from hyperpod_pytorch_job_template.v1_1.template import TEMPLATE_CONTENT
from jinja2 import Template
from sagemaker.hyperpod.training.hyperpod_pytorch_job import HyperPodPytorchJob


@@ -40,7 +42,7 @@ def test_multi_node_gpu_job_with_efa(self):

def test_user_specified_efa_overrides_default(self):
"""User-specified EFA value takes precedence over the instance default."""
job = self._resolve(job_name="test-efa", accelerators=4, efa_interfaces=2, instance_type="ml.p4d.24xlarge")
job = self._resolve(job_name="test-efa", accelerators=4, efa=2, instance_type="ml.p4d.24xlarge")
requests, limits = self._get_resources(job)

self.assertEqual(int(requests["vpc.amazonaws.com/efa"]), 2)
@@ -71,14 +73,14 @@ def test_all_resource_types_together(self):
def test_accelerators_without_instance_type_rejected(self):
"""Specifying accelerators without instance_type raises a clear error."""
from sagemaker.hyperpod.training.config.hyperpod_pytorch_job_unified_config import (
- Containers, ReplicaSpec, Resources, Spec, Template,
+ Containers, ReplicaSpec, Resources, Spec, Template as SpecTemplate,
)

job = HyperPodPytorchJob(
metadata={"name": "test-no-instance-type", "namespace": "default"},
replica_specs=[ReplicaSpec(
name="pod",
- template=Template(
+ template=SpecTemplate(
spec=Spec(containers=[Containers(
name="test",
image="pytorch:latest",
@@ -95,6 +97,83 @@ def test_accelerators_without_instance_type_rejected(self):
HyperPodPytorchJob.allocate_quotas_if_applicable(job)


class TestJinjaTemplateRendering(unittest.TestCase):
"""Test that jinja template variables match schema field names."""

def test_all_resource_fields_render_in_template(self):
"""Verify all schema resource fields are correctly rendered by the jinja template."""
template = Template(TEMPLATE_CONTENT)
rendered = template.render(
job_name="test-resources",
namespace="default",
image="pytorch:latest",
pull_policy="Always",
node_count=2,
accelerators=8,
vcpu=40,
memory=800,
efa=4,
accelerators_limit=8,
vcpu_limit=48,
memory_limit=900,
efa_limit=4,
instance_type="ml.p4d.24xlarge",
queue_name="test-queue",
priority="high",
preferred_topology="topology.kubernetes.io/zone",
required_topology="topology.kubernetes.io/zone",
tasks_per_node=8,
deep_health_check_passed_nodes_only=True,
service_account_name="training-sa",
scheduler_type="custom-scheduler",
max_retry=3,
)
# Requests
self.assertIn("nvidia.com/gpu: 8", rendered)
self.assertIn("cpu: 40", rendered)
self.assertIn("memory: 800Gi", rendered)
self.assertIn("vpc.amazonaws.com/efa: 4", rendered)
# Limits
self.assertIn("cpu: 48", rendered)
self.assertIn("memory: 900Gi", rendered)
self.assertEqual(rendered.count("nvidia.com/gpu: 8"), 2)
self.assertEqual(rendered.count("vpc.amazonaws.com/efa: 4"), 2)
# Replicas
self.assertIn("replicas: 2", rendered)
# Node selector
self.assertIn("node.kubernetes.io/instance-type: ml.p4d.24xlarge", rendered)
self.assertIn('sagemaker.amazonaws.com/deep-health-check-status: "Passed"', rendered)
# Kueue labels
self.assertIn("kueue.x-k8s.io/queue-name: test-queue", rendered)
self.assertIn("kueue.x-k8s.io/priority-class: high", rendered)
# Topology
self.assertIn("kueue.x-k8s.io/podset-preferred-topology: topology.kubernetes.io/zone", rendered)
self.assertIn("kueue.x-k8s.io/podset-required-topology: topology.kubernetes.io/zone", rendered)
# Container config
self.assertIn("imagePullPolicy: Always", rendered)
self.assertIn('nprocPerNode: "8"', rendered)
self.assertIn("serviceAccountName: training-sa", rendered)
self.assertIn("schedulerName: custom-scheduler", rendered)
# Run policy
self.assertIn("jobMaxRetryCount: 3", rendered)

def test_accelerator_partition_fields_render_in_template(self):
"""Verify accelerator partition fields render correctly (mutually exclusive with accelerators)."""
template = Template(TEMPLATE_CONTENT)
rendered = template.render(
job_name="test-mig",
namespace="default",
image="pytorch:latest",
accelerator_partition_type="mig-1g.5gb",
accelerator_partition_count=2,
accelerator_partition_limit=2,
instance_type="ml.p4d.24xlarge",
)
self.assertIn("nvidia.com/mig-1g.5gb: 2", rendered)
self.assertEqual(rendered.count("nvidia.com/mig-1g.5gb: 2"), 2)
self.assertIn('nvidia.com/mig.config.state: "success"', rendered)


class TestDeepHealthCheckNodeSelector(unittest.TestCase):
"""Test that deep_health_check_passed_nodes_only generates the correct nodeSelector label."""

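For reference, the template can be rendered directly the same way the new tests do. This sketch passes only a subset of the variables the test supplies (Jinja leaves the rest undefined), so treat the minimal variable set as an assumption.

```python
# Hedged sketch mirroring TestJinjaTemplateRendering: render the v1_1 template
# with the renamed efa variable and check that the EFA resource line appears.
from jinja2 import Template
from hyperpod_pytorch_job_template.v1_1.template import TEMPLATE_CONTENT

rendered = Template(TEMPLATE_CONTENT).render(
    job_name="demo",
    namespace="default",
    image="pytorch:latest",
    node_count=2,
    accelerators=8,
    efa=4,
    instance_type="ml.p4d.24xlarge",
)
print("vpc.amazonaws.com/efa: 4" in rendered)  # expected True per the tests above
```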