From 5d4d8d75ec00cf7c9e6de595227f29f59ca758a2 Mon Sep 17 00:00:00 2001 From: pintaoz-aws Date: Wed, 25 Dec 2024 20:12:59 -0800 Subject: [PATCH 1/4] Add an option for user to remove inputs and container artifacts when using local model trainer --- .../modules/local_core/local_container.py | 13 ++++ src/sagemaker/modules/train/model_trainer.py | 5 +- .../modules/train/test_local_model_trainer.py | 73 +++++++++++++++++-- 3 files changed, 82 insertions(+), 9 deletions(-) diff --git a/src/sagemaker/modules/local_core/local_container.py b/src/sagemaker/modules/local_core/local_container.py index 5424f4f865..78627b5c42 100644 --- a/src/sagemaker/modules/local_core/local_container.py +++ b/src/sagemaker/modules/local_core/local_container.py @@ -108,6 +108,8 @@ class _LocalContainer(BaseModel): container_entrypoint: Optional[List[str]] container_arguments: Optional[List[str]] + _temperary_folders: List[str] = [] + def model_post_init(self, __context: Any): """Post init method to perform custom validation and set default values.""" self.hosts = [f"algo-{i}" for i in range(1, self.instance_count + 1)] @@ -146,12 +148,15 @@ def model_post_init(self, __context: Any): def train( self, wait: bool, + remove_inputs_and_container_artifacts: Optional[bool] = True, ) -> str: """Run a training job locally using docker-compose. Args: wait (bool): Whether to wait the training output before exiting. + remove_inputs_and_container_artifacts (Optional[bool]): + Whether to remove inputs and container artifacts after training. """ # create output/data folder since sagemaker-containers 2.0 expects it os.makedirs(os.path.join(self.container_root, "output", "data"), exist_ok=True) @@ -201,6 +206,13 @@ def train( # Print our Job Complete line logger.info("Local training job completed, output artifacts saved to %s", artifacts) + + if remove_inputs_and_container_artifacts: + shutil.rmtree(os.path.join(self.container_root, "input")) + for host in self.hosts: + shutil.rmtree(os.path.join(self.container_root, host)) + for folder in self._temperary_folders: + shutil.rmtree(os.path.join(self.container_root, folder)) return artifacts def retrieve_artifacts( @@ -540,6 +552,7 @@ def _get_data_source_local_path(self, data_source: DataSource): uri = data_source.s3_data_source.s3_uri parsed_uri = urlparse(uri) local_dir = TemporaryDirectory(prefix=os.path.join(self.container_root + "/")).name + self._temperary_folders.append(local_dir) download_folder(parsed_uri.netloc, parsed_uri.path, local_dir, self.sagemaker_session) return local_dir else: diff --git a/src/sagemaker/modules/train/model_trainer.py b/src/sagemaker/modules/train/model_trainer.py index 31decfaca9..b603fafd3f 100644 --- a/src/sagemaker/modules/train/model_trainer.py +++ b/src/sagemaker/modules/train/model_trainer.py @@ -203,6 +203,8 @@ class ModelTrainer(BaseModel): local_container_root (Optional[str]): The local root directory to store artifacts from a training job launched in "LOCAL_CONTAINER" mode. + remove_inputs_and_container_artifacts (Optional[bool]): + Whether to remove inputs and container artifacts after training. 
""" model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid") @@ -227,6 +229,7 @@ class ModelTrainer(BaseModel): hyperparameters: Optional[Dict[str, Any]] = {} tags: Optional[List[Tag]] = None local_container_root: Optional[str] = os.getcwd() + remove_inputs_and_container_artifacts: Optional[bool] = True # Created Artifacts _latest_training_job: Optional[resources.TrainingJob] = PrivateAttr(default=None) @@ -646,7 +649,7 @@ def train( hyper_parameters=string_hyper_parameters, environment=self.environment, ) - local_container.train(wait) + local_container.train(wait, self.remove_inputs_and_container_artifacts) def create_input_data_channel( self, channel_name: str, data_source: DataSourceType, key_prefix: Optional[str] = None diff --git a/tests/integ/sagemaker/modules/train/test_local_model_trainer.py b/tests/integ/sagemaker/modules/train/test_local_model_trainer.py index adb5f85f3e..d7b41056d7 100644 --- a/tests/integ/sagemaker/modules/train/test_local_model_trainer.py +++ b/tests/integ/sagemaker/modules/train/test_local_model_trainer.py @@ -92,10 +92,7 @@ def test_single_container_local_mode_local_data(modules_sagemaker_session): "compressed_artifacts", "artifacts", "model", - "shared", - "input", "output", - "algo-1", ] for directory in directories: @@ -103,7 +100,7 @@ def test_single_container_local_mode_local_data(modules_sagemaker_session): delete_local_path(path) -def test_single_container_local_mode_s3_data(modules_sagemaker_session): +def test_single_container_local_mode_s3_data_remove_input(modules_sagemaker_session): with lock.lock(LOCK_PATH): try: # upload local data to s3 @@ -145,6 +142,70 @@ def test_single_container_local_mode_s3_data(modules_sagemaker_session): training_mode=Mode.LOCAL_CONTAINER, ) + model_trainer.train() + assert os.path.exists(os.path.join(CWD, "compressed_artifacts/model.tar.gz")) + finally: + subprocess.run(["docker", "compose", "down", "-v"]) + + assert not os.path.exists(os.path.join(CWD, "shared")) + assert not os.path.exists(os.path.join(CWD, "input")) + assert not os.path.exists(os.path.join(CWD, "algo-1")) + + directories = [ + "compressed_artifacts", + "artifacts", + "model", + "output", + ] + + for directory in directories: + path = os.path.join(CWD, directory) + delete_local_path(path) + + +def test_single_container_local_mode_s3_data_not_remove_input(modules_sagemaker_session): + with lock.lock(LOCK_PATH): + try: + # upload local data to s3 + session = modules_sagemaker_session + bucket = session.default_bucket() + session.upload_data( + path=os.path.join(SOURCE_DIR, "data/train/"), + bucket=bucket, + key_prefix="data/train", + ) + session.upload_data( + path=os.path.join(SOURCE_DIR, "data/test/"), + bucket=bucket, + key_prefix="data/test", + ) + + source_code = SourceCode( + source_dir=SOURCE_DIR, + entry_script="local_training_script.py", + ) + + compute = Compute( + instance_type="local_cpu", + instance_count=1, + ) + + # read input data from s3 + train_data = InputData(channel_name="train", data_source=f"s3://{bucket}/data/train/") + + test_data = InputData(channel_name="test", data_source=f"s3://{bucket}/data/test/") + + model_trainer = ModelTrainer( + training_image=DEFAULT_CPU_IMAGE, + sagemaker_session=modules_sagemaker_session, + source_code=source_code, + compute=compute, + input_data_config=[train_data, test_data], + base_job_name="local_mode_single_container_s3_data", + training_mode=Mode.LOCAL_CONTAINER, + remove_inputs_and_container_artifacts=False, + ) + model_trainer.train() assert os.path.exists(os.path.join(CWD, 
"compressed_artifacts/model.tar.gz")) finally: @@ -213,11 +274,7 @@ def test_multi_container_local_mode(modules_sagemaker_session): "compressed_artifacts", "artifacts", "model", - "shared", - "input", "output", - "algo-1", - "algo-2", ] for directory in directories: From e50d68154e4f07516bb3c008526bd80925f77cad Mon Sep 17 00:00:00 2001 From: pintaoz-aws Date: Wed, 25 Dec 2024 22:40:18 -0800 Subject: [PATCH 2/4] fix tests --- .../modules/local_core/local_container.py | 1 + .../modules/train/test_local_model_trainer.py | 68 ++++++++++++++++++- 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/src/sagemaker/modules/local_core/local_container.py b/src/sagemaker/modules/local_core/local_container.py index 78627b5c42..e9709efb8f 100644 --- a/src/sagemaker/modules/local_core/local_container.py +++ b/src/sagemaker/modules/local_core/local_container.py @@ -209,6 +209,7 @@ def train( if remove_inputs_and_container_artifacts: shutil.rmtree(os.path.join(self.container_root, "input")) + shutil.rmtree(os.path.join(self.container_root, "shared")) for host in self.hosts: shutil.rmtree(os.path.join(self.container_root, host)) for folder in self._temperary_folders: diff --git a/tests/integ/sagemaker/modules/train/test_local_model_trainer.py b/tests/integ/sagemaker/modules/train/test_local_model_trainer.py index d7b41056d7..9d8fcd8889 100644 --- a/tests/integ/sagemaker/modules/train/test_local_model_trainer.py +++ b/tests/integ/sagemaker/modules/train/test_local_model_trainer.py @@ -225,7 +225,7 @@ def test_single_container_local_mode_s3_data_not_remove_input(modules_sagemaker_ delete_local_path(path) -def test_multi_container_local_mode(modules_sagemaker_session): +def test_multi_container_local_mode_remove_input(modules_sagemaker_session): with lock.lock(LOCK_PATH): try: source_code = SourceCode( @@ -265,6 +265,68 @@ def test_multi_container_local_mode(modules_sagemaker_session): model_trainer.train() assert os.path.exists(os.path.join(CWD, "compressed_artifacts/model.tar.gz")) + + finally: + subprocess.run(["docker", "compose", "down", "-v"]) + + assert not os.path.exists(os.path.join(CWD, "shared")) + assert not os.path.exists(os.path.join(CWD, "input")) + assert not os.path.exists(os.path.join(CWD, "algo-1")) + assert not os.path.exists(os.path.join(CWD, "algo-2")) + + directories = [ + "compressed_artifacts", + "artifacts", + "model", + "output", + ] + + for directory in directories: + path = os.path.join(CWD, directory) + delete_local_path(path) + + +def test_multi_container_local_mode_not_remove_input(modules_sagemaker_session): + with lock.lock(LOCK_PATH): + try: + source_code = SourceCode( + source_dir=SOURCE_DIR, + entry_script="local_training_script.py", + ) + + distributed = Torchrun( + process_count_per_node=1, + ) + + compute = Compute( + instance_type="local_cpu", + instance_count=2, + ) + + train_data = InputData( + channel_name="train", + data_source=os.path.join(SOURCE_DIR, "data/train/"), + ) + + test_data = InputData( + channel_name="test", + data_source=os.path.join(SOURCE_DIR, "data/test/"), + ) + + model_trainer = ModelTrainer( + training_image=DEFAULT_CPU_IMAGE, + sagemaker_session=modules_sagemaker_session, + source_code=source_code, + distributed=distributed, + compute=compute, + input_data_config=[train_data, test_data], + base_job_name="local_mode_multi_container", + training_mode=Mode.LOCAL_CONTAINER, + remove_inputs_and_container_artifacts=False, + ) + + model_trainer.train() + assert os.path.exists(os.path.join(CWD, "compressed_artifacts/model.tar.gz")) assert 
os.path.exists(os.path.join(CWD, "algo-1")) assert os.path.exists(os.path.join(CWD, "algo-2")) @@ -274,7 +336,11 @@ def test_multi_container_local_mode(modules_sagemaker_session): "compressed_artifacts", "artifacts", "model", + "shared", + "input", "output", + "algo-1", + "algo-2", ] for directory in directories: From 205f8780e955a058796c27c6b0fb7b2d9278fbde Mon Sep 17 00:00:00 2001 From: pintaoz-aws Date: Thu, 26 Dec 2024 18:07:50 -0800 Subject: [PATCH 3/4] fix typo --- src/sagemaker/modules/local_core/local_container.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sagemaker/modules/local_core/local_container.py b/src/sagemaker/modules/local_core/local_container.py index e9709efb8f..3cfb52e626 100644 --- a/src/sagemaker/modules/local_core/local_container.py +++ b/src/sagemaker/modules/local_core/local_container.py @@ -108,7 +108,7 @@ class _LocalContainer(BaseModel): container_entrypoint: Optional[List[str]] container_arguments: Optional[List[str]] - _temperary_folders: List[str] = [] + _temporary_folders: List[str] = [] def model_post_init(self, __context: Any): """Post init method to perform custom validation and set default values.""" @@ -212,7 +212,7 @@ def train( shutil.rmtree(os.path.join(self.container_root, "shared")) for host in self.hosts: shutil.rmtree(os.path.join(self.container_root, host)) - for folder in self._temperary_folders: + for folder in self._temporary_folders: shutil.rmtree(os.path.join(self.container_root, folder)) return artifacts @@ -553,7 +553,7 @@ def _get_data_source_local_path(self, data_source: DataSource): uri = data_source.s3_data_source.s3_uri parsed_uri = urlparse(uri) local_dir = TemporaryDirectory(prefix=os.path.join(self.container_root + "/")).name - self._temperary_folders.append(local_dir) + self._temporary_folders.append(local_dir) download_folder(parsed_uri.netloc, parsed_uri.path, local_dir, self.sagemaker_session) return local_dir else: From 2dcd3afd64c2eb14d36f9c5147ae7bceca33ab92 Mon Sep 17 00:00:00 2001 From: pintaoz-aws Date: Sun, 29 Dec 2024 17:34:08 -0800 Subject: [PATCH 4/4] remove option --- .../modules/local_core/local_container.py | 16 +-- src/sagemaker/modules/train/model_trainer.py | 5 +- .../modules/train/test_local_model_trainer.py | 128 +----------------- 3 files changed, 9 insertions(+), 140 deletions(-) diff --git a/src/sagemaker/modules/local_core/local_container.py b/src/sagemaker/modules/local_core/local_container.py index 3cfb52e626..448330092d 100644 --- a/src/sagemaker/modules/local_core/local_container.py +++ b/src/sagemaker/modules/local_core/local_container.py @@ -148,15 +148,12 @@ def model_post_init(self, __context: Any): def train( self, wait: bool, - remove_inputs_and_container_artifacts: Optional[bool] = True, ) -> str: """Run a training job locally using docker-compose. Args: wait (bool): Whether to wait the training output before exiting. - remove_inputs_and_container_artifacts (Optional[bool]): - Whether to remove inputs and container artifacts after training. 
""" # create output/data folder since sagemaker-containers 2.0 expects it os.makedirs(os.path.join(self.container_root, "output", "data"), exist_ok=True) @@ -207,13 +204,12 @@ def train( # Print our Job Complete line logger.info("Local training job completed, output artifacts saved to %s", artifacts) - if remove_inputs_and_container_artifacts: - shutil.rmtree(os.path.join(self.container_root, "input")) - shutil.rmtree(os.path.join(self.container_root, "shared")) - for host in self.hosts: - shutil.rmtree(os.path.join(self.container_root, host)) - for folder in self._temporary_folders: - shutil.rmtree(os.path.join(self.container_root, folder)) + shutil.rmtree(os.path.join(self.container_root, "input")) + shutil.rmtree(os.path.join(self.container_root, "shared")) + for host in self.hosts: + shutil.rmtree(os.path.join(self.container_root, host)) + for folder in self._temporary_folders: + shutil.rmtree(os.path.join(self.container_root, folder)) return artifacts def retrieve_artifacts( diff --git a/src/sagemaker/modules/train/model_trainer.py b/src/sagemaker/modules/train/model_trainer.py index b603fafd3f..31decfaca9 100644 --- a/src/sagemaker/modules/train/model_trainer.py +++ b/src/sagemaker/modules/train/model_trainer.py @@ -203,8 +203,6 @@ class ModelTrainer(BaseModel): local_container_root (Optional[str]): The local root directory to store artifacts from a training job launched in "LOCAL_CONTAINER" mode. - remove_inputs_and_container_artifacts (Optional[bool]): - Whether to remove inputs and container artifacts after training. """ model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid") @@ -229,7 +227,6 @@ class ModelTrainer(BaseModel): hyperparameters: Optional[Dict[str, Any]] = {} tags: Optional[List[Tag]] = None local_container_root: Optional[str] = os.getcwd() - remove_inputs_and_container_artifacts: Optional[bool] = True # Created Artifacts _latest_training_job: Optional[resources.TrainingJob] = PrivateAttr(default=None) @@ -649,7 +646,7 @@ def train( hyper_parameters=string_hyper_parameters, environment=self.environment, ) - local_container.train(wait, self.remove_inputs_and_container_artifacts) + local_container.train(wait) def create_input_data_channel( self, channel_name: str, data_source: DataSourceType, key_prefix: Optional[str] = None diff --git a/tests/integ/sagemaker/modules/train/test_local_model_trainer.py b/tests/integ/sagemaker/modules/train/test_local_model_trainer.py index 9d8fcd8889..7947b2fc87 100644 --- a/tests/integ/sagemaker/modules/train/test_local_model_trainer.py +++ b/tests/integ/sagemaker/modules/train/test_local_model_trainer.py @@ -100,7 +100,7 @@ def test_single_container_local_mode_local_data(modules_sagemaker_session): delete_local_path(path) -def test_single_container_local_mode_s3_data_remove_input(modules_sagemaker_session): +def test_single_container_local_mode_s3_data(modules_sagemaker_session): with lock.lock(LOCK_PATH): try: # upload local data to s3 @@ -163,69 +163,7 @@ def test_single_container_local_mode_s3_data_remove_input(modules_sagemaker_sess delete_local_path(path) -def test_single_container_local_mode_s3_data_not_remove_input(modules_sagemaker_session): - with lock.lock(LOCK_PATH): - try: - # upload local data to s3 - session = modules_sagemaker_session - bucket = session.default_bucket() - session.upload_data( - path=os.path.join(SOURCE_DIR, "data/train/"), - bucket=bucket, - key_prefix="data/train", - ) - session.upload_data( - path=os.path.join(SOURCE_DIR, "data/test/"), - bucket=bucket, - key_prefix="data/test", - ) - - 
source_code = SourceCode( - source_dir=SOURCE_DIR, - entry_script="local_training_script.py", - ) - - compute = Compute( - instance_type="local_cpu", - instance_count=1, - ) - - # read input data from s3 - train_data = InputData(channel_name="train", data_source=f"s3://{bucket}/data/train/") - - test_data = InputData(channel_name="test", data_source=f"s3://{bucket}/data/test/") - - model_trainer = ModelTrainer( - training_image=DEFAULT_CPU_IMAGE, - sagemaker_session=modules_sagemaker_session, - source_code=source_code, - compute=compute, - input_data_config=[train_data, test_data], - base_job_name="local_mode_single_container_s3_data", - training_mode=Mode.LOCAL_CONTAINER, - remove_inputs_and_container_artifacts=False, - ) - - model_trainer.train() - assert os.path.exists(os.path.join(CWD, "compressed_artifacts/model.tar.gz")) - finally: - subprocess.run(["docker", "compose", "down", "-v"]) - directories = [ - "compressed_artifacts", - "artifacts", - "model", - "shared", - "input", - "output", - "algo-1", - ] - - for directory in directories: - path = os.path.join(CWD, directory) - delete_local_path(path) - - -def test_multi_container_local_mode_remove_input(modules_sagemaker_session): +def test_multi_container_local_mode(modules_sagemaker_session): with lock.lock(LOCK_PATH): try: source_code = SourceCode( @@ -284,65 +222,3 @@ def test_multi_container_local_mode_remove_input(modules_sagemaker_session): for directory in directories: path = os.path.join(CWD, directory) delete_local_path(path) - - -def test_multi_container_local_mode_not_remove_input(modules_sagemaker_session): - with lock.lock(LOCK_PATH): - try: - source_code = SourceCode( - source_dir=SOURCE_DIR, - entry_script="local_training_script.py", - ) - - distributed = Torchrun( - process_count_per_node=1, - ) - - compute = Compute( - instance_type="local_cpu", - instance_count=2, - ) - - train_data = InputData( - channel_name="train", - data_source=os.path.join(SOURCE_DIR, "data/train/"), - ) - - test_data = InputData( - channel_name="test", - data_source=os.path.join(SOURCE_DIR, "data/test/"), - ) - - model_trainer = ModelTrainer( - training_image=DEFAULT_CPU_IMAGE, - sagemaker_session=modules_sagemaker_session, - source_code=source_code, - distributed=distributed, - compute=compute, - input_data_config=[train_data, test_data], - base_job_name="local_mode_multi_container", - training_mode=Mode.LOCAL_CONTAINER, - remove_inputs_and_container_artifacts=False, - ) - - model_trainer.train() - assert os.path.exists(os.path.join(CWD, "compressed_artifacts/model.tar.gz")) - assert os.path.exists(os.path.join(CWD, "algo-1")) - assert os.path.exists(os.path.join(CWD, "algo-2")) - - finally: - subprocess.run(["docker", "compose", "down", "-v"]) - directories = [ - "compressed_artifacts", - "artifacts", - "model", - "shared", - "input", - "output", - "algo-1", - "algo-2", - ] - - for directory in directories: - path = os.path.join(CWD, directory) - delete_local_path(path)
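
Usage sketch (not part of the patch series above): assuming the ModelTrainer API as it stands after PATCH 4/4, a LOCAL_CONTAINER training run might look like the snippet below. The image URI, source directory, data paths, and job name are placeholders, and the import paths are an assumption based on the SDK's sagemaker.modules package; with the final patch applied, the "input", "shared", and "algo-*" folders plus any temporary S3 download folders under the container root are always removed after training, so there is no option to set.

# Import paths are assumed; adjust to match the installed SDK layout.
from sagemaker.modules.train.model_trainer import ModelTrainer, Mode
from sagemaker.modules.configs import SourceCode, Compute, InputData

# Placeholder source code and compute configuration for a single local CPU container.
source_code = SourceCode(
    source_dir="./src",                       # hypothetical directory holding the training script
    entry_script="local_training_script.py",  # hypothetical entry point
)
compute = Compute(instance_type="local_cpu", instance_count=1)

# Local input channel; an s3:// URI would also work and is downloaded into a
# temporary folder under the container root that the trainer now cleans up automatically.
train_data = InputData(channel_name="train", data_source="./data/train/")

model_trainer = ModelTrainer(
    training_image="<cpu-training-image-uri>",  # placeholder image URI
    source_code=source_code,
    compute=compute,
    input_data_config=[train_data],
    base_job_name="local_mode_example",
    training_mode=Mode.LOCAL_CONTAINER,
    # A sagemaker_session can also be passed, as in the integration tests, if S3 access is needed.
)

# After PATCH 4/4, train() removes the input and container artifact folders
# ("input", "shared", "algo-*", and temporary S3 download folders) once the
# job completes; only the output-related folders remain under the container root.
model_trainer.train()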