From 8f53a84be9f6c285ee382df10a808f352f42c973 Mon Sep 17 00:00:00 2001 From: Nayan Mathur Date: Mon, 8 Dec 2025 17:43:54 +0530 Subject: [PATCH 1/2] Fix race condition in stager.py when starting multiple pipelines concurrently (fixes #36847) --- .../apache_beam/runners/portability/stager.py | 2 +- .../runners/portability/stager_test.py | 56 +++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index aa03082f0d57..17cf6514cace 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -218,7 +218,7 @@ def create_job_resources( is None) else setup_options.requirements_cache) if (setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE and not os.path.exists(requirements_cache_path)): - os.makedirs(requirements_cache_path) + os.makedirs(requirements_cache_path, exist_ok=True) # Stage a requirements file if present. if setup_options.requirements_file is not None: diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py b/sdks/python/apache_beam/runners/portability/stager_test.py index 233e0c3dcea1..3638326bf9a8 100644 --- a/sdks/python/apache_beam/runners/portability/stager_test.py +++ b/sdks/python/apache_beam/runners/portability/stager_test.py @@ -40,6 +40,9 @@ from apache_beam.runners.internal import names from apache_beam.runners.portability import stager +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import as_completed + _LOGGER = logging.getLogger(__name__) try: @@ -913,6 +916,59 @@ def test_populate_requirements_cache_with_local_files(self): self.assertNotIn('fake_pypi', extra_packages_contents) self.assertIn('local_package', extra_packages_contents) + def test_requirements_cache_creation_no_race_condition(self): + base_cache_dir = self.make_temp_dir() + cache_dir = os.path.join(base_cache_dir, 'test-requirements-cache') + # Ensure the directory doesn't exist initially + if os.path.exists(cache_dir): + shutil.rmtree(cache_dir) + + source_dir = self.make_temp_dir() + requirements_file = os.path.join(source_dir, stager.REQUIREMENTS_FILE) + self.create_temp_file(requirements_file, 'requests>=2.0.0\n') + + def create_resources_with_cache(): + temp_dir = tempfile.mkdtemp() + try: + options = PipelineOptions() + self.update_options(options) + setup_options = options.view_as(SetupOptions) + setup_options.requirements_file = requirements_file + setup_options.requirements_cache = cache_dir + # This should create the cache directory if it doesn't exist + stager.Stager.create_job_resources( + options, + temp_dir, + populate_requirements_cache=self.populate_requirements_cache) + return True, None + except Exception as e: + return False, e + finally: + if os.path.exists(temp_dir): + shutil.rmtree(temp_dir) + + # Run multiple threads concurrently to create + # resources with the same cache dir. + num_threads = 10 + successes = 0 + with ThreadPoolExecutor(max_workers=num_threads) as executor: + futures = [ + executor.submit(create_resources_with_cache) + for _ in range(num_threads) + ] + + for future in as_completed(futures): + success, _ = future.result() + if success: + successes += 1 + # All threads should succeed + self.assertEqual( + successes, + num_threads, + f"Expected all {num_threads} threads to pass, but got errors.") + # Verify that the cache directory exists + self.assertTrue(os.path.isdir(cache_dir)) + class TestStager(stager.Stager): def stage_artifact(self, local_path_to_artifact, artifact_name, sha256): From ce12752f64fcb93f08866ccdeab912748d09ec03 Mon Sep 17 00:00:00 2001 From: Nayan Mathur Date: Tue, 9 Dec 2025 09:22:54 +0530 Subject: [PATCH 2/2] remove race condition test --- .../runners/portability/stager_test.py | 56 ------------------- 1 file changed, 56 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/stager_test.py b/sdks/python/apache_beam/runners/portability/stager_test.py index 3638326bf9a8..233e0c3dcea1 100644 --- a/sdks/python/apache_beam/runners/portability/stager_test.py +++ b/sdks/python/apache_beam/runners/portability/stager_test.py @@ -40,9 +40,6 @@ from apache_beam.runners.internal import names from apache_beam.runners.portability import stager -from concurrent.futures import ThreadPoolExecutor -from concurrent.futures import as_completed - _LOGGER = logging.getLogger(__name__) try: @@ -916,59 +913,6 @@ def test_populate_requirements_cache_with_local_files(self): self.assertNotIn('fake_pypi', extra_packages_contents) self.assertIn('local_package', extra_packages_contents) - def test_requirements_cache_creation_no_race_condition(self): - base_cache_dir = self.make_temp_dir() - cache_dir = os.path.join(base_cache_dir, 'test-requirements-cache') - # Ensure the directory doesn't exist initially - if os.path.exists(cache_dir): - shutil.rmtree(cache_dir) - - source_dir = self.make_temp_dir() - requirements_file = os.path.join(source_dir, stager.REQUIREMENTS_FILE) - self.create_temp_file(requirements_file, 'requests>=2.0.0\n') - - def create_resources_with_cache(): - temp_dir = tempfile.mkdtemp() - try: - options = PipelineOptions() - self.update_options(options) - setup_options = options.view_as(SetupOptions) - setup_options.requirements_file = requirements_file - setup_options.requirements_cache = cache_dir - # This should create the cache directory if it doesn't exist - stager.Stager.create_job_resources( - options, - temp_dir, - populate_requirements_cache=self.populate_requirements_cache) - return True, None - except Exception as e: - return False, e - finally: - if os.path.exists(temp_dir): - shutil.rmtree(temp_dir) - - # Run multiple threads concurrently to create - # resources with the same cache dir. - num_threads = 10 - successes = 0 - with ThreadPoolExecutor(max_workers=num_threads) as executor: - futures = [ - executor.submit(create_resources_with_cache) - for _ in range(num_threads) - ] - - for future in as_completed(futures): - success, _ = future.result() - if success: - successes += 1 - # All threads should succeed - self.assertEqual( - successes, - num_threads, - f"Expected all {num_threads} threads to pass, but got errors.") - # Verify that the cache directory exists - self.assertTrue(os.path.isdir(cache_dir)) - class TestStager(stager.Stager): def stage_artifact(self, local_path_to_artifact, artifact_name, sha256):