From 7d1bed6454abb7ec8d749f1cd10769b391733510 Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Wed, 19 Nov 2025 01:28:23 +0200 Subject: [PATCH 1/3] make test cleanup conditional to fix performance regression --- .../typehints/native_type_compatibility.py | 1 + sdks/python/conftest.py | 94 ++++++++++++++----- 2 files changed, 69 insertions(+), 26 deletions(-) diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py index 2360df142167..81dcbd07deca 100644 --- a/sdks/python/apache_beam/typehints/native_type_compatibility.py +++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py @@ -342,6 +342,7 @@ def convert_to_beam_type(typ): # Unwrap Python 3.12 `type` aliases (TypeAliasType) to their underlying value. # This ensures Beam sees the actual aliased type (e.g., tuple[int, ...]). + import sys if sys.version_info >= (3, 12) and TypeAliasType is not None: if isinstance(typ, TypeAliasType): # pylint: disable=isinstance-second-argument-not-valid-type underlying = getattr(typ, '__value__', None) diff --git a/sdks/python/conftest.py b/sdks/python/conftest.py index 855af55911a1..3d5915c81b18 100644 --- a/sdks/python/conftest.py +++ b/sdks/python/conftest.py @@ -34,6 +34,16 @@ def pytest_addoption(parser): '--test-pipeline-options', help='Options to use in test pipelines. NOTE: Tests may ' 'ignore some or all of these options.') + parser.addoption( + '--enable-test-cleanup', + action='store_true', + default=None, + help='Enable expensive cleanup operations. Auto-enabled in CI.') + parser.addoption( + '--disable-test-cleanup', + action='store_true', + default=False, + help='Disable expensive cleanup operations even in CI.') # See pytest.ini for main collection rules. @@ -101,56 +111,88 @@ def configure_beam_rpc_timeouts(): print("Successfully configured Beam RPC timeouts") +def _running_in_ci(): + """Returns True if running in a CI environment.""" + return ( + os.getenv('GITHUB_ACTIONS') == 'true' or + os.getenv('CI') == 'true' or + os.getenv('CONTINUOUS_INTEGRATION') == 'true' + ) + + +def _should_enable_test_cleanup(config): + """Returns True if expensive cleanup operations should run.""" + if config.getoption('--disable-test-cleanup'): + result = False + reason = "disabled via --disable-test-cleanup" + elif config.getoption('--enable-test-cleanup'): + result = True + reason = "enabled via --enable-test-cleanup" + else: + if _running_in_ci(): + result = True + reason = "CI detected" + else: + result = False + reason = "local development" + + # Log once per session + if not hasattr(config, '_cleanup_decision_logged'): + print(f"\n[Test Cleanup] Enabled: {result} ({reason})") + config._cleanup_decision_logged = True + + return result + + @pytest.fixture(autouse=True) -def ensure_clean_state(): +def ensure_clean_state(request): """ - Ensure clean state before each test - to prevent cross-test contamination. + Ensures clean state between tests to prevent contamination. + + Expensive operations (sleeps, extra GC) only run in CI or when + explicitly enabled to keep local tests fast. """ import gc import threading import time - # Force garbage collection to clean up any lingering resources - gc.collect() + enable_cleanup = _should_enable_test_cleanup(request.config) + + if enable_cleanup: + gc.collect() - # Log active thread count for debugging thread_count = threading.active_count() - if thread_count > 50: # Increased threshold since we see 104 threads + if thread_count > 50: print(f"Warning: {thread_count} active threads detected before test") - - # Force a brief pause to let threads settle - time.sleep(0.5) - gc.collect() + if enable_cleanup: + time.sleep(0.5) + gc.collect() yield - # Enhanced cleanup after test try: - # Force more aggressive cleanup - gc.collect() - - # Brief pause to let any async operations complete - time.sleep(0.1) - - # Additional garbage collection - gc.collect() + if enable_cleanup: + gc.collect() + time.sleep(0.1) + gc.collect() except Exception as e: print(f"Warning: Cleanup error: {e}") @pytest.fixture(autouse=True) -def enhance_mock_stability(): - """Enhance mock stability in DinD environment.""" +def enhance_mock_stability(request): + """Improves mock stability in DinD environment.""" import time - # Brief pause before test to ensure clean mock state - time.sleep(0.05) + enable_cleanup = _should_enable_test_cleanup(request.config) + + if enable_cleanup: + time.sleep(0.05) yield - # Brief pause after test to let mocks clean up - time.sleep(0.05) + if enable_cleanup: + time.sleep(0.05) def pytest_configure(config): From 7766462e2892259a991471a02c6cff5108310256 Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Thu, 20 Nov 2025 11:49:33 +0200 Subject: [PATCH 2/3] fixed PEP 8 violations and optimize test cleanup --- .../typehints/native_type_compatibility.py | 1 - sdks/python/conftest.py | 31 +++++++++---------- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py index 81dcbd07deca..2360df142167 100644 --- a/sdks/python/apache_beam/typehints/native_type_compatibility.py +++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py @@ -342,7 +342,6 @@ def convert_to_beam_type(typ): # Unwrap Python 3.12 `type` aliases (TypeAliasType) to their underlying value. # This ensures Beam sees the actual aliased type (e.g., tuple[int, ...]). - import sys if sys.version_info >= (3, 12) and TypeAliasType is not None: if isinstance(typ, TypeAliasType): # pylint: disable=isinstance-second-argument-not-valid-type underlying = getattr(typ, '__value__', None) diff --git a/sdks/python/conftest.py b/sdks/python/conftest.py index 3d5915c81b18..3655c8c09949 100644 --- a/sdks/python/conftest.py +++ b/sdks/python/conftest.py @@ -17,8 +17,11 @@ """Pytest configuration and custom hooks.""" +import gc import os import sys +import threading +import time from types import SimpleNamespace import pytest @@ -37,13 +40,9 @@ def pytest_addoption(parser): parser.addoption( '--enable-test-cleanup', action='store_true', - default=None, - help='Enable expensive cleanup operations. Auto-enabled in CI.') - parser.addoption( - '--disable-test-cleanup', - action='store_true', default=False, - help='Disable expensive cleanup operations even in CI.') + help='Enable expensive cleanup operations. Auto-enabled in CI by default. ' + 'Use this flag to explicitly enable cleanup in local development.') # See pytest.ini for main collection rules. @@ -121,11 +120,14 @@ def _running_in_ci(): def _should_enable_test_cleanup(config): - """Returns True if expensive cleanup operations should run.""" - if config.getoption('--disable-test-cleanup'): - result = False - reason = "disabled via --disable-test-cleanup" - elif config.getoption('--enable-test-cleanup'): + """Returns True if expensive cleanup operations should run. + + Result is cached on config object to avoid re-computation per test. + """ + if hasattr(config, '_should_enable_test_cleanup_result'): + return config._should_enable_test_cleanup_result + + if config.getoption('--enable-test-cleanup'): result = True reason = "enabled via --enable-test-cleanup" else: @@ -141,6 +143,7 @@ def _should_enable_test_cleanup(config): print(f"\n[Test Cleanup] Enabled: {result} ({reason})") config._cleanup_decision_logged = True + config._should_enable_test_cleanup_result = result return result @@ -152,10 +155,6 @@ def ensure_clean_state(request): Expensive operations (sleeps, extra GC) only run in CI or when explicitly enabled to keep local tests fast. """ - import gc - import threading - import time - enable_cleanup = _should_enable_test_cleanup(request.config) if enable_cleanup: @@ -182,8 +181,6 @@ def ensure_clean_state(request): @pytest.fixture(autouse=True) def enhance_mock_stability(request): """Improves mock stability in DinD environment.""" - import time - enable_cleanup = _should_enable_test_cleanup(request.config) if enable_cleanup: From 6251616ecb771f9f82a3b3922ac32b9ee2cb4f34 Mon Sep 17 00:00:00 2001 From: aIbrahiim Date: Sun, 23 Nov 2025 16:44:22 +0200 Subject: [PATCH 3/3] Change cleanup fixtures to class scope to reduce test overhead --- sdks/python/conftest.py | 99 ++++++++++++----------------------------- 1 file changed, 29 insertions(+), 70 deletions(-) diff --git a/sdks/python/conftest.py b/sdks/python/conftest.py index 3655c8c09949..683bd433e8a9 100644 --- a/sdks/python/conftest.py +++ b/sdks/python/conftest.py @@ -37,12 +37,6 @@ def pytest_addoption(parser): '--test-pipeline-options', help='Options to use in test pipelines. NOTE: Tests may ' 'ignore some or all of these options.') - parser.addoption( - '--enable-test-cleanup', - action='store_true', - default=False, - help='Enable expensive cleanup operations. Auto-enabled in CI by default. ' - 'Use this flag to explicitly enable cleanup in local development.') # See pytest.ini for main collection rules. @@ -110,86 +104,51 @@ def configure_beam_rpc_timeouts(): print("Successfully configured Beam RPC timeouts") -def _running_in_ci(): - """Returns True if running in a CI environment.""" - return ( - os.getenv('GITHUB_ACTIONS') == 'true' or - os.getenv('CI') == 'true' or - os.getenv('CONTINUOUS_INTEGRATION') == 'true' - ) - - -def _should_enable_test_cleanup(config): - """Returns True if expensive cleanup operations should run. - - Result is cached on config object to avoid re-computation per test. +@pytest.fixture(scope="class", autouse=True) +def ensure_clean_state(): """ - if hasattr(config, '_should_enable_test_cleanup_result'): - return config._should_enable_test_cleanup_result - - if config.getoption('--enable-test-cleanup'): - result = True - reason = "enabled via --enable-test-cleanup" - else: - if _running_in_ci(): - result = True - reason = "CI detected" - else: - result = False - reason = "local development" - - # Log once per session - if not hasattr(config, '_cleanup_decision_logged'): - print(f"\n[Test Cleanup] Enabled: {result} ({reason})") - config._cleanup_decision_logged = True - - config._should_enable_test_cleanup_result = result - return result - - -@pytest.fixture(autouse=True) -def ensure_clean_state(request): + Ensure clean state before each test class + to prevent cross-test contamination. + Runs once per test class instead of per test to reduce overhead. """ - Ensures clean state between tests to prevent contamination. - - Expensive operations (sleeps, extra GC) only run in CI or when - explicitly enabled to keep local tests fast. - """ - enable_cleanup = _should_enable_test_cleanup(request.config) - - if enable_cleanup: - gc.collect() + # Force garbage collection to clean up any lingering resources + gc.collect() + # Log active thread count for debugging thread_count = threading.active_count() if thread_count > 50: - print(f"Warning: {thread_count} active threads detected before test") - if enable_cleanup: - time.sleep(0.5) - gc.collect() + print(f"Warning: {thread_count} active threads detected before test class") + # Force a brief pause to let threads settle + time.sleep(0.5) + gc.collect() yield + # Enhanced cleanup after test class try: - if enable_cleanup: - gc.collect() - time.sleep(0.1) - gc.collect() + # Force more aggressive cleanup + gc.collect() + # Brief pause to let any async operations complete + time.sleep(0.1) + # Additional garbage collection + gc.collect() except Exception as e: print(f"Warning: Cleanup error: {e}") -@pytest.fixture(autouse=True) -def enhance_mock_stability(request): - """Improves mock stability in DinD environment.""" - enable_cleanup = _should_enable_test_cleanup(request.config) - - if enable_cleanup: - time.sleep(0.05) +@pytest.fixture(scope="class", autouse=True) +def enhance_mock_stability(): + """ + Enhance mock stability in DinD environment. + Runs once per test class instead of per test to reduce overhead. + """ + # Brief pause before test class to ensure clean mock state + time.sleep(0.05) yield - if enable_cleanup: - time.sleep(0.05) + # Brief pause after test class to let mocks clean up + time.sleep(0.05) def pytest_configure(config):