From 06e560522f8fd2c9e6c81ed8f9877cd8a06bdef6 Mon Sep 17 00:00:00 2001 From: Dwij Patel Date: Wed, 2 Jul 2025 00:31:49 +0530 Subject: [PATCH] fix --- agentops/instrumentation/__init__.py | 242 +++++++++------------------ 1 file changed, 78 insertions(+), 164 deletions(-) diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index 26b398c5c..1eb30d00b 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -75,24 +75,6 @@ class InstrumentorConfig(TypedDict): }, } -# Configuration for utility instrumentors -UTILITY_INSTRUMENTORS: dict[str, InstrumentorConfig] = { - "concurrent.futures": { - "module_name": "agentops.instrumentation.utilities.concurrent_futures", - "class_name": "ConcurrentFuturesInstrumentor", - "min_version": "3.7.0", # Python 3.7+ (concurrent.futures is stdlib) - "package_name": "python", # Special case for stdlib modules - }, -} - -# Define which packages require which utility instrumentors -# This maps package names to the list of utility instrumentors they depend on -UTILITY_DEPENDENCIES: dict[str, list[str]] = { - "mem0": ["concurrent.futures"], # mem0 uses concurrent.futures for parallel processing - # Add more dependencies as needed in the future - # "langchain": ["concurrent.futures", "asyncio"], -} - # Configuration for supported agentic libraries AGENTIC_LIBRARIES: dict[str, InstrumentorConfig] = { "crewai": { @@ -133,7 +115,7 @@ class InstrumentorConfig(TypedDict): } # Combine all target packages for monitoring -TARGET_PACKAGES = set(PROVIDERS.keys()) | set(AGENTIC_LIBRARIES.keys()) | set(UTILITY_INSTRUMENTORS.keys()) +TARGET_PACKAGES = set(PROVIDERS.keys()) | set(AGENTIC_LIBRARIES.keys()) # Create a single instance of the manager # _manager = InstrumentationManager() # Removed @@ -143,7 +125,6 @@ class InstrumentorConfig(TypedDict): _original_builtins_import = builtins.__import__ # Store original import _instrumenting_packages: Set[str] = set() _has_agentic_library: bool = False -_pending_utility_instrumentation: Set[str] = set() # Track packages that need utility instrumentation # New helper function to check module origin @@ -153,16 +134,6 @@ def _is_installed_package(module_obj: ModuleType, package_name_key: str) -> bool rather than a local module, especially when names might collide. `package_name_key` is the key from TARGET_PACKAGES (e.g., 'agents', 'google.adk'). """ - # Special case for stdlib modules (marked with package_name="python" in UTILITY_INSTRUMENTORS) - if ( - package_name_key in UTILITY_INSTRUMENTORS - and UTILITY_INSTRUMENTORS[package_name_key].get("package_name") == "python" - ): - logger.debug( - f"_is_installed_package: Module '{package_name_key}' is a Python standard library module. Considering it an installed package." - ) - return True - if not hasattr(module_obj, "__file__") or not module_obj.__file__: logger.debug( f"_is_installed_package: Module '{package_name_key}' has no __file__, assuming it might be an SDK namespace package. Returning True." @@ -255,7 +226,7 @@ def _uninstrument_providers(): def _should_instrument_package(package_name: str) -> bool: """ Determine if a package should be instrumented based on current state. - Handles special cases for agentic libraries, providers, and utility instrumentors. + Handles special cases for agentic libraries and providers. """ global _has_agentic_library @@ -264,22 +235,6 @@ def _should_instrument_package(package_name: str) -> bool: logger.debug(f"_should_instrument_package: '{package_name}' already instrumented by AgentOps. Skipping.") return False - # Utility instrumentors should only be instrumented when their dependent packages are active - if package_name in UTILITY_INSTRUMENTORS: - # Check if any package that depends on this utility is instrumented - for dependent_package, utilities in UTILITY_DEPENDENCIES.items(): - if package_name in utilities and _is_package_instrumented(dependent_package): - logger.debug( - f"_should_instrument_package: '{package_name}' is a utility instrumentor needed by '{dependent_package}'. Allowing." - ) - return True - - logger.debug( - f"_should_instrument_package: '{package_name}' is a utility instrumentor but no dependent packages are active. Skipping." - ) - return False - - # Only apply agentic/provider logic if it's NOT a utility instrumentor is_target_agentic = package_name in AGENTIC_LIBRARIES is_target_provider = package_name in PROVIDERS @@ -321,136 +276,88 @@ def _should_instrument_package(package_name: str) -> bool: return False -def _instrument_utility_dependencies(package_name: str): - """ - Instrument any utility dependencies required by the given package. - - Args: - package_name: The package that was just instrumented - """ - if package_name in UTILITY_DEPENDENCIES: - utilities_needed = UTILITY_DEPENDENCIES[package_name] - logger.debug( - f"_instrument_utility_dependencies: Package '{package_name}' requires utilities: {utilities_needed}" - ) - - for utility_name in utilities_needed: - if utility_name in UTILITY_INSTRUMENTORS and not _is_package_instrumented(utility_name): - logger.info(f"AgentOps: Instrumenting utility '{utility_name}' required by '{package_name}'") - - # Check if the utility module is available - if utility_name in sys.modules: - _perform_instrumentation(utility_name) - else: - logger.debug( - f"_instrument_utility_dependencies: Utility '{utility_name}' not yet imported, will instrument when imported" - ) - - def _perform_instrumentation(package_name: str): """Helper function to perform instrumentation for a given package.""" global _instrumenting_packages, _active_instrumentors, _has_agentic_library - - # Check if we're already instrumenting this package (prevent circular instrumentation) - if package_name in _instrumenting_packages: - logger.debug( - f"_perform_instrumentation: Already instrumenting '{package_name}', skipping to prevent circular instrumentation" - ) - return - if not _should_instrument_package(package_name): return # Get the appropriate configuration for the package - # Ensure package_name is a key in either PROVIDERS, AGENTIC_LIBRARIES, or UTILITY_INSTRUMENTORS - if ( - package_name not in PROVIDERS - and package_name not in AGENTIC_LIBRARIES - and package_name not in UTILITY_INSTRUMENTORS - ): + # Ensure package_name is a key in either PROVIDERS or AGENTIC_LIBRARIES + if package_name not in PROVIDERS and package_name not in AGENTIC_LIBRARIES: logger.debug( - f"_perform_instrumentation: Package '{package_name}' not found in PROVIDERS, AGENTIC_LIBRARIES, or UTILITY_INSTRUMENTORS. Skipping." + f"_perform_instrumentation: Package '{package_name}' not found in PROVIDERS or AGENTIC_LIBRARIES. Skipping." ) return - config = PROVIDERS.get(package_name) or AGENTIC_LIBRARIES.get(package_name) or UTILITY_INSTRUMENTORS[package_name] + config = PROVIDERS.get(package_name) or AGENTIC_LIBRARIES.get(package_name) loader = InstrumentorLoader(**config) - # Add to _instrumenting_packages to prevent circular instrumentation - _instrumenting_packages.add(package_name) - - try: - # instrument_one already checks loader.should_activate - instrumentor_instance = instrument_one(loader) - if instrumentor_instance is not None: - # Check if it was *actually* instrumented by instrument_one by seeing if the instrument method was called successfully. - # This relies on instrument_one returning None if its internal .instrument() call failed (if we revert that, this needs adjustment) - # For now, assuming instrument_one returns instance only on full success. - # User request was to return instrumentor even if .instrument() fails. So, we check if _agentops_instrumented_package_key was set by us. - - # Let's assume instrument_one might return an instance whose .instrument() failed. - # The key is set before _active_instrumentors.append, so if it's already there and matches, it means it's a re-attempt on the same package. - # The _is_package_instrumented check at the start of _should_instrument_package should prevent most re-entry for the same package_name. - - # Store the package key this instrumentor is for, to aid _is_package_instrumented - instrumentor_instance._agentops_instrumented_package_key = package_name - - # Add to active_instrumentors only if it's not a duplicate in terms of package_key being instrumented - # This is a safeguard, _is_package_instrumented should catch this earlier. - is_newly_added = True - for existing_inst in _active_instrumentors: - if ( - hasattr(existing_inst, "_agentops_instrumented_package_key") - and existing_inst._agentops_instrumented_package_key == package_name - ): - is_newly_added = False - logger.debug( - f"_perform_instrumentation: Instrumentor for '{package_name}' already in _active_instrumentors. Not adding again." - ) - break - if is_newly_added: - _active_instrumentors.append(instrumentor_instance) - - # If this was an agentic library AND it's newly effectively instrumented. + # instrument_one already checks loader.should_activate + instrumentor_instance = instrument_one(loader) + if instrumentor_instance is not None: + # Check if it was *actually* instrumented by instrument_one by seeing if the instrument method was called successfully. + # This relies on instrument_one returning None if its internal .instrument() call failed (if we revert that, this needs adjustment) + # For now, assuming instrument_one returns instance only on full success. + # User request was to return instrumentor even if .instrument() fails. So, we check if _agentops_instrumented_package_key was set by us. + + # Let's assume instrument_one might return an instance whose .instrument() failed. + # The key is set before _active_instrumentors.append, so if it's already there and matches, it means it's a re-attempt on the same package. + # The _is_package_instrumented check at the start of _should_instrument_package should prevent most re-entry for the same package_name. + + # Store the package key this instrumentor is for, to aid _is_package_instrumented + instrumentor_instance._agentops_instrumented_package_key = package_name + + # Add to active_instrumentors only if it's not a duplicate in terms of package_key being instrumented + # This is a safeguard, _is_package_instrumented should catch this earlier. + is_newly_added = True + for existing_inst in _active_instrumentors: if ( - package_name in AGENTIC_LIBRARIES and not _has_agentic_library - ): # Check _has_agentic_library to ensure this is the *first* one. - # _uninstrument_providers() was already called in _should_instrument_package for the first agentic library. - _has_agentic_library = True - - # Mark package for utility dependency instrumentation - # We defer this to avoid circular imports during package initialization - if package_name not in UTILITY_INSTRUMENTORS and is_newly_added: # Don't recursively instrument utilities - if package_name in UTILITY_DEPENDENCIES: - _pending_utility_instrumentation.add(package_name) - logger.debug( - f"_perform_instrumentation: Marked '{package_name}' for deferred utility instrumentation" - ) - else: - logger.debug( - f"_perform_instrumentation: instrument_one for '{package_name}' returned None. Not added to active instrumentors." - ) - finally: - # Always remove from _instrumenting_packages when done - _instrumenting_packages.discard(package_name) - + hasattr(existing_inst, "_agentops_instrumented_package_key") + and existing_inst._agentops_instrumented_package_key == package_name + ): + is_newly_added = False + logger.debug( + f"_perform_instrumentation: Instrumentor for '{package_name}' already in _active_instrumentors. Not adding again." + ) + break + if is_newly_added: + _active_instrumentors.append(instrumentor_instance) -def _process_pending_utility_instrumentation(): - """Process any pending utility instrumentations.""" - global _pending_utility_instrumentation + # If this was an agentic library AND it's newly effectively instrumented. + if ( + package_name in AGENTIC_LIBRARIES and not _has_agentic_library + ): # Check _has_agentic_library to ensure this is the *first* one. + # _uninstrument_providers() was already called in _should_instrument_package for the first agentic library. + _has_agentic_library = True - if not _pending_utility_instrumentation: - return + # Special case: If mem0 is instrumented, also instrument concurrent.futures + if package_name == "mem0" and is_newly_added: + try: + # Check if concurrent.futures module is available + + # Create config for concurrent.futures instrumentor + concurrent_config = InstrumentorConfig( + module_name="agentops.instrumentation.utilities.concurrent_futures", + class_name="ConcurrentFuturesInstrumentor", + min_version="3.7.0", # Python 3.7+ (concurrent.futures is stdlib) + package_name="python", # Special case for stdlib modules + ) - # Copy and clear to avoid modifying during iteration - pending = _pending_utility_instrumentation.copy() - _pending_utility_instrumentation.clear() + # Create and instrument concurrent.futures + concurrent_loader = InstrumentorLoader(**concurrent_config) + concurrent_instrumentor = instrument_one(concurrent_loader) - for package_name in pending: - try: - _instrument_utility_dependencies(package_name) - except Exception as e: - logger.debug(f"Error instrumenting utility dependencies for {package_name}: {e}") + if concurrent_instrumentor is not None: + concurrent_instrumentor._agentops_instrumented_package_key = "concurrent.futures" + _active_instrumentors.append(concurrent_instrumentor) + logger.info("AgentOps: Instrumented concurrent.futures as a dependency of mem0.") + except Exception as e: + logger.debug(f"Could not instrument concurrent.futures for mem0: {e}") + else: + logger.debug( + f"_perform_instrumentation: instrument_one for '{package_name}' returned None. Not added to active instrumentors." + ) def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), level=0): @@ -460,9 +367,6 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), """ global _instrumenting_packages, _has_agentic_library - # Process any pending utility instrumentations before handling new imports - _process_pending_utility_instrumentation() - # If an agentic library is already instrumented, skip all further instrumentation if _has_agentic_library: return _original_builtins_import(name, globals_dict, locals_dict, fromlist, level) @@ -503,7 +407,7 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), # Instrument all matching packages for package_to_check in packages_to_check: - if not _is_package_instrumented(package_to_check): + if package_to_check not in _instrumenting_packages and not _is_package_instrumented(package_to_check): target_module_obj = sys.modules.get(package_to_check) if target_module_obj: @@ -518,6 +422,7 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), f"_import_monitor: No module object found in sys.modules for '{package_to_check}', proceeding with SDK instrumentation attempt." ) + _instrumenting_packages.add(package_to_check) try: _perform_instrumentation(package_to_check) # If we just instrumented an agentic library, stop @@ -525,6 +430,8 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), break except Exception as e: logger.error(f"Error instrumenting {package_to_check}: {str(e)}") + finally: + _instrumenting_packages.discard(package_to_check) return module @@ -632,7 +539,11 @@ def instrument_all(): package_to_check = target break - if package_to_check and not _is_package_instrumented(package_to_check): + if ( + package_to_check + and package_to_check not in _instrumenting_packages + and not _is_package_instrumented(package_to_check) + ): target_module_obj = sys.modules.get(package_to_check) if target_module_obj: @@ -644,10 +555,13 @@ def instrument_all(): f"instrument_all: No module object found for '{package_to_check}' in sys.modules during startup scan. Proceeding cautiously." ) + _instrumenting_packages.add(package_to_check) try: _perform_instrumentation(package_to_check) except Exception as e: logger.error(f"Error instrumenting {package_to_check}: {str(e)}") + finally: + _instrumenting_packages.discard(package_to_check) def uninstrument_all():