From 8b803ab86674570a24a407805ed93de4f3b2dff9 Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Fri, 16 Jan 2026 16:02:08 -0800 Subject: [PATCH 1/8] initial pyJWT instrumentation --- drift/core/drift_sdk.py | 9 ++ drift/instrumentation/pyjwt/__init__.py | 5 + .../instrumentation/pyjwt/instrumentation.py | 100 ++++++++++++++++++ 3 files changed, 114 insertions(+) create mode 100644 drift/instrumentation/pyjwt/__init__.py create mode 100644 drift/instrumentation/pyjwt/instrumentation.py diff --git a/drift/core/drift_sdk.py b/drift/core/drift_sdk.py index 968d1bc..dabafa1 100644 --- a/drift/core/drift_sdk.py +++ b/drift/core/drift_sdk.py @@ -481,6 +481,15 @@ def _init_auto_instrumentations(self) -> None: except Exception as e: logger.debug(f"Socket instrumentation initialization failed: {e}") + # PyJWT instrumentation for JWT verification bypass + try: + from ..instrumentation.pyjwt import PyJWTInstrumentation + + _ = PyJWTInstrumentation(mode=self.mode) + logger.debug("PyJWT instrumentation registered (REPLAY mode)") + except Exception as e: + logger.debug(f"PyJWT instrumentation registration failed: {e}") + def create_env_vars_snapshot(self) -> None: """Create a span capturing all environment variables. diff --git a/drift/instrumentation/pyjwt/__init__.py b/drift/instrumentation/pyjwt/__init__.py new file mode 100644 index 0000000..d99bb0e --- /dev/null +++ b/drift/instrumentation/pyjwt/__init__.py @@ -0,0 +1,5 @@ +"""PyJWT instrumentation for REPLAY mode.""" + +from .instrumentation import PyJWTInstrumentation + +__all__ = ["PyJWTInstrumentation"] diff --git a/drift/instrumentation/pyjwt/instrumentation.py b/drift/instrumentation/pyjwt/instrumentation.py new file mode 100644 index 0000000..775df85 --- /dev/null +++ b/drift/instrumentation/pyjwt/instrumentation.py @@ -0,0 +1,100 @@ +"""PyJWT instrumentation for REPLAY mode. + +Patches PyJWT to disable all verification during test replay: +1. _merge_options - returns all verification options as False +2. _verify_signature - no-op (defense in depth) +3. _validate_claims - no-op (defense in depth) + +Only active in REPLAY mode. 
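+
+A minimal sketch of the observable effect (illustrative only, assuming the
+standard PyJWT 2.x encode/decode API; not part of the patched code path):
+
+    import jwt
+
+    token = jwt.encode({"sub": "user-1"}, "real-secret", algorithm="HS256")
+    # With the REPLAY patches applied, decoding with the wrong key still
+    # succeeds: _merge_options forces every verify_* option to False and the
+    # signature/claim verification hooks are no-ops.
+    claims = jwt.decode(token, "wrong-secret", algorithms=["HS256"])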
+""" + +from __future__ import annotations + +import logging +from types import ModuleType + +from ...core.types import TuskDriftMode +from ..base import InstrumentationBase + +logger = logging.getLogger(__name__) + + +class PyJWTInstrumentation(InstrumentationBase): + """Patches PyJWT to disable verification in REPLAY mode.""" + + def __init__( + self, mode: TuskDriftMode = TuskDriftMode.DISABLED, enabled: bool = True + ) -> None: + self.mode = mode + should_enable = enabled and mode == TuskDriftMode.REPLAY + + super().__init__( + name="PyJWTInstrumentation", + module_name="jwt", + supported_versions="*", + enabled=should_enable, + ) + + def patch(self, module: ModuleType) -> None: + if self.mode != TuskDriftMode.REPLAY: + return + + self._patch_merge_options() + self._patch_signature_verification() + self._patch_claim_validation() + logger.debug("[PyJWTInstrumentation] All patches applied") + + def _patch_signature_verification(self) -> None: + """No-op signature verification.""" + try: + from jwt import api_jws + + def patched_verify_signature(self, *args, **kwargs): + logger.debug("[PyJWTInstrumentation] _verify_signature called - skipping verification") + return None + + api_jws.PyJWS._verify_signature = patched_verify_signature + logger.debug("[PyJWTInstrumentation] Patched PyJWS._verify_signature") + except Exception as e: + logger.warning(f"[PyJWTInstrumentation] Failed to patch _verify_signature: {e}") + + def _patch_claim_validation(self) -> None: + """No-op claim validation.""" + try: + from jwt import api_jwt + + def patched_validate_claims(self, *args, **kwargs): + logger.debug("[PyJWTInstrumentation] _validate_claims called - skipping validation") + return None + + api_jwt.PyJWT._validate_claims = patched_validate_claims + logger.debug("[PyJWTInstrumentation] Patched PyJWT._validate_claims") + except Exception as e: + logger.warning(f"[PyJWTInstrumentation] Failed to patch _validate_claims: {e}") + + def _patch_merge_options(self) -> None: + """Patch _merge_options to always return disabled verification options.""" + try: + from jwt import api_jwt + + disabled_options = { + "verify_signature": False, + "verify_exp": False, + "verify_nbf": False, + "verify_iat": False, + "verify_aud": False, + "verify_iss": False, + "verify_sub": False, + "verify_jti": False, + "require": [], + "strict_aud": False, + } + + def patched_merge_options(self, options=None): + logger.debug("[PyJWTInstrumentation] _merge_options called - returning disabled options") + return disabled_options + + api_jwt.PyJWT._merge_options = patched_merge_options + logger.debug("[PyJWTInstrumentation] Patched PyJWT._merge_options") + except Exception as e: + logger.warning(f"[PyJWTInstrumentation] Failed to patch _merge_options: {e}") From 3303afe2137126fcdc63fa5f09613176d9e3bb49 Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Fri, 16 Jan 2026 18:30:13 -0800 Subject: [PATCH 2/8] urllib instrumentation + span stauts error updates --- drift/core/content_type_utils.py | 4 +- drift/core/drift_sdk.py | 10 + drift/core/mock_utils.py | 4 +- drift/core/trace_blocking_manager.py | 4 +- drift/instrumentation/django/middleware.py | 4 +- .../fastapi/instrumentation.py | 4 +- .../instrumentation/pyjwt/instrumentation.py | 4 +- drift/instrumentation/urllib/__init__.py | 5 + .../urllib/e2e-tests/.tusk/config.yaml | 27 + .../urllib/e2e-tests/Dockerfile | 21 + .../urllib/e2e-tests/docker-compose.yml | 19 + .../urllib/e2e-tests/entrypoint.py | 34 + .../urllib/e2e-tests/requirements.txt | 2 + 
drift/instrumentation/urllib/e2e-tests/run.sh | 64 ++ .../urllib/e2e-tests/src/app.py | 333 ++++++ .../urllib/e2e-tests/src/test_requests.py | 57 + .../instrumentation/urllib/instrumentation.py | 1002 +++++++++++++++++ drift/instrumentation/wsgi/handler.py | 3 +- pyproject.toml | 1 + 19 files changed, 1588 insertions(+), 14 deletions(-) create mode 100644 drift/instrumentation/urllib/__init__.py create mode 100644 drift/instrumentation/urllib/e2e-tests/.tusk/config.yaml create mode 100644 drift/instrumentation/urllib/e2e-tests/Dockerfile create mode 100644 drift/instrumentation/urllib/e2e-tests/docker-compose.yml create mode 100644 drift/instrumentation/urllib/e2e-tests/entrypoint.py create mode 100644 drift/instrumentation/urllib/e2e-tests/requirements.txt create mode 100755 drift/instrumentation/urllib/e2e-tests/run.sh create mode 100644 drift/instrumentation/urllib/e2e-tests/src/app.py create mode 100644 drift/instrumentation/urllib/e2e-tests/src/test_requests.py create mode 100644 drift/instrumentation/urllib/instrumentation.py diff --git a/drift/core/content_type_utils.py b/drift/core/content_type_utils.py index a4e0aea..3ca99ca 100644 --- a/drift/core/content_type_utils.py +++ b/drift/core/content_type_utils.py @@ -17,7 +17,7 @@ "application/vnd.api+json": DecodedType.JSON, # Plain Text (ALLOWED) "text/plain": DecodedType.PLAIN_TEXT, - # HTML (BLOCKED) + # HTML "text/html": DecodedType.HTML, "application/xhtml+xml": DecodedType.HTML, # CSS (BLOCKED) @@ -113,7 +113,7 @@ # Only JSON and plain text are acceptable (matches Node SDK) # All other content types will cause trace blocking -ACCEPTABLE_DECODED_TYPES = {DecodedType.JSON, DecodedType.PLAIN_TEXT} +ACCEPTABLE_DECODED_TYPES = {DecodedType.JSON, DecodedType.PLAIN_TEXT, DecodedType.HTML} def get_decoded_type(content_type: str | None) -> DecodedType | None: diff --git a/drift/core/drift_sdk.py b/drift/core/drift_sdk.py index dabafa1..5eca8fe 100644 --- a/drift/core/drift_sdk.py +++ b/drift/core/drift_sdk.py @@ -406,6 +406,16 @@ def _init_auto_instrumentations(self) -> None: except ImportError: pass + try: + import urllib.request + + from ..instrumentation.urllib import UrllibInstrumentation + + _ = UrllibInstrumentation() + logger.debug("urllib instrumentation initialized") + except ImportError: + pass + # Initialize PostgreSQL instrumentation before Django # Instrument BOTH psycopg2 and psycopg if available # This allows apps to use either or both diff --git a/drift/core/mock_utils.py b/drift/core/mock_utils.py index 8616dcc..7bc481e 100644 --- a/drift/core/mock_utils.py +++ b/drift/core/mock_utils.py @@ -183,7 +183,9 @@ def find_mock_response_sync( mock_response = sdk.request_mock_sync(mock_request) if not mock_response or not mock_response.found: - logger.debug(f"No matching mock found for {trace_id} with input value: {input_value}") + logger.debug( + f"No matching mock found for {trace_id} with input value: {input_value}, input schema: {input_schema_merges}, input schema hash: {outbound_span.input_schema_hash}, input value hash: {outbound_span.input_value_hash}" + ) return None logger.debug(f"Found mock response for {trace_id}") diff --git a/drift/core/trace_blocking_manager.py b/drift/core/trace_blocking_manager.py index 6c46eba..7c7e019 100644 --- a/drift/core/trace_blocking_manager.py +++ b/drift/core/trace_blocking_manager.py @@ -204,7 +204,7 @@ def should_block_span(span: CleanSpanData) -> bool: """Check if a span should be blocked due to size or server error status. Blocks the trace if: - 1. 
The span is a SERVER span with ERROR status (e.g., HTTP >= 300) + 1. The span is a SERVER span with ERROR status (e.g., HTTP >= 400) 2. The span exceeds the maximum size limit (1MB) This matches Node SDK behavior in TdSpanExporter.ts. @@ -221,7 +221,7 @@ def should_block_span(span: CleanSpanData) -> bool: span_name = span.name blocking_manager = TraceBlockingManager.get_instance() - # Check 1: Block SERVER spans with ERROR status (e.g., HTTP >= 300) + # Check 1: Block SERVER spans with ERROR status (e.g., HTTP >= 400) if span.kind == SpanKind.SERVER and span.status.code == StatusCode.ERROR: logger.debug(f"Blocking trace {trace_id} - server span '{span_name}' has error status") blocking_manager.block_trace(trace_id, reason="server_error") diff --git a/drift/instrumentation/django/middleware.py b/drift/instrumentation/django/middleware.py index cb5f486..23b1568 100644 --- a/drift/instrumentation/django/middleware.py +++ b/drift/instrumentation/django/middleware.py @@ -390,8 +390,8 @@ def dict_to_schema_merges(merges_dict): duration_seconds = duration_ns // 1_000_000_000 duration_nanos = duration_ns % 1_000_000_000 - # Match Node SDK: >= 300 is considered an error (redirects, client errors, server errors) - if status_code >= 300: + # Match Node SDK: >= 400 is considered an error + if status_code >= 400: status = SpanStatus(code=StatusCode.ERROR, message=f"HTTP {status_code}") else: status = SpanStatus(code=StatusCode.OK, message="") diff --git a/drift/instrumentation/fastapi/instrumentation.py b/drift/instrumentation/fastapi/instrumentation.py index 7a37073..6fcb700 100644 --- a/drift/instrumentation/fastapi/instrumentation.py +++ b/drift/instrumentation/fastapi/instrumentation.py @@ -530,8 +530,8 @@ def _finalize_span( TuskDrift.get_instance() status_code = response_data.get("status_code", 200) - # Match Node SDK: >= 300 is considered an error (redirects, client errors, server errors) - if status_code >= 300: + # Match Node SDK: >= 400 is considered an error + if status_code >= 400: span_info.span.set_status(Status(OTelStatusCode.ERROR, f"HTTP {status_code}")) else: span_info.span.set_status(Status(OTelStatusCode.OK)) diff --git a/drift/instrumentation/pyjwt/instrumentation.py b/drift/instrumentation/pyjwt/instrumentation.py index 775df85..2fb2d1d 100644 --- a/drift/instrumentation/pyjwt/instrumentation.py +++ b/drift/instrumentation/pyjwt/instrumentation.py @@ -22,9 +22,7 @@ class PyJWTInstrumentation(InstrumentationBase): """Patches PyJWT to disable verification in REPLAY mode.""" - def __init__( - self, mode: TuskDriftMode = TuskDriftMode.DISABLED, enabled: bool = True - ) -> None: + def __init__(self, mode: TuskDriftMode = TuskDriftMode.DISABLED, enabled: bool = True) -> None: self.mode = mode should_enable = enabled and mode == TuskDriftMode.REPLAY diff --git a/drift/instrumentation/urllib/__init__.py b/drift/instrumentation/urllib/__init__.py new file mode 100644 index 0000000..9d02da8 --- /dev/null +++ b/drift/instrumentation/urllib/__init__.py @@ -0,0 +1,5 @@ +"""urllib.request instrumentation module.""" + +from .instrumentation import UrllibInstrumentation + +__all__ = ["UrllibInstrumentation"] diff --git a/drift/instrumentation/urllib/e2e-tests/.tusk/config.yaml b/drift/instrumentation/urllib/e2e-tests/.tusk/config.yaml new file mode 100644 index 0000000..41294f6 --- /dev/null +++ b/drift/instrumentation/urllib/e2e-tests/.tusk/config.yaml @@ -0,0 +1,27 @@ +version: 1 + +service: + id: "urllib-e2e-test-id" + name: "urllib-e2e-test" + port: 8000 + start: + command: "python 
src/app.py" + readiness_check: + command: "curl -f http://localhost:8000/health" + timeout: 45s + interval: 5s + +tusk_api: + url: "http://localhost:8000" + +test_execution: + concurrent_limit: 10 + batch_size: 10 + timeout: 30s + +recording: + sampling_rate: 1.0 + export_spans: false + +replay: + enable_telemetry: false diff --git a/drift/instrumentation/urllib/e2e-tests/Dockerfile b/drift/instrumentation/urllib/e2e-tests/Dockerfile new file mode 100644 index 0000000..f817d5d --- /dev/null +++ b/drift/instrumentation/urllib/e2e-tests/Dockerfile @@ -0,0 +1,21 @@ +FROM python-e2e-base:latest + +# Copy SDK source for editable install +COPY . /sdk + +# Copy test files +COPY drift/instrumentation/urllib/e2e-tests /app + +WORKDIR /app + +# Install dependencies (requirements.txt uses -e /sdk for SDK) +RUN pip install -q -r requirements.txt + +# Make entrypoint executable +RUN chmod +x entrypoint.py + +# Create .tusk directories +RUN mkdir -p /app/.tusk/traces /app/.tusk/logs + +# Run entrypoint +ENTRYPOINT ["python", "entrypoint.py"] diff --git a/drift/instrumentation/urllib/e2e-tests/docker-compose.yml b/drift/instrumentation/urllib/e2e-tests/docker-compose.yml new file mode 100644 index 0000000..c27ba21 --- /dev/null +++ b/drift/instrumentation/urllib/e2e-tests/docker-compose.yml @@ -0,0 +1,19 @@ +services: + app: + build: + context: ../../../.. + dockerfile: drift/instrumentation/urllib/e2e-tests/Dockerfile + args: + - TUSK_CLI_VERSION=${TUSK_CLI_VERSION:-latest} + environment: + - PORT=8000 + - TUSK_ANALYTICS_DISABLED=1 + - PYTHONUNBUFFERED=1 + working_dir: /app + volumes: + # Mount SDK source for hot reload (no rebuild needed for SDK changes) + - ../../../..:/sdk + # Mount app source for development + - ./src:/app/src + # Mount .tusk folder to persist traces + - ./.tusk:/app/.tusk diff --git a/drift/instrumentation/urllib/e2e-tests/entrypoint.py b/drift/instrumentation/urllib/e2e-tests/entrypoint.py new file mode 100644 index 0000000..b59c9af --- /dev/null +++ b/drift/instrumentation/urllib/e2e-tests/entrypoint.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python3 +""" +E2E Test Entrypoint for Urllib Instrumentation + +This script orchestrates the full e2e test lifecycle: +1. Setup: Install dependencies +2. Record: Start app in RECORD mode, execute requests +3. Test: Run Tusk CLI tests +4. 
Teardown: Cleanup and return exit code +""" + +import sys +from pathlib import Path + +# Add SDK to path for imports +sys.path.insert(0, "/sdk") + +from drift.instrumentation.e2e_common.base_runner import E2ETestRunnerBase + + +class UrllibE2ETestRunner(E2ETestRunnerBase): + """E2E test runner for Urllib instrumentation.""" + + def __init__(self): + import os + + port = int(os.getenv("PORT", "8000")) + super().__init__(app_port=port) + + +if __name__ == "__main__": + runner = UrllibE2ETestRunner() + exit_code = runner.run() + sys.exit(exit_code) diff --git a/drift/instrumentation/urllib/e2e-tests/requirements.txt b/drift/instrumentation/urllib/e2e-tests/requirements.txt new file mode 100644 index 0000000..584c5ae --- /dev/null +++ b/drift/instrumentation/urllib/e2e-tests/requirements.txt @@ -0,0 +1,2 @@ +-e /sdk +Flask>=3.1.2 diff --git a/drift/instrumentation/urllib/e2e-tests/run.sh b/drift/instrumentation/urllib/e2e-tests/run.sh new file mode 100755 index 0000000..d67bae1 --- /dev/null +++ b/drift/instrumentation/urllib/e2e-tests/run.sh @@ -0,0 +1,64 @@ +#!/bin/bash + +# Exit on error +set -e + +# Accept optional port parameter (default: 8000) +APP_PORT=${1:-8000} +export APP_PORT + +# Generate unique docker compose project name +# Get the instrumentation name (parent directory of e2e-tests) +TEST_NAME="$(basename "$(dirname "$(pwd)")")" +PROJECT_NAME="python-${TEST_NAME}-${APP_PORT}" + +# Colors for output +GREEN='\033[0;32m' +RED='\033[0;31m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' + +echo -e "${BLUE}========================================${NC}" +echo -e "${BLUE}Running Python E2E Test: ${TEST_NAME}${NC}" +echo -e "${BLUE}Port: ${APP_PORT}${NC}" +echo -e "${BLUE}========================================${NC}" +echo "" + +# Cleanup function +cleanup() { + echo "" + echo -e "${YELLOW}Cleaning up containers...${NC}" + docker compose -p "$PROJECT_NAME" down -v 2>/dev/null || true +} + +# Register cleanup on exit +trap cleanup EXIT + +# Build containers +echo -e "${BLUE}Building containers...${NC}" +docker compose -p "$PROJECT_NAME" build --no-cache + +# Run the test container +echo -e "${BLUE}Starting test...${NC}" +echo "" + +# Run container and capture exit code (always use port 8000 inside container) +# Disable set -e temporarily to capture exit code +set +e +docker compose -p "$PROJECT_NAME" run --rm app +EXIT_CODE=$? 
+set -e + +echo "" +if [ $EXIT_CODE -eq 0 ]; then + echo -e "${GREEN}========================================${NC}" + echo -e "${GREEN}Test passed!${NC}" + echo -e "${GREEN}========================================${NC}" +else + echo -e "${RED}========================================${NC}" + echo -e "${RED}Test failed with exit code ${EXIT_CODE}${NC}" + echo -e "${RED}========================================${NC}" +fi + +exit $EXIT_CODE diff --git a/drift/instrumentation/urllib/e2e-tests/src/app.py b/drift/instrumentation/urllib/e2e-tests/src/app.py new file mode 100644 index 0000000..81bb048 --- /dev/null +++ b/drift/instrumentation/urllib/e2e-tests/src/app.py @@ -0,0 +1,333 @@ +"""Flask test app for e2e tests - urllib.request instrumentation testing.""" + +import json +from concurrent.futures import ThreadPoolExecutor +from urllib.request import Request, build_opener, urlopen + +from flask import Flask, jsonify +from flask import request as flask_request +from opentelemetry import context as otel_context + +from drift import TuskDrift + +# Initialize SDK +sdk = TuskDrift.initialize( + api_key="tusk-test-key", + log_level="debug", +) + +app = Flask(__name__) + + +def _run_with_context(ctx, fn, *args, **kwargs): + """Helper to run a function with OpenTelemetry context in a thread pool.""" + token = otel_context.attach(ctx) + try: + return fn(*args, **kwargs) + finally: + otel_context.detach(token) + + +# Health check endpoint +@app.route("/health", methods=["GET"]) +def health(): + return jsonify({"status": "healthy"}) + + +# GET request - simple urlopen with string URL +@app.route("/api/get-json", methods=["GET"]) +def get_json(): + """Test basic GET request using urlopen with string URL.""" + try: + with urlopen("https://jsonplaceholder.typicode.com/posts/1", timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# GET request using Request object with custom headers +@app.route("/api/get-with-request-object", methods=["GET"]) +def get_with_request_object(): + """Test GET request using Request object with custom headers.""" + try: + req = Request( + "https://jsonplaceholder.typicode.com/posts/1", + headers={ + "Accept": "application/json", + "User-Agent": "urllib-test/1.0", + "X-Custom-Header": "test-value", + }, + ) + with urlopen(req, timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# GET request with query parameters in URL +@app.route("/api/get-with-params", methods=["GET"]) +def get_with_params(): + """Test GET request with query parameters.""" + try: + with urlopen("https://jsonplaceholder.typicode.com/comments?postId=1", timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify({"count": len(data), "first": data[0] if data else None}) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# POST request with JSON body +@app.route("/api/post-json", methods=["POST"]) +def post_json(): + """Test POST request with JSON body.""" + try: + post_data = flask_request.get_json() or {} + body = json.dumps( + { + "title": post_data.get("title", "Test Title"), + "body": post_data.get("body", "Test Body"), + "userId": post_data.get("userId", 1), + } + ).encode("utf-8") + + req = Request( + "https://jsonplaceholder.typicode.com/posts", + data=body, + headers={"Content-Type": "application/json"}, + ) + with 
urlopen(req, timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify(data), 201 + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# POST request with form-encoded data +@app.route("/api/post-form", methods=["POST"]) +def post_form(): + """Test POST request with form-encoded data.""" + try: + from urllib.parse import urlencode + + body = urlencode({"field1": "value1", "field2": "value2"}).encode("utf-8") + req = Request( + "https://httpbin.org/post", + data=body, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + with urlopen(req, timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# PUT request +@app.route("/api/put-json", methods=["PUT"]) +def put_json(): + """Test PUT request with JSON body.""" + try: + body = json.dumps( + { + "id": 1, + "title": "Updated Title", + "body": "Updated Body", + "userId": 1, + } + ).encode("utf-8") + + req = Request( + "https://jsonplaceholder.typicode.com/posts/1", + data=body, + headers={"Content-Type": "application/json"}, + method="PUT", + ) + with urlopen(req, timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# PATCH request +@app.route("/api/patch-json", methods=["PATCH"]) +def patch_json(): + """Test PATCH request with partial JSON body.""" + try: + body = json.dumps({"title": "Patched Title"}).encode("utf-8") + + req = Request( + "https://jsonplaceholder.typicode.com/posts/1", + data=body, + headers={"Content-Type": "application/json"}, + method="PATCH", + ) + with urlopen(req, timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# DELETE request +@app.route("/api/delete", methods=["DELETE"]) +def delete_resource(): + """Test DELETE request.""" + try: + req = Request( + "https://jsonplaceholder.typicode.com/posts/1", + method="DELETE", + ) + with urlopen(req, timeout=10) as response: + return jsonify({"status": "deleted", "status_code": response.status}) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Sequential chained requests +@app.route("/api/chain", methods=["GET"]) +def chain_requests(): + """Test sequential chained requests.""" + try: + # First request: get a user + with urlopen("https://jsonplaceholder.typicode.com/users/1", timeout=10) as response: + user = json.loads(response.read().decode("utf-8")) + + # Second request: get posts by that user + with urlopen(f"https://jsonplaceholder.typicode.com/posts?userId={user['id']}", timeout=10) as response: + posts = json.loads(response.read().decode("utf-8")) + + # Third request: get comments on the first post + if posts: + with urlopen( + f"https://jsonplaceholder.typicode.com/posts/{posts[0]['id']}/comments", timeout=10 + ) as response: + comments = json.loads(response.read().decode("utf-8")) + else: + comments = [] + + return jsonify( + { + "user": user["name"], + "post_count": len(posts), + "first_post_comments": len(comments), + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Parallel requests with ThreadPoolExecutor +@app.route("/api/parallel", methods=["GET"]) +def parallel_requests(): + """Test parallel requests with context propagation.""" + ctx = otel_context.get_current() + + def fetch_url(url): + 
with urlopen(url, timeout=10) as response: + return json.loads(response.read().decode("utf-8")) + + with ThreadPoolExecutor(max_workers=3) as executor: + # Run three requests in parallel with context propagation + posts_future = executor.submit( + _run_with_context, + ctx, + fetch_url, + "https://jsonplaceholder.typicode.com/posts/1", + ) + users_future = executor.submit( + _run_with_context, + ctx, + fetch_url, + "https://jsonplaceholder.typicode.com/users/1", + ) + comments_future = executor.submit( + _run_with_context, + ctx, + fetch_url, + "https://jsonplaceholder.typicode.com/comments/1", + ) + + post = posts_future.result() + user = users_future.result() + comment = comments_future.result() + + return jsonify( + { + "post": post, + "user": user, + "comment": comment, + } + ) + + +# Request with explicit timeout +@app.route("/api/with-timeout", methods=["GET"]) +def with_timeout(): + """Test request with explicit timeout.""" + try: + with urlopen("https://jsonplaceholder.typicode.com/posts/1", timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify(data) + except TimeoutError: + return jsonify({"error": "Request timed out"}), 504 + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Custom opener usage via build_opener +@app.route("/api/custom-opener", methods=["GET"]) +def custom_opener(): + """Test custom opener created via build_opener().""" + try: + from urllib.request import HTTPHandler + + opener = build_opener(HTTPHandler()) + with opener.open("https://jsonplaceholder.typicode.com/posts/1", timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Text response handling +@app.route("/api/text-response", methods=["GET"]) +def text_response(): + """Test request that returns text/plain.""" + try: + with urlopen("https://httpbin.org/robots.txt", timeout=10) as response: + content = response.read().decode("utf-8") + headers = dict(response.info().items()) + return jsonify( + { + "content": content, + "content_type": headers.get("Content-Type"), + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Request with urlopen data parameter (implicit POST) +@app.route("/api/urlopen-with-data", methods=["POST"]) +def urlopen_with_data(): + """Test urlopen with data parameter (creates implicit POST).""" + try: + body = json.dumps({"test": "value"}).encode("utf-8") + # Using urlopen with data parameter makes it a POST request + req = Request( + "https://httpbin.org/post", + headers={"Content-Type": "application/json"}, + ) + with urlopen(req, data=body, timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +if __name__ == "__main__": + sdk.mark_app_as_ready() + app.run(host="0.0.0.0", port=8000, debug=False) diff --git a/drift/instrumentation/urllib/e2e-tests/src/test_requests.py b/drift/instrumentation/urllib/e2e-tests/src/test_requests.py new file mode 100644 index 0000000..357010d --- /dev/null +++ b/drift/instrumentation/urllib/e2e-tests/src/test_requests.py @@ -0,0 +1,57 @@ +"""Execute test requests against the Flask app to exercise the urllib instrumentation.""" + +from drift.instrumentation.e2e_common.test_utils import make_request, print_request_summary + +if __name__ == "__main__": + print("Starting test request sequence for urllib instrumentation...\n") + + # Health check 
+ make_request("GET", "/health") + + # Basic GET request - urlopen with string URL + make_request("GET", "/api/get-json") + + # GET with Request object and custom headers + make_request("GET", "/api/get-with-request-object") + + # GET with query parameters + make_request("GET", "/api/get-with-params") + + # POST with JSON body + make_request( + "POST", + "/api/post-json", + json={"title": "Test Post", "body": "This is a test post body", "userId": 1}, + ) + + # POST with form data + make_request("POST", "/api/post-form") + + # PUT request + make_request("PUT", "/api/put-json") + + # PATCH request + make_request("PATCH", "/api/patch-json") + + # DELETE request + make_request("DELETE", "/api/delete") + + # Sequential chained requests + make_request("GET", "/api/chain") + + # Parallel requests with context propagation + make_request("GET", "/api/parallel") + + # Request with explicit timeout + make_request("GET", "/api/with-timeout") + + # Custom opener usage + make_request("GET", "/api/custom-opener") + + # Text response handling + make_request("GET", "/api/text-response") + + # urlopen with data parameter + make_request("POST", "/api/urlopen-with-data") + + print_request_summary() diff --git a/drift/instrumentation/urllib/instrumentation.py b/drift/instrumentation/urllib/instrumentation.py new file mode 100644 index 0000000..b97322c --- /dev/null +++ b/drift/instrumentation/urllib/instrumentation.py @@ -0,0 +1,1002 @@ +"""Instrumentation for urllib.request HTTP client library.""" + +from __future__ import annotations + +import base64 +import email +import json +import logging +import socket +from io import BytesIO +from typing import Any +from urllib.parse import parse_qs, urlparse + +# socket._GLOBAL_DEFAULT_TIMEOUT is a sentinel object used by urllib +# The type checker doesn't recognize it, so we access it via getattr +_GLOBAL_DEFAULT_TIMEOUT: object = getattr(socket, "_GLOBAL_DEFAULT_TIMEOUT") # noqa: B009 + +from opentelemetry.trace import Span, Status +from opentelemetry.trace import SpanKind as OTelSpanKind +from opentelemetry.trace import StatusCode as OTelStatusCode + + +class RequestDroppedByTransform(Exception): + """Exception raised when an outbound HTTP request is dropped by a transform rule. + + Attributes: + message: Error message explaining the drop + method: HTTP method (GET, POST, etc.) + url: Request URL that was dropped + """ + + def __init__(self, message: str, method: str, url: str): + self.message = message + self.method = method + self.url = url + super().__init__(message) + + +from ...core.data_normalization import create_mock_input_value, remove_none_values +from ...core.drift_sdk import TuskDrift +from ...core.json_schema_helper import DecodedType, EncodingType, SchemaMerge +from ...core.mode_utils import handle_record_mode, handle_replay_mode +from ...core.tracing import TdSpanAttributes +from ...core.tracing.span_utils import CreateSpanOptions, SpanUtils +from ...core.types import ( + PackageType, + SpanKind, + SpanStatus, + StatusCode, + TuskDriftMode, +) +from ..base import InstrumentationBase +from ..http import HttpSpanData, HttpTransformEngine + +logger = logging.getLogger(__name__) + +# Schema merge hints for headers (low match importance) +HEADER_SCHEMA_MERGES = { + "headers": SchemaMerge(match_importance=0.0), +} + + +class ResponseWrapper: + """Wrapper for urllib response that caches the body for re-reading. + + This wrapper reads and caches the entire response body on first access, + then provides the cached body to all subsequent reads. 
This allows the + instrumentation to capture the response body while still allowing the + application code to read the response normally. + """ + + def __init__(self, response: Any): + self._response = response + self._body: bytes | None = None + self._fp: BytesIO | None = None + + # Copy attributes from the original response + self.status = getattr(response, "status", getattr(response, "code", 200)) + self.code = self.status + self.reason = getattr(response, "reason", getattr(response, "msg", "OK")) + self.msg = self.reason + self.url = getattr(response, "url", "") + + def _ensure_body_cached(self) -> None: + """Read and cache the body from the original response if not already done.""" + if self._body is None: + self._body = self._response.read() + self._fp = BytesIO(self._body) + + def read(self, amt: int | None = None) -> bytes: + """Read response body from cache.""" + self._ensure_body_cached() + assert self._fp is not None # Guaranteed by _ensure_body_cached + if amt is None: + return self._fp.read() + return self._fp.read(amt) + + def readline(self) -> bytes: + """Read a line from response body.""" + self._ensure_body_cached() + assert self._fp is not None # Guaranteed by _ensure_body_cached + return self._fp.readline() + + def readlines(self) -> list[bytes]: + """Read all lines from response body.""" + self._ensure_body_cached() + assert self._fp is not None # Guaranteed by _ensure_body_cached + return self._fp.readlines() + + def info(self) -> Any: + """Return headers from original response.""" + return self._response.info() + + def geturl(self) -> str: + """Return the URL of the response.""" + return self.url + + def getcode(self) -> int: + """Return the HTTP status code.""" + return self.status + + def getheaders(self) -> list[tuple[str, str]]: + """Return headers as list of tuples.""" + if hasattr(self._response, "getheaders"): + return self._response.getheaders() + info = self.info() + if info: + return list(info.items()) + return [] + + def getheader(self, name: str, default: str | None = None) -> str | None: + """Get a specific header value.""" + if hasattr(self._response, "getheader"): + return self._response.getheader(name, default) + info = self.info() + if info: + return info.get(name, default) + return default + + def fileno(self) -> int: + """Return file descriptor.""" + if hasattr(self._response, "fileno"): + return self._response.fileno() + return -1 + + def get_cached_body(self) -> bytes: + """Get the cached body bytes (for instrumentation use).""" + self._ensure_body_cached() + assert self._body is not None # Guaranteed by _ensure_body_cached + return self._body + + def __enter__(self): + """Context manager entry.""" + return self + + def __exit__(self, *args): + """Context manager exit.""" + self.close() + + def close(self): + """Close the response.""" + if hasattr(self._response, "close"): + self._response.close() + + def __iter__(self): + """Iterate over response lines.""" + self._ensure_body_cached() + return self + + def __next__(self) -> bytes: + """Get next line.""" + self._ensure_body_cached() + assert self._fp is not None # Guaranteed by _ensure_body_cached + line = self._fp.readline() + if not line: + raise StopIteration + return line + + def __getattr__(self, name: str) -> Any: + """Forward any other attributes to the wrapped response.""" + return getattr(self._response, name) + + +class MockHTTPResponse: + """Mock HTTP response compatible with urllib.request expectations. 
+ + This class mimics http.client.HTTPResponse with the modifications + that urllib.request makes (adding .url and .msg attributes). + """ + + def __init__( + self, + status_code: int, + reason: str, + headers: dict[str, str], + body: bytes, + url: str, + ): + self.status = status_code + self.code = status_code # urllib uses .code + self.reason = reason + self.msg = reason # urllib sets .msg = .reason + self.url = url + self._headers = headers + self._body = body + self._fp = BytesIO(body) + + def read(self, amt: int | None = None) -> bytes: + """Read response body.""" + if amt is None: + return self._fp.read() + return self._fp.read(amt) + + def readline(self) -> bytes: + """Read a line from response body.""" + return self._fp.readline() + + def readlines(self) -> list[bytes]: + """Read all lines from response body.""" + return self._fp.readlines() + + def info(self) -> email.message.Message: + """Return headers as email.message.Message (urllib convention).""" + header_str = "\r\n".join(f"{k}: {v}" for k, v in self._headers.items()) + return email.message_from_string(header_str) + + def geturl(self) -> str: + """Return the URL of the response.""" + return self.url + + def getcode(self) -> int: + """Return the HTTP status code.""" + return self.status + + def getheaders(self) -> list[tuple[str, str]]: + """Return headers as list of tuples.""" + return list(self._headers.items()) + + def getheader(self, name: str, default: str | None = None) -> str | None: + """Get a specific header value (case-insensitive).""" + name_lower = name.lower() + for key, value in self._headers.items(): + if key.lower() == name_lower: + return value + return default + + def fileno(self) -> int: + """Return -1 as there's no real file descriptor.""" + return -1 + + def __enter__(self): + """Context manager entry.""" + return self + + def __exit__(self, *args): + """Context manager exit.""" + self.close() + + def close(self): + """Close the response (no-op for mock).""" + pass + + def __iter__(self): + """Iterate over response lines.""" + return self + + def __next__(self) -> bytes: + """Get next line.""" + line = self._fp.readline() + if not line: + raise StopIteration + return line + + +class UrllibInstrumentation(InstrumentationBase): + """Instrumentation for the urllib.request HTTP client library. + + Patches OpenerDirector.open() to: + - Intercept HTTP requests in REPLAY mode and return mocked responses + - Capture request/response data as CLIENT spans in RECORD mode + + We patch OpenerDirector.open() instead of urlopen() because all HTTP calls + flow through OpenerDirector.open(), including custom opener usage via + build_opener(). This ensures complete coverage. 
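+
+    Illustrative only (not part of the patch itself; example.com is a
+    placeholder URL): both common call styles below end up in
+    OpenerDirector.open(), so a single patch point covers them.
+
+        import urllib.request
+
+        # urlopen() delegates to the module's global OpenerDirector.
+        urllib.request.urlopen("https://example.com/api")
+        # A custom opener built via build_opener() is an OpenerDirector too.
+        urllib.request.build_opener().open("https://example.com/api")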
+ """ + + def __init__(self, enabled: bool = True, transforms: dict[str, Any] | None = None) -> None: + self._transform_engine = HttpTransformEngine(self._resolve_http_transforms(transforms)) + super().__init__( + name="UrllibInstrumentation", + module_name="urllib.request", + supported_versions="*", + enabled=enabled, + ) + + def _resolve_http_transforms( + self, provided: dict[str, Any] | list[dict[str, Any]] | None + ) -> list[dict[str, Any]] | None: + """Resolve HTTP transforms from provided config or SDK config.""" + if isinstance(provided, list): + return provided + if isinstance(provided, dict) and isinstance(provided.get("http"), list): + return provided["http"] + + sdk = TuskDrift.get_instance() + transforms = getattr(sdk.config, "transforms", None) + if isinstance(transforms, dict) and isinstance(transforms.get("http"), list): + return transforms["http"] + return None + + def patch(self, module: Any) -> None: + """Patch the urllib.request module. + + Patches OpenerDirector.open() to intercept all HTTP requests. + All urllib HTTP calls (urlopen(), custom openers, etc.) flow through + OpenerDirector.open(), making it the ideal patching point. + """ + if not hasattr(module, "OpenerDirector"): + logger.warning("urllib.request.OpenerDirector not found, skipping instrumentation") + return + + # Store original method + original_open = module.OpenerDirector.open + instrumentation_self = self + + def patched_open(opener_self, fullurl, data=None, timeout=_GLOBAL_DEFAULT_TIMEOUT): + """Patched OpenerDirector.open method. + + Args: + opener_self: OpenerDirector instance + fullurl: URL string or Request object + data: Optional request body data + timeout: Request timeout + """ + sdk = TuskDrift.get_instance() + + # Pass through if SDK is disabled + if sdk.mode == TuskDriftMode.DISABLED: + return original_open(opener_self, fullurl, data, timeout) + + # Set calling_library_context to suppress socket instrumentation warnings + # context_token = calling_library_context.set("urllib") + try: + # Extract URL for default response handler + if isinstance(fullurl, str): + url = fullurl + else: + url = fullurl.full_url + + def original_call(): + return original_open(opener_self, fullurl, data, timeout) + + # REPLAY mode: Use handle_replay_mode for proper background request handling + if sdk.mode == TuskDriftMode.REPLAY: + return handle_replay_mode( + replay_mode_handler=lambda: instrumentation_self._handle_replay_open( + sdk, fullurl, data, timeout + ), + no_op_request_handler=lambda: instrumentation_self._get_default_response(url), + is_server_request=False, + ) + + # RECORD mode: Use handle_record_mode for proper is_pre_app_start handling + return handle_record_mode( + original_function_call=original_call, + record_mode_handler=lambda is_pre_app_start: instrumentation_self._handle_record_open( + opener_self, fullurl, data, timeout, is_pre_app_start, original_open + ), + span_kind=OTelSpanKind.CLIENT, + ) + finally: + # calling_library_context.reset(context_token) + pass + + # Apply patch + module.OpenerDirector.open = patched_open + logger.info("urllib.request.OpenerDirector.open instrumented") + + def _extract_request_info(self, fullurl: Any, data: bytes | None) -> dict[str, Any]: + """Extract request information from urlopen arguments. + + Args: + fullurl: Either a URL string or a Request object + data: Optional request body + + Returns: + Dict with method, url, headers, body, etc. 
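+
+        Example (illustrative values; passing data alongside a plain string
+        URL implies POST, and no headers have been set yet at this point):
+
+            _extract_request_info("https://example.com/items", b'{"x": 1}')
+            # -> {"method": "POST", "url": "https://example.com/items",
+            #     "headers": {}, "body": b'{"x": 1}'}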
+ """ + from urllib.request import Request + + if isinstance(fullurl, str): + req = Request(fullurl, data) + else: + req = fullurl + if data is not None: + req.data = data + + method = req.get_method() + url = req.full_url + # Combine both header dicts (unredirected_hdrs has precedence in urllib) + headers = {**req.headers, **req.unredirected_hdrs} + body = req.data + + return { + "method": method, + "url": url, + "headers": headers, + "body": body, + } + + def _get_default_response(self, url: str) -> MockHTTPResponse: + """Return default response for background requests in REPLAY mode. + + Background requests (health checks, metrics, etc.) that happen outside + of any trace context should return a default response instead of failing. + """ + logger.debug(f"[UrllibInstrumentation] Returning default response for background request to {url}") + return MockHTTPResponse( + status_code=200, + reason="OK", + headers={}, + body=b"", + url=url, + ) + + def _handle_record_open( + self, + opener_self: Any, + fullurl: Any, + data: bytes | None, + timeout: Any, + is_pre_app_start: bool, + original_open: Any, + ) -> Any: + """Handle OpenerDirector.open() in RECORD mode. + + Args: + opener_self: OpenerDirector instance + fullurl: URL string or Request object + data: Optional request body + timeout: Request timeout + is_pre_app_start: Whether this is before app start + original_open: Original OpenerDirector.open method + """ + request_info = self._extract_request_info(fullurl, data) + method = request_info["method"] + url = request_info["url"] + parsed_url = urlparse(url) + span_name = f"{method.upper()} {parsed_url.path or '/'}" + + # Create span using SpanUtils + span_info = SpanUtils.create_span( + CreateSpanOptions( + name=span_name, + kind=OTelSpanKind.CLIENT, + attributes={ + TdSpanAttributes.NAME: span_name, + TdSpanAttributes.PACKAGE_NAME: parsed_url.scheme, + TdSpanAttributes.INSTRUMENTATION_NAME: "UrllibInstrumentation", + TdSpanAttributes.SUBMODULE_NAME: method.upper(), + TdSpanAttributes.PACKAGE_TYPE: PackageType.HTTP.name, + TdSpanAttributes.IS_PRE_APP_START: is_pre_app_start, + }, + is_pre_app_start=is_pre_app_start, + ) + ) + + if not span_info: + # Span creation failed (trace blocked, etc.) 
- just make the request + return original_open(opener_self, fullurl, data, timeout) + + try: + with SpanUtils.with_span(span_info): + # Check drop transforms BEFORE making the request + headers = request_info.get("headers", {}) + if self._transform_engine and self._transform_engine.should_drop_outbound_request( + method.upper(), url, headers + ): + # Request should be dropped - mark span and raise exception + span_info.span.set_attribute( + TdSpanAttributes.OUTPUT_VALUE, + json.dumps({"bodyProcessingError": "dropped"}), + ) + span_info.span.set_status(Status(OTelStatusCode.ERROR, "Dropped by transform")) + raise RequestDroppedByTransform( + f"Outbound request to {url} was dropped by transform rule", + method.upper(), + url, + ) + + # Make the real request + error = None + response = None + wrapped_response = None + + try: + response = original_open(opener_self, fullurl, data, timeout) + # Wrap the response to allow body caching for instrumentation + # while still allowing the caller to read it + wrapped_response = ResponseWrapper(response) + return wrapped_response + except Exception as e: + error = e + raise + finally: + # Finalize span with request/response data + # Use wrapped_response if available (it caches the body) + self._finalize_span( + span_info.span, + method, + url, + wrapped_response if wrapped_response else response, + error, + request_info, + ) + finally: + span_info.span.end() + + def _handle_replay_open( + self, + sdk: TuskDrift, + fullurl: Any, + data: bytes | None, + timeout: Any, + ) -> MockHTTPResponse: + """Handle OpenerDirector.open() in REPLAY mode. + + Args: + sdk: TuskDrift instance + fullurl: URL string or Request object + data: Optional request body + timeout: Request timeout + """ + request_info = self._extract_request_info(fullurl, data) + method = request_info["method"] + url = request_info["url"] + parsed_url = urlparse(url) + span_name = f"{method.upper()} {parsed_url.path or '/'}" + + # Create span using SpanUtils + span_info = SpanUtils.create_span( + CreateSpanOptions( + name=span_name, + kind=OTelSpanKind.CLIENT, + attributes={ + TdSpanAttributes.NAME: span_name, + TdSpanAttributes.PACKAGE_NAME: parsed_url.scheme, + TdSpanAttributes.INSTRUMENTATION_NAME: "UrllibInstrumentation", + TdSpanAttributes.SUBMODULE_NAME: method.upper(), + TdSpanAttributes.PACKAGE_TYPE: PackageType.HTTP.name, + TdSpanAttributes.IS_PRE_APP_START: not sdk.app_ready, + }, + is_pre_app_start=not sdk.app_ready, + ) + ) + + if not span_info: + raise RuntimeError(f"Error creating span in replay mode for {method} {url}") + + try: + with SpanUtils.with_span(span_info): + # Use IDs from SpanInfo (already formatted) + mock_response = self._try_get_mock( + sdk, + method, + url, + span_info.trace_id, + span_info.span_id, + request_info, + ) + + if mock_response is not None: + return mock_response + + # No mock found - raise error in REPLAY mode + raise RuntimeError(f"No mock found for {method} {url} in REPLAY mode") + finally: + span_info.span.end() + + def _encode_body_to_base64(self, body_data: Any) -> tuple[str | None, int]: + """Encode body data to base64 string. 
+ + Args: + body_data: Body data (str, bytes, dict, or other) + + Returns: + Tuple of (base64_encoded_string, original_byte_size) + """ + if body_data is None: + return None, 0 + + # Convert to bytes + if isinstance(body_data, bytes): + body_bytes = body_data + elif isinstance(body_data, str): + body_bytes = body_data.encode("utf-8") + elif isinstance(body_data, dict): + # JSON data + body_bytes = json.dumps(body_data).encode("utf-8") + else: + # Fallback: convert to string then encode + body_bytes = str(body_data).encode("utf-8") + + # Encode to base64 + base64_body = base64.b64encode(body_bytes).decode("ascii") + + return base64_body, len(body_bytes) + + def _get_decoded_type_from_content_type(self, content_type: str | None) -> DecodedType | None: + """Determine decoded type from Content-Type header. + + Args: + content_type: Content-Type header value + + Returns: + DecodedType enum value or None + """ + if not content_type: + return None + + # Extract main type (before semicolon) + main_type = content_type.lower().split(";")[0].strip() + + # Common content type mappings + CONTENT_TYPE_MAP = { + "application/json": DecodedType.JSON, + "text/plain": DecodedType.PLAIN_TEXT, + "text/html": DecodedType.HTML, + "application/x-www-form-urlencoded": DecodedType.FORM_DATA, + "multipart/form-data": DecodedType.MULTIPART_FORM, + "application/xml": DecodedType.XML, + "text/xml": DecodedType.XML, + } + + return CONTENT_TYPE_MAP.get(main_type) + + def _get_content_type_header(self, headers: dict) -> str | None: + """Get content-type header (case-insensitive lookup).""" + for key, value in headers.items(): + if key.lower() == "content-type": + return value + return None + + def _try_get_mock( + self, + sdk: TuskDrift, + method: str, + url: str, + trace_id: str, + span_id: str, + request_info: dict[str, Any], + ) -> MockHTTPResponse | None: + """Try to get a mocked response from CLI. 
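+
+        The lookup input mirrors what RECORD mode captures for this span
+        (method, URL components, headers, query params, and a base64-encoded
+        body), so a mock is returned only when it matches a recorded request.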
+ + Returns: + Mocked response object if found, None otherwise + """ + try: + parsed_url = urlparse(url) + + # Extract request data + headers = request_info.get("headers", {}) + body = request_info.get("body") + + # Parse query params from URL + params = {} + if parsed_url.query: + params = {k: v[0] if len(v) == 1 else v for k, v in parse_qs(parsed_url.query).items()} + + # Handle request body - encode to base64 + body_base64 = None + body_size = 0 + + if body is not None: + body_base64, body_size = self._encode_body_to_base64(body) + + raw_input_value = { + "method": method.upper(), + "url": url, + "protocol": parsed_url.scheme, + "hostname": parsed_url.hostname, + "port": parsed_url.port, + "path": parsed_url.path or "/", + "headers": dict(headers), + "query": params, + } + + # Add body fields only if body exists + if body_base64 is not None: + raw_input_value["body"] = body_base64 + raw_input_value["bodySize"] = body_size + + input_value = create_mock_input_value(raw_input_value) + + # Create schema merge hints for input + input_schema_merges = { + "headers": SchemaMerge(match_importance=0.0), + } + if body_base64 is not None: + request_content_type = self._get_content_type_header(headers) + input_schema_merges["body"] = SchemaMerge( + encoding=EncodingType.BASE64, + decoded_type=self._get_decoded_type_from_content_type(request_content_type), + ) + + # Use centralized mock finding utility + from ...core.mock_utils import find_mock_response_sync + + mock_response_output = find_mock_response_sync( + sdk=sdk, + trace_id=trace_id, + span_id=span_id, + name=f"{method.upper()} {parsed_url.path or '/'}", + package_name=parsed_url.scheme, + package_type=PackageType.HTTP, + instrumentation_name="UrllibInstrumentation", + submodule_name=method.upper(), + input_value=input_value, + kind=SpanKind.CLIENT, + input_schema_merges=input_schema_merges, + is_pre_app_start=not sdk.app_ready, + ) + + if not mock_response_output or not mock_response_output.found: + logger.debug(f"No mock found for {method} {url} (trace_id={trace_id})") + return None + + # Create mocked response object + if mock_response_output.response is None: + logger.debug(f"Mock found but response data is None for {method} {url}") + return None + return self._create_mock_response(mock_response_output.response, url) + + except Exception as e: + logger.error(f"Error getting mock for {method} {url}: {e}") + return None + + def _create_mock_response(self, mock_data: dict[str, Any], url: str) -> MockHTTPResponse: + """Create a mocked urllib-compatible response object. 
+ + Args: + mock_data: Mock response data from CLI + url: Request URL + + Returns: + MockHTTPResponse object + """ + status_code = mock_data.get("statusCode", 200) + reason = mock_data.get("statusMessage", "OK") + headers = dict(mock_data.get("headers", {})) + + # Remove content-encoding and transfer-encoding headers since the body + # was already decompressed when recorded + headers_to_remove = [] + for key in headers: + if key.lower() in ("content-encoding", "transfer-encoding"): + headers_to_remove.append(key) + for key in headers_to_remove: + del headers[key] + + # Decode body from base64 if needed + body = mock_data.get("body", "") + if isinstance(body, str): + # Try to decode as base64 first (expected format from CLI) + try: + decoded = base64.b64decode(body.encode("ascii"), validate=True) + # Verify round-trip works (confirms it's valid base64) + if base64.b64encode(decoded).decode("ascii") == body: + body_bytes = decoded + else: + body_bytes = body.encode("utf-8") + except Exception: + body_bytes = body.encode("utf-8") + elif isinstance(body, bytes): + body_bytes = body + else: + body_bytes = json.dumps(body).encode("utf-8") + + logger.debug(f"Created mock response: {status_code} for {url}") + return MockHTTPResponse( + status_code=status_code, + reason=reason, + headers=headers, + body=body_bytes, + url=url, + ) + + def _finalize_span( + self, + span: Span, + method: str, + url: str, + response: Any, + error: Exception | None, + request_info: dict[str, Any], + ) -> None: + """Finalize span with request/response data. + + Args: + span: The OpenTelemetry span to finalize + method: HTTP method + url: Request URL + response: Response object (if successful) + error: Exception (if failed) + request_info: Original request info dict + """ + try: + parsed_url = urlparse(url) + + # ===== BUILD INPUT VALUE ===== + headers = request_info.get("headers", {}) + body = request_info.get("body") + + # Parse query params from URL + params = {} + if parsed_url.query: + params = {k: v[0] if len(v) == 1 else v for k, v in parse_qs(parsed_url.query).items()} + + # Get request body and encode to base64 + body_base64 = None + body_size = 0 + + if body is not None: + body_base64, body_size = self._encode_body_to_base64(body) + + input_value = { + "method": method.upper(), + "url": url, + "protocol": parsed_url.scheme, + "hostname": parsed_url.hostname, + "port": parsed_url.port, + "path": parsed_url.path or "/", + "headers": dict(headers), + "query": params, + } + + # Add body fields only if body exists + if body_base64 is not None: + input_value["body"] = body_base64 + input_value["bodySize"] = body_size + + # ===== BUILD OUTPUT VALUE ===== + output_value = {} + status = SpanStatus(code=StatusCode.OK, message="") + response_body_base64 = None + + if error: + output_value = { + "errorName": type(error).__name__, + "errorMessage": str(error), + } + status = SpanStatus(code=StatusCode.ERROR, message=str(error)) + elif response: + # Extract response data + # urllib responses are file-like, need to read carefully + response_status = getattr(response, "status", getattr(response, "code", 200)) + response_reason = getattr(response, "reason", getattr(response, "msg", "OK")) + + # Get headers - urllib uses info() which returns email.message.Message + response_headers = {} + try: + info = response.info() + if info: + response_headers = dict(info.items()) + except Exception: + pass + + response_body_size = 0 + + try: + # Read response content + # Use get_cached_body() for ResponseWrapper (which caches the body) + # 
This allows the caller to still read the response after we capture it + if hasattr(response, "get_cached_body"): + # ResponseWrapper - use cached body + response_bytes = response.get_cached_body() + else: + # Raw response - read directly (body will be consumed) + response_bytes = response.read() + + # Encode to base64 + response_body_base64, response_body_size = self._encode_body_to_base64(response_bytes) + except Exception: + response_body_base64 = None + response_body_size = 0 + + output_value = { + "statusCode": response_status, + "statusMessage": response_reason, + "headers": response_headers, + } + + # Add body fields only if body exists + if response_body_base64 is not None: + output_value["body"] = response_body_base64 + output_value["bodySize"] = response_body_size + + if response_status >= 400: + status = SpanStatus( + code=StatusCode.ERROR, + message=f"HTTP {response_status}", + ) + + # Check if response content type should block the trace + from ...core.content_type_utils import get_decoded_type, should_block_content_type + from ...core.trace_blocking_manager import TraceBlockingManager + + response_content_type = response_headers.get("content-type") or response_headers.get("Content-Type") + decoded_type = get_decoded_type(response_content_type) + + if should_block_content_type(decoded_type): + # Block PARENT trace for outbound requests with binary responses + span_context = span.get_span_context() + trace_id = format(span_context.trace_id, "032x") + + blocking_mgr = TraceBlockingManager.get_instance() + blocking_mgr.block_trace( + trace_id, reason=f"outbound_binary:{decoded_type.name if decoded_type else 'unknown'}" + ) + logger.warning( + f"Blocking trace {trace_id} - outbound request returned binary response: {response_content_type} " + f"(decoded as {decoded_type.name if decoded_type else 'unknown'})" + ) + return # Skip finalizing span + else: + # No response and no error + output_value = {} + + # ===== APPLY TRANSFORMS ===== + transform_metadata = None + if self._transform_engine: + span_data = HttpSpanData( + kind=SpanKind.CLIENT, + input_value=input_value, + output_value=output_value, + ) + self._transform_engine.apply_transforms(span_data) + + # Update values with transformed data + input_value = span_data.input_value or input_value + output_value = span_data.output_value or output_value + transform_metadata = span_data.transform_metadata + + # ===== CREATE SCHEMA MERGE HINTS ===== + request_content_type = self._get_content_type_header(headers) + response_content_type = None + if response: + try: + info = response.info() + if info: + response_content_type = info.get("content-type") + except Exception: + pass + + # Create schema merge hints for input + input_schema_merges = { + "headers": SchemaMerge(match_importance=0.0), + } + if body_base64 is not None: + input_schema_merges["body"] = SchemaMerge( + encoding=EncodingType.BASE64, + decoded_type=self._get_decoded_type_from_content_type(request_content_type), + ) + + # Create schema merge hints for output + output_schema_merges = { + "headers": SchemaMerge(match_importance=0.0), + } + if response_body_base64 is not None: + output_schema_merges["body"] = SchemaMerge( + encoding=EncodingType.BASE64, + decoded_type=self._get_decoded_type_from_content_type(response_content_type), + ) + + # ===== SET SPAN ATTRIBUTES ===== + normalized_input = remove_none_values(input_value) + normalized_output = remove_none_values(output_value) + span.set_attribute(TdSpanAttributes.INPUT_VALUE, json.dumps(normalized_input)) + 
span.set_attribute(TdSpanAttributes.OUTPUT_VALUE, json.dumps(normalized_output)) + + # Set schema merges + from ..wsgi.utilities import _schema_merges_to_dict + + input_schema_merges_dict = _schema_merges_to_dict(input_schema_merges) + output_schema_merges_dict = _schema_merges_to_dict(output_schema_merges) + + span.set_attribute(TdSpanAttributes.INPUT_SCHEMA_MERGES, json.dumps(input_schema_merges_dict)) + span.set_attribute(TdSpanAttributes.OUTPUT_SCHEMA_MERGES, json.dumps(output_schema_merges_dict)) + + # Set transform metadata if present + if transform_metadata: + span.set_attribute(TdSpanAttributes.TRANSFORM_METADATA, json.dumps(transform_metadata)) + + # Set status + if status.code == StatusCode.ERROR: + span.set_status(Status(OTelStatusCode.ERROR, status.message)) + else: + span.set_status(Status(OTelStatusCode.OK)) + + except Exception as e: + logger.error(f"Error finalizing span for {method} {url}: {e}") + span.set_status(Status(OTelStatusCode.ERROR, str(e))) diff --git a/drift/instrumentation/wsgi/handler.py b/drift/instrumentation/wsgi/handler.py index 0a705dd..fdc95c3 100644 --- a/drift/instrumentation/wsgi/handler.py +++ b/drift/instrumentation/wsgi/handler.py @@ -413,8 +413,7 @@ def finalize_wsgi_span( span.set_attribute(TdSpanAttributes.TRANSFORM_METADATA, json.dumps(transform_metadata)) # Set status based on HTTP status code - # Match Node SDK: >= 300 is considered an error (redirects, client errors, server errors) - if status_code >= 300: + if status_code >= 400: span.set_status(Status(OTelStatusCode.ERROR, f"HTTP {status_code}")) else: span.set_status(Status(OTelStatusCode.OK)) diff --git a/pyproject.toml b/pyproject.toml index e9f3963..54e34cd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -128,6 +128,7 @@ include = [ "drift/instrumentation/django/**", "drift/instrumentation/psycopg/**", "drift/instrumentation/psycopg2/**", + "drift/instrumentation/pyjwt/**", "drift/instrumentation/redis/**", "drift/instrumentation/kinde/**", "drift/instrumentation/http/transform_engine.py", From 63b20faf33a8c4880a96878c2e228fca01d9a75e Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Sat, 17 Jan 2026 15:57:06 -0800 Subject: [PATCH 3/8] Fix HTTP 404 error handling in urllib instrumentation REPLAY mode When urllib raises HTTPError for 4xx/5xx responses, the instrumentation now correctly replays this by raising HTTPError instead of returning a normal MockHTTPResponse. This ensures application code catching HTTPError behaves the same in RECORD and REPLAY modes. - Add _raise_http_error_from_mock method to reconstruct HTTPError - Check for errorName in _try_get_mock and raise appropriately - Re-raise HTTPError/URLError exceptions to propagate correctly --- .../urllib/e2e-tests/src/app.py | 47 ++++++++++++++ .../urllib/e2e-tests/src/test_requests.py | 7 +++ .../instrumentation/urllib/instrumentation.py | 61 ++++++++++++++++++- 3 files changed, 114 insertions(+), 1 deletion(-) diff --git a/drift/instrumentation/urllib/e2e-tests/src/app.py b/drift/instrumentation/urllib/e2e-tests/src/app.py index 81bb048..5bb5ed5 100644 --- a/drift/instrumentation/urllib/e2e-tests/src/app.py +++ b/drift/instrumentation/urllib/e2e-tests/src/app.py @@ -328,6 +328,53 @@ def urlopen_with_data(): return jsonify({"error": str(e)}), 500 +# Test: HTTP 404 error handling (BUG-EXPOSING TEST) +@app.route("/test/http-404-error", methods=["GET"]) +def test_http_404_error(): + """Test handling of HTTP 404 error responses. 
+ + When urlopen receives a 4xx/5xx response, it raises HTTPError which is + also a valid response object. This tests if the instrumentation correctly + handles this case. + """ + from urllib.error import HTTPError as UrllibHTTPError + try: + with urlopen("https://httpbin.org/status/404", timeout=10) as response: + return jsonify({"status": response.status}) + except UrllibHTTPError as e: + # HTTPError is also a response object - we can read its body + body = e.read().decode("utf-8") if e.fp else "" + return jsonify({ + "error": True, + "code": e.code, + "reason": e.reason, + "body": body, + }), 200 # Return 200 since we handled the error + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: HTTP redirect handling (BUG-EXPOSING TEST) +@app.route("/test/http-redirect", methods=["GET"]) +def test_http_redirect(): + """Test HTTP redirect handling (301, 302, etc.). + + urllib follows redirects automatically. This tests if the instrumentation + correctly handles redirect scenarios. + """ + try: + # httpbin.org/redirect/n redirects n times before returning 200 + with urlopen("https://httpbin.org/redirect/2", timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify({ + "final_url": response.geturl(), + "status": response.status, + "data": data, + }) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + if __name__ == "__main__": sdk.mark_app_as_ready() app.run(host="0.0.0.0", port=8000, debug=False) diff --git a/drift/instrumentation/urllib/e2e-tests/src/test_requests.py b/drift/instrumentation/urllib/e2e-tests/src/test_requests.py index 357010d..43bc7c9 100644 --- a/drift/instrumentation/urllib/e2e-tests/src/test_requests.py +++ b/drift/instrumentation/urllib/e2e-tests/src/test_requests.py @@ -54,4 +54,11 @@ # urlopen with data parameter make_request("POST", "/api/urlopen-with-data") + # Bug-exposing tests (these tests expose bugs in the instrumentation) + # HTTP 404 error handling - tests HTTPError replay + make_request("GET", "/test/http-404-error") + + # HTTP redirect handling - tests geturl() after redirects + make_request("GET", "/test/http-redirect") + print_request_summary() diff --git a/drift/instrumentation/urllib/instrumentation.py b/drift/instrumentation/urllib/instrumentation.py index b97322c..0835a4b 100644 --- a/drift/instrumentation/urllib/instrumentation.py +++ b/drift/instrumentation/urllib/instrumentation.py @@ -654,6 +654,9 @@ def _try_get_mock( Returns: Mocked response object if found, None otherwise + + Raises: + urllib.error.HTTPError: If the recorded response was an HTTPError """ try: parsed_url = urlparse(url) @@ -729,9 +732,22 @@ def _try_get_mock( if mock_response_output.response is None: logger.debug(f"Mock found but response data is None for {method} {url}") return None - return self._create_mock_response(mock_response_output.response, url) + + # Check if the recorded response was an error (HTTPError, URLError, etc.) 
+ response_data = mock_response_output.response + error_name = response_data.get("errorName") + + if error_name == "HTTPError": + # The original request raised HTTPError - we need to raise it too + self._raise_http_error_from_mock(response_data, url) + + return self._create_mock_response(response_data, url) except Exception as e: + # Re-raise HTTPError (and other urllib errors) so they propagate correctly + from urllib.error import HTTPError, URLError + if isinstance(e, (HTTPError, URLError)): + raise logger.error(f"Error getting mock for {method} {url}: {e}") return None @@ -785,6 +801,49 @@ def _create_mock_response(self, mock_data: dict[str, Any], url: str) -> MockHTTP url=url, ) + def _raise_http_error_from_mock(self, mock_data: dict[str, Any], url: str) -> None: + """Raise an HTTPError from mocked error response data. + + When the original request resulted in an HTTPError (4xx/5xx status codes), + we need to raise the same error during replay so application code that + catches HTTPError behaves the same way. + + Args: + mock_data: Mock response data containing errorName and errorMessage + url: Request URL + + Raises: + urllib.error.HTTPError: Always raises this exception + """ + from email.message import Message + from urllib.error import HTTPError + + error_message = mock_data.get("errorMessage", "") + + # Parse status code and reason from error message + # Format: "HTTP Error 404: NOT FOUND" + status_code = 500 # Default + reason = "Internal Server Error" + + if error_message.startswith("HTTP Error "): + try: + # Extract "404: NOT FOUND" part + parts = error_message[len("HTTP Error "):].split(":", 1) + status_code = int(parts[0].strip()) + if len(parts) > 1: + reason = parts[1].strip() + except (ValueError, IndexError): + pass + + # Create empty headers (HTTPError requires headers object) + headers = Message() + + # Create empty body + body_fp = BytesIO(b"") + + logger.debug(f"Raising HTTPError {status_code} for {url} (replayed from recording)") + raise HTTPError(url, status_code, reason, headers, body_fp) + def _finalize_span( self, span: Span, From 570059a87266363fb93fd8f961cd80e968ad863c Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Sat, 17 Jan 2026 16:01:42 -0800 Subject: [PATCH 4/8] Fix HTTP redirect handling in urllib instrumentation REPLAY mode When urllib follows redirects, response.geturl() should return the final URL after all redirects. The instrumentation now: - Captures finalUrl in RECORD mode when it differs from request URL - Uses finalUrl in REPLAY mode so MockHTTPResponse.geturl() returns the correct final URL instead of the original request URL This ensures application code that checks the final URL after redirects behaves the same in RECORD and REPLAY modes. 
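For context, a minimal illustrative sketch of the application pattern this
change protects (the URL is a placeholder, not part of this repo; only the
standard library is used):

    from urllib.request import urlopen

    # Illustrative only: any endpoint that answers with a redirect works here.
    with urlopen("https://example.com/shortlink") as response:
        final_url = response.geturl()  # final URL after redirects were followed
        body = response.read()

    # Branching on the final URL must see the same value in REPLAY as it did
    # in RECORD, which is why finalUrl is recorded whenever it differs from
    # the original request URL.
    if final_url.endswith("/landing-page"):
        print("redirected to landing page")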
--- .../instrumentation/urllib/instrumentation.py | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/drift/instrumentation/urllib/instrumentation.py b/drift/instrumentation/urllib/instrumentation.py index 0835a4b..7afde1e 100644 --- a/drift/instrumentation/urllib/instrumentation.py +++ b/drift/instrumentation/urllib/instrumentation.py @@ -756,7 +756,7 @@ def _create_mock_response(self, mock_data: dict[str, Any], url: str) -> MockHTTP Args: mock_data: Mock response data from CLI - url: Request URL + url: Request URL (original request URL) Returns: MockHTTPResponse object @@ -765,6 +765,10 @@ def _create_mock_response(self, mock_data: dict[str, Any], url: str) -> MockHTTP reason = mock_data.get("statusMessage", "OK") headers = dict(mock_data.get("headers", {})) + # Use finalUrl from recorded response if present (indicates redirect occurred) + # Otherwise fall back to the original request URL + response_url = mock_data.get("finalUrl", url) + # Remove content-encoding and transfer-encoding headers since the body # was already decompressed when recorded headers_to_remove = [] @@ -792,13 +796,13 @@ def _create_mock_response(self, mock_data: dict[str, Any], url: str) -> MockHTTP else: body_bytes = json.dumps(body).encode("utf-8") - logger.debug(f"Created mock response: {status_code} for {url}") + logger.debug(f"Created mock response: {status_code} for {response_url}") return MockHTTPResponse( status_code=status_code, reason=reason, headers=headers, body=body_bytes, - url=url, + url=response_url, ) def _raise_http_error_from_mock(self, mock_data: dict[str, Any], url: str) -> None: @@ -943,12 +947,23 @@ def _finalize_span( response_body_base64 = None response_body_size = 0 + # Capture final URL (important for redirect scenarios) + # urllib sets response.url to the final URL after all redirects + final_url = getattr(response, "url", None) + if callable(final_url): + # Some response objects have geturl() method + final_url = final_url() + output_value = { "statusCode": response_status, "statusMessage": response_reason, "headers": response_headers, } + # Store final URL if different from request URL (indicates redirect) + if final_url and final_url != url: + output_value["finalUrl"] = final_url + # Add body fields only if body exists if response_body_base64 is not None: output_value["body"] = response_body_base64 From 9368a5789780a6fbdde3d32b7f67c56a805b7381 Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Sat, 17 Jan 2026 16:06:51 -0800 Subject: [PATCH 5/8] Refactor urllib instrumentation to reduce code duplication Extract shared logic into helper methods: - _build_input_value(): constructs HTTP request input value dict - _build_input_schema_merges(): creates schema merge hints for headers/body Both RECORD (_finalize_span) and REPLAY (_try_get_mock) modes now use these helpers, eliminating ~60 lines of duplicate code and ensuring consistent behavior between modes. 
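A hypothetical, self-contained sketch of why a single builder matters (the
names below are illustrative, not the real SDK internals): RECORD and REPLAY
must summarize a request identically, otherwise the replayed request cannot
be matched to its recorded mock:

    import json

    def build_input(method: str, url: str, headers: dict) -> dict:
        # Single source of truth for how a request is summarized.
        return {"method": method.upper(), "url": url, "headers": dict(headers)}

    def key_for(method: str, url: str, headers: dict) -> str:
        return json.dumps(build_input(method, url, headers), sort_keys=True)

    # RECORD: store the response under the request summary.
    recordings = {key_for("get", "https://example.com/a", {"x": "1"}): {"statusCode": 200}}

    # REPLAY: the same builder produces the same key, so the lookup succeeds.
    assert recordings[key_for("GET", "https://example.com/a", {"x": "1"})] == {"statusCode": 200}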
--- .../instrumentation/urllib/instrumentation.py | 167 +++++++++--------- 1 file changed, 88 insertions(+), 79 deletions(-) diff --git a/drift/instrumentation/urllib/instrumentation.py b/drift/instrumentation/urllib/instrumentation.py index 7afde1e..6a6f7f1 100644 --- a/drift/instrumentation/urllib/instrumentation.py +++ b/drift/instrumentation/urllib/instrumentation.py @@ -641,6 +641,83 @@ def _get_content_type_header(self, headers: dict) -> str | None: return value return None + def _build_input_value( + self, + method: str, + url: str, + headers: dict[str, str], + body: bytes | None, + ) -> tuple[dict[str, Any], str | None, int]: + """Build the input value dictionary for HTTP requests. + + Args: + method: HTTP method (GET, POST, etc.) + url: Full request URL + headers: Request headers dictionary + body: Request body bytes (or None) + + Returns: + Tuple of (input_value dict, body_base64 string or None, body_size int) + """ + parsed_url = urlparse(url) + + # Parse query params from URL + params = {} + if parsed_url.query: + params = {k: v[0] if len(v) == 1 else v for k, v in parse_qs(parsed_url.query).items()} + + # Encode body to base64 + body_base64 = None + body_size = 0 + + if body is not None: + body_base64, body_size = self._encode_body_to_base64(body) + + input_value = { + "method": method.upper(), + "url": url, + "protocol": parsed_url.scheme, + "hostname": parsed_url.hostname, + "port": parsed_url.port, + "path": parsed_url.path or "/", + "headers": dict(headers), + "query": params, + } + + # Add body fields only if body exists + if body_base64 is not None: + input_value["body"] = body_base64 + input_value["bodySize"] = body_size + + return input_value, body_base64, body_size + + def _build_input_schema_merges( + self, + headers: dict[str, str], + body_base64: str | None, + ) -> dict[str, SchemaMerge]: + """Build schema merge hints for input value. 
+ + Args: + headers: Request headers dictionary + body_base64: Base64-encoded body string (or None if no body) + + Returns: + Dictionary of schema merge hints + """ + input_schema_merges: dict[str, SchemaMerge] = { + "headers": SchemaMerge(match_importance=0.0), + } + + if body_base64 is not None: + request_content_type = self._get_content_type_header(headers) + input_schema_merges["body"] = SchemaMerge( + encoding=EncodingType.BASE64, + decoded_type=self._get_decoded_type_from_content_type(request_content_type), + ) + + return input_schema_merges + def _try_get_mock( self, sdk: TuskDrift, @@ -665,46 +742,12 @@ def _try_get_mock( headers = request_info.get("headers", {}) body = request_info.get("body") - # Parse query params from URL - params = {} - if parsed_url.query: - params = {k: v[0] if len(v) == 1 else v for k, v in parse_qs(parsed_url.query).items()} - - # Handle request body - encode to base64 - body_base64 = None - body_size = 0 - - if body is not None: - body_base64, body_size = self._encode_body_to_base64(body) - - raw_input_value = { - "method": method.upper(), - "url": url, - "protocol": parsed_url.scheme, - "hostname": parsed_url.hostname, - "port": parsed_url.port, - "path": parsed_url.path or "/", - "headers": dict(headers), - "query": params, - } - - # Add body fields only if body exists - if body_base64 is not None: - raw_input_value["body"] = body_base64 - raw_input_value["bodySize"] = body_size - + # Build input value using shared helper + raw_input_value, body_base64, _ = self._build_input_value(method, url, headers, body) input_value = create_mock_input_value(raw_input_value) - # Create schema merge hints for input - input_schema_merges = { - "headers": SchemaMerge(match_importance=0.0), - } - if body_base64 is not None: - request_content_type = self._get_content_type_header(headers) - input_schema_merges["body"] = SchemaMerge( - encoding=EncodingType.BASE64, - decoded_type=self._get_decoded_type_from_content_type(request_content_type), - ) + # Build schema merge hints using shared helper + input_schema_merges = self._build_input_schema_merges(headers, body_base64) # Use centralized mock finding utility from ...core.mock_utils import find_mock_response_sync @@ -868,39 +911,12 @@ def _finalize_span( request_info: Original request info dict """ try: - parsed_url = urlparse(url) - # ===== BUILD INPUT VALUE ===== headers = request_info.get("headers", {}) body = request_info.get("body") - # Parse query params from URL - params = {} - if parsed_url.query: - params = {k: v[0] if len(v) == 1 else v for k, v in parse_qs(parsed_url.query).items()} - - # Get request body and encode to base64 - body_base64 = None - body_size = 0 - - if body is not None: - body_base64, body_size = self._encode_body_to_base64(body) - - input_value = { - "method": method.upper(), - "url": url, - "protocol": parsed_url.scheme, - "hostname": parsed_url.hostname, - "port": parsed_url.port, - "path": parsed_url.path or "/", - "headers": dict(headers), - "query": params, - } - - # Add body fields only if body exists - if body_base64 is not None: - input_value["body"] = body_base64 - input_value["bodySize"] = body_size + # Build input value using shared helper + input_value, body_base64, _ = self._build_input_value(method, url, headers, body) # ===== BUILD OUTPUT VALUE ===== output_value = {} @@ -1016,7 +1032,10 @@ def _finalize_span( transform_metadata = span_data.transform_metadata # ===== CREATE SCHEMA MERGE HINTS ===== - request_content_type = self._get_content_type_header(headers) + # Build input 
schema merges using shared helper + input_schema_merges = self._build_input_schema_merges(headers, body_base64) + + # Get response content type for output schema merges response_content_type = None if response: try: @@ -1026,18 +1045,8 @@ def _finalize_span( except Exception: pass - # Create schema merge hints for input - input_schema_merges = { - "headers": SchemaMerge(match_importance=0.0), - } - if body_base64 is not None: - input_schema_merges["body"] = SchemaMerge( - encoding=EncodingType.BASE64, - decoded_type=self._get_decoded_type_from_content_type(request_content_type), - ) - # Create schema merge hints for output - output_schema_merges = { + output_schema_merges: dict[str, SchemaMerge] = { "headers": SchemaMerge(match_importance=0.0), } if response_body_base64 is not None: From 83ca088f8ae0d2c36837e51ec1c5613a11a403bf Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Sat, 17 Jan 2026 16:21:16 -0800 Subject: [PATCH 6/8] more tests --- .../urllib/e2e-tests/src/app.py | 475 +++++++++++++++++- .../urllib/e2e-tests/src/test_requests.py | 52 ++ .../instrumentation/urllib/instrumentation.py | 3 +- 3 files changed, 519 insertions(+), 11 deletions(-) diff --git a/drift/instrumentation/urllib/e2e-tests/src/app.py b/drift/instrumentation/urllib/e2e-tests/src/app.py index 5bb5ed5..08d9a05 100644 --- a/drift/instrumentation/urllib/e2e-tests/src/app.py +++ b/drift/instrumentation/urllib/e2e-tests/src/app.py @@ -338,18 +338,21 @@ def test_http_404_error(): handles this case. """ from urllib.error import HTTPError as UrllibHTTPError + try: with urlopen("https://httpbin.org/status/404", timeout=10) as response: return jsonify({"status": response.status}) except UrllibHTTPError as e: # HTTPError is also a response object - we can read its body body = e.read().decode("utf-8") if e.fp else "" - return jsonify({ - "error": True, - "code": e.code, - "reason": e.reason, - "body": body, - }), 200 # Return 200 since we handled the error + return jsonify( + { + "error": True, + "code": e.code, + "reason": e.reason, + "body": body, + } + ), 200 # Return 200 since we handled the error except Exception as e: return jsonify({"error": str(e)}), 500 @@ -366,11 +369,463 @@ def test_http_redirect(): # httpbin.org/redirect/n redirects n times before returning 200 with urlopen("https://httpbin.org/redirect/2", timeout=10) as response: data = json.loads(response.read().decode("utf-8")) - return jsonify({ - "final_url": response.geturl(), - "status": response.status, + return jsonify( + { + "final_url": response.geturl(), + "status": response.status, + "data": data, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: Partial read with read(amt) parameter +@app.route("/test/partial-read", methods=["GET"]) +def test_partial_read(): + """Test reading response body in chunks using read(amt). + + Tests if the ResponseWrapper correctly handles partial reads and caches + the full body for instrumentation while still allowing incremental reading. 
+ """ + try: + with urlopen("https://jsonplaceholder.typicode.com/posts/1", timeout=10) as response: + # Read in small chunks + chunks = [] + while True: + chunk = response.read(50) + if not chunk: + break + chunks.append(chunk) + full_body = b"".join(chunks) + data = json.loads(full_body.decode("utf-8")) + return jsonify( + { + "chunk_count": len(chunks), + "total_bytes": len(full_body), + "data": data, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: Response iteration using for loop +@app.route("/test/response-iteration", methods=["GET"]) +def test_response_iteration(): + """Test iterating over response lines using for loop. + + Tests if the ResponseWrapper correctly implements __iter__ and __next__ + for line-by-line iteration. + """ + try: + with urlopen("https://httpbin.org/robots.txt", timeout=10) as response: + lines = [] + for line in response: + lines.append(line.decode("utf-8").strip()) + return jsonify( + { + "line_count": len(lines), + "lines": lines, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: readline() method +@app.route("/test/readline", methods=["GET"]) +def test_readline(): + """Test reading response line by line using readline(). + + Tests if the ResponseWrapper correctly implements readline(). + """ + try: + with urlopen("https://httpbin.org/robots.txt", timeout=10) as response: + lines = [] + while True: + line = response.readline() + if not line: + break + lines.append(line.decode("utf-8").strip()) + return jsonify( + { + "line_count": len(lines), + "lines": lines, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: readlines() method +@app.route("/test/readlines", methods=["GET"]) +def test_readlines(): + """Test reading all response lines at once using readlines(). + + Tests if the ResponseWrapper correctly implements readlines(). + """ + try: + with urlopen("https://httpbin.org/robots.txt", timeout=10) as response: + lines = response.readlines() + decoded_lines = [line.decode("utf-8").strip() for line in lines] + return jsonify( + { + "line_count": len(decoded_lines), + "lines": decoded_lines, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: Multiple reads from the same response +@app.route("/test/multiple-reads", methods=["GET"]) +def test_multiple_reads(): + """Test reading from response multiple times. + + The ResponseWrapper should cache the body and allow multiple reads. + This tests if the first read() returns data and subsequent reads work correctly. + """ + try: + with urlopen("https://jsonplaceholder.typicode.com/posts/1", timeout=10) as response: + # First read - should get all data + first_read = response.read() + # Second read - should return empty (standard file-like behavior after EOF) + second_read = response.read() + data = json.loads(first_read.decode("utf-8")) + return jsonify( + { + "first_read_bytes": len(first_read), + "second_read_bytes": len(second_read), + "data": data, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: getheaders() method +@app.route("/test/getheaders", methods=["GET"]) +def test_getheaders(): + """Test getting response headers using getheaders(). + + Tests if the ResponseWrapper correctly implements getheaders(). 
+ """ + try: + with urlopen("https://jsonplaceholder.typicode.com/posts/1", timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + headers = response.getheaders() + # Only check that we got headers and specific expected ones exist + # Don't include dynamic headers in the response + has_content_type = any(h[0].lower() == "content-type" for h in headers) + return jsonify( + { + "data": data, + "has_headers": len(headers) > 0, + "has_content_type": has_content_type, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: getheader() method +@app.route("/test/getheader", methods=["GET"]) +def test_getheader(): + """Test getting specific header using getheader(). + + Tests if the ResponseWrapper correctly implements getheader(). + """ + try: + with urlopen("https://jsonplaceholder.typicode.com/posts/1", timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + content_type = response.getheader("Content-Type") + missing_header = response.getheader("X-Missing-Header", "default-value") + return jsonify( + { + "data": data, + "content_type": content_type, + "missing_header": missing_header, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: getcode() method +@app.route("/test/getcode", methods=["GET"]) +def test_getcode(): + """Test getting status code using getcode(). + + Tests if the ResponseWrapper correctly implements getcode(). + """ + try: + with urlopen("https://jsonplaceholder.typicode.com/posts/1", timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + code = response.getcode() + status = response.status + return jsonify( + { + "data": data, + "getcode": code, + "status": status, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: urlretrieve function +@app.route("/test/urlretrieve", methods=["GET"]) +def test_urlretrieve(): + """Test the urlretrieve function. + + urlretrieve downloads a URL to a temporary file. Since it uses urlopen + internally, it should be instrumented. This tests if the trace is captured. + """ + import os + import tempfile + from urllib.request import urlretrieve + + try: + # Create a temp file + with tempfile.NamedTemporaryFile(delete=False) as tmp: + tmp_path = tmp.name + + # Download to temp file + filepath, headers = urlretrieve( + "https://jsonplaceholder.typicode.com/posts/1", + tmp_path, + ) + + # Read and parse the downloaded content + with open(filepath) as f: + content = f.read() + data = json.loads(content) + + # Cleanup + os.unlink(tmp_path) + + # Don't include filepath in response since it's non-deterministic + return jsonify( + { + "downloaded": True, + "data": data, + } + ) + except Exception as e: + # Cleanup on error + if "tmp_path" in locals() and os.path.exists(tmp_path): + os.unlink(tmp_path) + return jsonify({"error": str(e)}), 500 + + +# Test: Response without context manager (direct assignment) +@app.route("/test/no-context-manager", methods=["GET"]) +def test_no_context_manager(): + """Test using urlopen without context manager. + + Some code may not use the 'with' statement. Tests if the response + can be used directly without context manager issues. 
+ """ + response = None + try: + response = urlopen("https://jsonplaceholder.typicode.com/posts/1", timeout=10) + data = json.loads(response.read().decode("utf-8")) + response.close() + return jsonify( + { "data": data, - }) + "closed_manually": True, + } + ) + except Exception as e: + if response: + response.close() + return jsonify({"error": str(e)}), 500 + + +# Test: SSL context parameter +@app.route("/test/ssl-context", methods=["GET"]) +def test_ssl_context(): + """Test urlopen with SSL context parameter. + + Tests if the instrumentation correctly handles requests made with + explicit SSL context configuration. + """ + import ssl + + try: + context = ssl.create_default_context() + # Relax verification for testing purposes + context.check_hostname = True + context.verify_mode = ssl.CERT_REQUIRED + + with urlopen("https://jsonplaceholder.typicode.com/posts/1", timeout=10, context=context) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify( + { + "data": data, + "ssl_version": context.protocol, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: Empty response body (204 No Content) +@app.route("/test/empty-response", methods=["GET"]) +def test_empty_response(): + """Test handling of empty response bodies. + + Tests if the instrumentation correctly handles 204 No Content + or other empty response scenarios. + """ + try: + # httpbin.org/status/204 returns no content + with urlopen("https://httpbin.org/status/204", timeout=10) as response: + body = response.read() + return jsonify( + { + "status": response.status, + "body_length": len(body), + "is_empty": len(body) == 0, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: HEAD request (no body in response) +@app.route("/test/head-request", methods=["GET"]) +def test_head_request(): + """Test HEAD request which returns no body. + + HEAD requests should not have a response body but should have headers. + Tests if the instrumentation handles this correctly. + """ + try: + req = Request( + "https://jsonplaceholder.typicode.com/posts/1", + method="HEAD", + ) + with urlopen(req, timeout=10) as response: + body = response.read() + return jsonify( + { + "status": response.status, + "body_length": len(body), + "has_content_type": response.getheader("Content-Type") is not None, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: OPTIONS request +@app.route("/test/options-request", methods=["GET"]) +def test_options_request(): + """Test OPTIONS request method. + + OPTIONS requests are used for CORS preflight checks. + Tests if the instrumentation handles this HTTP method. + """ + try: + req = Request( + "https://httpbin.org/get", + method="OPTIONS", + ) + with urlopen(req, timeout=10) as response: + body = response.read() + allow_header = response.getheader("Allow") + return jsonify( + { + "status": response.status, + "body_length": len(body), + "allow_header": allow_header, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: Binary request body (bytes) +@app.route("/test/binary-request-body", methods=["POST"]) +def test_binary_request_body(): + """Test POST request with binary (non-JSON) request body. + + Tests if the instrumentation correctly captures and replays + binary request bodies. 
+ """ + try: + # Send raw bytes + binary_data = b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09" + req = Request( + "https://httpbin.org/post", + data=binary_data, + headers={"Content-Type": "application/octet-stream"}, + ) + with urlopen(req, timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + # httpbin returns the base64-encoded data + return jsonify( + { + "status": response.status, + "data_received": data.get("data", ""), + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: HTTP 500 Internal Server Error +@app.route("/test/http-500-error", methods=["GET"]) +def test_http_500_error(): + """Test handling of HTTP 500 Internal Server Error. + + Tests if the instrumentation correctly handles and replays + server error responses. + """ + from urllib.error import HTTPError as UrllibHTTPError + + try: + with urlopen("https://httpbin.org/status/500", timeout=10) as response: + return jsonify({"status": response.status}) + except UrllibHTTPError as e: + body = e.read().decode("utf-8") if e.fp else "" + return jsonify( + { + "error": True, + "code": e.code, + "reason": e.reason, + } + ), 200 + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: Large query string +@app.route("/test/large-query-string", methods=["GET"]) +def test_large_query_string(): + """Test request with a large query string. + + Tests if the instrumentation correctly handles URLs with + many query parameters. + """ + try: + # Build a URL with many query parameters + params = "&".join([f"param{i}=value{i}" for i in range(20)]) + url = f"https://httpbin.org/get?{params}" + + with urlopen(url, timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify( + { + "status": response.status, + "args_count": len(data.get("args", {})), + } + ) except Exception as e: return jsonify({"error": str(e)}), 500 diff --git a/drift/instrumentation/urllib/e2e-tests/src/test_requests.py b/drift/instrumentation/urllib/e2e-tests/src/test_requests.py index 43bc7c9..b36e8e1 100644 --- a/drift/instrumentation/urllib/e2e-tests/src/test_requests.py +++ b/drift/instrumentation/urllib/e2e-tests/src/test_requests.py @@ -61,4 +61,56 @@ # HTTP redirect handling - tests geturl() after redirects make_request("GET", "/test/http-redirect") + # Additional edge case tests + # Partial read with read(amt) + make_request("GET", "/test/partial-read") + + # Response iteration using for loop + make_request("GET", "/test/response-iteration") + + # readline() method + make_request("GET", "/test/readline") + + # readlines() method + make_request("GET", "/test/readlines") + + # Multiple reads from same response + make_request("GET", "/test/multiple-reads") + + # getheaders() method + make_request("GET", "/test/getheaders") + + # getheader() method + make_request("GET", "/test/getheader") + + # getcode() method + make_request("GET", "/test/getcode") + + # urlretrieve function + make_request("GET", "/test/urlretrieve") + + # Response without context manager + make_request("GET", "/test/no-context-manager") + + # SSL context parameter + make_request("GET", "/test/ssl-context") + + # Empty response body (204 No Content) + make_request("GET", "/test/empty-response") + + # HEAD request + make_request("GET", "/test/head-request") + + # OPTIONS request + make_request("GET", "/test/options-request") + + # Binary request body + make_request("POST", "/test/binary-request-body") + + # HTTP 500 error + make_request("GET", "/test/http-500-error") + + # Large query string + 
make_request("GET", "/test/large-query-string") + print_request_summary() diff --git a/drift/instrumentation/urllib/instrumentation.py b/drift/instrumentation/urllib/instrumentation.py index 6a6f7f1..66bbccf 100644 --- a/drift/instrumentation/urllib/instrumentation.py +++ b/drift/instrumentation/urllib/instrumentation.py @@ -789,6 +789,7 @@ def _try_get_mock( except Exception as e: # Re-raise HTTPError (and other urllib errors) so they propagate correctly from urllib.error import HTTPError, URLError + if isinstance(e, (HTTPError, URLError)): raise logger.error(f"Error getting mock for {method} {url}: {e}") @@ -875,7 +876,7 @@ def _raise_http_error_from_mock(self, mock_data: dict[str, Any], url: str) -> No if error_message.startswith("HTTP Error "): try: # Extract "404: NOT FOUND" part - parts = error_message[len("HTTP Error "):].split(":", 1) + parts = error_message[len("HTTP Error ") :].split(":", 1) status_code = int(parts[0].strip()) if len(parts) > 1: reason = parts[1].strip() From 54c004594fe521cb4650cc7b1cd35c481563caaa Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Sun, 18 Jan 2026 20:45:16 -0800 Subject: [PATCH 7/8] fix failing urllib3 test --- drift/core/content_type_utils.py | 2 -- drift/instrumentation/urllib3/instrumentation.py | 4 ++++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/drift/core/content_type_utils.py b/drift/core/content_type_utils.py index 3ca99ca..0238013 100644 --- a/drift/core/content_type_utils.py +++ b/drift/core/content_type_utils.py @@ -111,8 +111,6 @@ "application/binary": DecodedType.BINARY, } -# Only JSON and plain text are acceptable (matches Node SDK) -# All other content types will cause trace blocking ACCEPTABLE_DECODED_TYPES = {DecodedType.JSON, DecodedType.PLAIN_TEXT, DecodedType.HTML} diff --git a/drift/instrumentation/urllib3/instrumentation.py b/drift/instrumentation/urllib3/instrumentation.py index 8ec7fe4..927711e 100644 --- a/drift/instrumentation/urllib3/instrumentation.py +++ b/drift/instrumentation/urllib3/instrumentation.py @@ -813,11 +813,14 @@ def _create_mock_response(self, urllib3_module: Any, mock_data: dict[str, Any], else: content = json.dumps(body).encode("utf-8") + final_url = mock_data.get("finalUrl") or url + response = urllib3_module.HTTPResponse( body=BytesIO(content), headers=headers, status=status_code, preload_content=True, + request_url=final_url, ) # Read the content to make it available via response.data @@ -923,6 +926,7 @@ def _finalize_span( "statusCode": status_code, "statusMessage": response.reason if hasattr(response, "reason") else "", "headers": response_headers, + "finalUrl": response.geturl() if hasattr(response, "geturl") else None, } # Add body fields only if body exists From 39b4bef699b1a722d354f9b756c160f6fe779aa1 Mon Sep 17 00:00:00 2001 From: Sohan Kshirsagar Date: Mon, 19 Jan 2026 10:39:49 -0800 Subject: [PATCH 8/8] update readme --- .cursor/BUGBOT.md | 5 +++++ README.md | 2 ++ 2 files changed, 7 insertions(+) create mode 100644 .cursor/BUGBOT.md diff --git a/.cursor/BUGBOT.md b/.cursor/BUGBOT.md new file mode 100644 index 0000000..151a00b --- /dev/null +++ b/.cursor/BUGBOT.md @@ -0,0 +1,5 @@ +# BUGBOT Notes + +## Instrumentation Guidelines + +- When adding a new instrumentation, the README must be updated to document the new instrumentation. 
diff --git a/README.md b/README.md index 1913ad0..fe87f6f 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,8 @@ Tusk Drift currently supports the following packages and versions: | psycopg2 | all versions | | Redis | `>=4.0.0` | | Kinde | `>=2.0.1` | +| PyJWT | all versions | +| urllib.request | all versions | If you're using packages or versions not listed above, please create an issue with the package + version you'd like an instrumentation for.
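
Closing note (not part of any patch above): a minimal sketch, assuming only
PyJWT and the standard library, of the kind of application code the two newly
documented instrumentations cover. The endpoint and secret are placeholders.

    import json

    import jwt  # PyJWT
    from urllib.request import urlopen

    token = jwt.encode({"sub": "user-123"}, "dev-secret", algorithm="HS256")

    # In REPLAY mode the PyJWT patches skip signature/claim verification,
    # so tokens recorded earlier still decode during test replay.
    claims = jwt.decode(token, "dev-secret", algorithms=["HS256"])

    # In REPLAY mode the urllib.request instrumentation serves this response
    # from the recorded mock instead of reaching the network.
    with urlopen("https://jsonplaceholder.typicode.com/posts/1", timeout=10) as resp:
        post = json.loads(resp.read().decode("utf-8"))

    print(claims["sub"], post["id"])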