diff --git a/.cursor/BUGBOT.md b/.cursor/BUGBOT.md new file mode 100644 index 0000000..151a00b --- /dev/null +++ b/.cursor/BUGBOT.md @@ -0,0 +1,5 @@ +# BUGBOT Notes + +## Instrumentation Guidelines + +- When adding a new instrumentation, the README must be updated to document the new instrumentation. diff --git a/README.md b/README.md index 1913ad0..fe87f6f 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,8 @@ Tusk Drift currently supports the following packages and versions: | psycopg2 | all versions | | Redis | `>=4.0.0` | | Kinde | `>=2.0.1` | +| PyJWT | all versions | +| urllib.request | all versions | If you're using packages or versions not listed above, please create an issue with the package + version you'd like an instrumentation for. diff --git a/drift/core/content_type_utils.py b/drift/core/content_type_utils.py index a4e0aea..0238013 100644 --- a/drift/core/content_type_utils.py +++ b/drift/core/content_type_utils.py @@ -17,7 +17,7 @@ "application/vnd.api+json": DecodedType.JSON, # Plain Text (ALLOWED) "text/plain": DecodedType.PLAIN_TEXT, - # HTML (BLOCKED) + # HTML "text/html": DecodedType.HTML, "application/xhtml+xml": DecodedType.HTML, # CSS (BLOCKED) @@ -111,9 +111,7 @@ "application/binary": DecodedType.BINARY, } -# Only JSON and plain text are acceptable (matches Node SDK) -# All other content types will cause trace blocking -ACCEPTABLE_DECODED_TYPES = {DecodedType.JSON, DecodedType.PLAIN_TEXT} +ACCEPTABLE_DECODED_TYPES = {DecodedType.JSON, DecodedType.PLAIN_TEXT, DecodedType.HTML} def get_decoded_type(content_type: str | None) -> DecodedType | None: diff --git a/drift/core/drift_sdk.py b/drift/core/drift_sdk.py index 968d1bc..5eca8fe 100644 --- a/drift/core/drift_sdk.py +++ b/drift/core/drift_sdk.py @@ -406,6 +406,16 @@ def _init_auto_instrumentations(self) -> None: except ImportError: pass + try: + import urllib.request + + from ..instrumentation.urllib import UrllibInstrumentation + + _ = UrllibInstrumentation() + logger.debug("urllib instrumentation initialized") + except ImportError: + pass + # Initialize PostgreSQL instrumentation before Django # Instrument BOTH psycopg2 and psycopg if available # This allows apps to use either or both @@ -481,6 +491,15 @@ def _init_auto_instrumentations(self) -> None: except Exception as e: logger.debug(f"Socket instrumentation initialization failed: {e}") + # PyJWT instrumentation for JWT verification bypass + try: + from ..instrumentation.pyjwt import PyJWTInstrumentation + + _ = PyJWTInstrumentation(mode=self.mode) + logger.debug("PyJWT instrumentation registered (REPLAY mode)") + except Exception as e: + logger.debug(f"PyJWT instrumentation registration failed: {e}") + def create_env_vars_snapshot(self) -> None: """Create a span capturing all environment variables. diff --git a/drift/core/mock_utils.py b/drift/core/mock_utils.py index 8616dcc..7bc481e 100644 --- a/drift/core/mock_utils.py +++ b/drift/core/mock_utils.py @@ -183,7 +183,9 @@ def find_mock_response_sync( mock_response = sdk.request_mock_sync(mock_request) if not mock_response or not mock_response.found: - logger.debug(f"No matching mock found for {trace_id} with input value: {input_value}") + logger.debug( + f"No matching mock found for {trace_id} with input value: {input_value}, input schema: {input_schema_merges}, input schema hash: {outbound_span.input_schema_hash}, input value hash: {outbound_span.input_value_hash}" + ) return None logger.debug(f"Found mock response for {trace_id}") diff --git a/drift/core/trace_blocking_manager.py b/drift/core/trace_blocking_manager.py index 6c46eba..7c7e019 100644 --- a/drift/core/trace_blocking_manager.py +++ b/drift/core/trace_blocking_manager.py @@ -204,7 +204,7 @@ def should_block_span(span: CleanSpanData) -> bool: """Check if a span should be blocked due to size or server error status. Blocks the trace if: - 1. The span is a SERVER span with ERROR status (e.g., HTTP >= 300) + 1. The span is a SERVER span with ERROR status (e.g., HTTP >= 400) 2. The span exceeds the maximum size limit (1MB) This matches Node SDK behavior in TdSpanExporter.ts. @@ -221,7 +221,7 @@ def should_block_span(span: CleanSpanData) -> bool: span_name = span.name blocking_manager = TraceBlockingManager.get_instance() - # Check 1: Block SERVER spans with ERROR status (e.g., HTTP >= 300) + # Check 1: Block SERVER spans with ERROR status (e.g., HTTP >= 400) if span.kind == SpanKind.SERVER and span.status.code == StatusCode.ERROR: logger.debug(f"Blocking trace {trace_id} - server span '{span_name}' has error status") blocking_manager.block_trace(trace_id, reason="server_error") diff --git a/drift/instrumentation/django/middleware.py b/drift/instrumentation/django/middleware.py index cb5f486..23b1568 100644 --- a/drift/instrumentation/django/middleware.py +++ b/drift/instrumentation/django/middleware.py @@ -390,8 +390,8 @@ def dict_to_schema_merges(merges_dict): duration_seconds = duration_ns // 1_000_000_000 duration_nanos = duration_ns % 1_000_000_000 - # Match Node SDK: >= 300 is considered an error (redirects, client errors, server errors) - if status_code >= 300: + # Match Node SDK: >= 400 is considered an error + if status_code >= 400: status = SpanStatus(code=StatusCode.ERROR, message=f"HTTP {status_code}") else: status = SpanStatus(code=StatusCode.OK, message="") diff --git a/drift/instrumentation/fastapi/instrumentation.py b/drift/instrumentation/fastapi/instrumentation.py index 7a37073..6fcb700 100644 --- a/drift/instrumentation/fastapi/instrumentation.py +++ b/drift/instrumentation/fastapi/instrumentation.py @@ -530,8 +530,8 @@ def _finalize_span( TuskDrift.get_instance() status_code = response_data.get("status_code", 200) - # Match Node SDK: >= 300 is considered an error (redirects, client errors, server errors) - if status_code >= 300: + # Match Node SDK: >= 400 is considered an error + if status_code >= 400: span_info.span.set_status(Status(OTelStatusCode.ERROR, f"HTTP {status_code}")) else: span_info.span.set_status(Status(OTelStatusCode.OK)) diff --git a/drift/instrumentation/pyjwt/__init__.py b/drift/instrumentation/pyjwt/__init__.py new file mode 100644 index 0000000..d99bb0e --- /dev/null +++ b/drift/instrumentation/pyjwt/__init__.py @@ -0,0 +1,5 @@ +"""PyJWT instrumentation for REPLAY mode.""" + +from .instrumentation import PyJWTInstrumentation + +__all__ = ["PyJWTInstrumentation"] diff --git a/drift/instrumentation/pyjwt/instrumentation.py b/drift/instrumentation/pyjwt/instrumentation.py new file mode 100644 index 0000000..2fb2d1d --- /dev/null +++ b/drift/instrumentation/pyjwt/instrumentation.py @@ -0,0 +1,98 @@ +"""PyJWT instrumentation for REPLAY mode. + +Patches PyJWT to disable all verification during test replay: +1. _merge_options - returns all verification options as False +2. _verify_signature - no-op (defense in depth) +3. _validate_claims - no-op (defense in depth) + +Only active in REPLAY mode. +""" + +from __future__ import annotations + +import logging +from types import ModuleType + +from ...core.types import TuskDriftMode +from ..base import InstrumentationBase + +logger = logging.getLogger(__name__) + + +class PyJWTInstrumentation(InstrumentationBase): + """Patches PyJWT to disable verification in REPLAY mode.""" + + def __init__(self, mode: TuskDriftMode = TuskDriftMode.DISABLED, enabled: bool = True) -> None: + self.mode = mode + should_enable = enabled and mode == TuskDriftMode.REPLAY + + super().__init__( + name="PyJWTInstrumentation", + module_name="jwt", + supported_versions="*", + enabled=should_enable, + ) + + def patch(self, module: ModuleType) -> None: + if self.mode != TuskDriftMode.REPLAY: + return + + self._patch_merge_options() + self._patch_signature_verification() + self._patch_claim_validation() + logger.debug("[PyJWTInstrumentation] All patches applied") + + def _patch_signature_verification(self) -> None: + """No-op signature verification.""" + try: + from jwt import api_jws + + def patched_verify_signature(self, *args, **kwargs): + logger.debug("[PyJWTInstrumentation] _verify_signature called - skipping verification") + return None + + api_jws.PyJWS._verify_signature = patched_verify_signature + logger.debug("[PyJWTInstrumentation] Patched PyJWS._verify_signature") + except Exception as e: + logger.warning(f"[PyJWTInstrumentation] Failed to patch _verify_signature: {e}") + + def _patch_claim_validation(self) -> None: + """No-op claim validation.""" + try: + from jwt import api_jwt + + def patched_validate_claims(self, *args, **kwargs): + logger.debug("[PyJWTInstrumentation] _validate_claims called - skipping validation") + return None + + api_jwt.PyJWT._validate_claims = patched_validate_claims + logger.debug("[PyJWTInstrumentation] Patched PyJWT._validate_claims") + except Exception as e: + logger.warning(f"[PyJWTInstrumentation] Failed to patch _validate_claims: {e}") + + def _patch_merge_options(self) -> None: + """Patch _merge_options to always return disabled verification options.""" + try: + from jwt import api_jwt + + disabled_options = { + "verify_signature": False, + "verify_exp": False, + "verify_nbf": False, + "verify_iat": False, + "verify_aud": False, + "verify_iss": False, + "verify_sub": False, + "verify_jti": False, + "require": [], + "strict_aud": False, + } + + def patched_merge_options(self, options=None): + logger.debug("[PyJWTInstrumentation] _merge_options called - returning disabled options") + return disabled_options + + api_jwt.PyJWT._merge_options = patched_merge_options + logger.debug("[PyJWTInstrumentation] Patched PyJWT._merge_options") + except Exception as e: + logger.warning(f"[PyJWTInstrumentation] Failed to patch _merge_options: {e}") diff --git a/drift/instrumentation/urllib/__init__.py b/drift/instrumentation/urllib/__init__.py new file mode 100644 index 0000000..9d02da8 --- /dev/null +++ b/drift/instrumentation/urllib/__init__.py @@ -0,0 +1,5 @@ +"""urllib.request instrumentation module.""" + +from .instrumentation import UrllibInstrumentation + +__all__ = ["UrllibInstrumentation"] diff --git a/drift/instrumentation/urllib/e2e-tests/.tusk/config.yaml b/drift/instrumentation/urllib/e2e-tests/.tusk/config.yaml new file mode 100644 index 0000000..41294f6 --- /dev/null +++ b/drift/instrumentation/urllib/e2e-tests/.tusk/config.yaml @@ -0,0 +1,27 @@ +version: 1 + +service: + id: "urllib-e2e-test-id" + name: "urllib-e2e-test" + port: 8000 + start: + command: "python src/app.py" + readiness_check: + command: "curl -f http://localhost:8000/health" + timeout: 45s + interval: 5s + +tusk_api: + url: "http://localhost:8000" + +test_execution: + concurrent_limit: 10 + batch_size: 10 + timeout: 30s + +recording: + sampling_rate: 1.0 + export_spans: false + +replay: + enable_telemetry: false diff --git a/drift/instrumentation/urllib/e2e-tests/Dockerfile b/drift/instrumentation/urllib/e2e-tests/Dockerfile new file mode 100644 index 0000000..f817d5d --- /dev/null +++ b/drift/instrumentation/urllib/e2e-tests/Dockerfile @@ -0,0 +1,21 @@ +FROM python-e2e-base:latest + +# Copy SDK source for editable install +COPY . /sdk + +# Copy test files +COPY drift/instrumentation/urllib/e2e-tests /app + +WORKDIR /app + +# Install dependencies (requirements.txt uses -e /sdk for SDK) +RUN pip install -q -r requirements.txt + +# Make entrypoint executable +RUN chmod +x entrypoint.py + +# Create .tusk directories +RUN mkdir -p /app/.tusk/traces /app/.tusk/logs + +# Run entrypoint +ENTRYPOINT ["python", "entrypoint.py"] diff --git a/drift/instrumentation/urllib/e2e-tests/docker-compose.yml b/drift/instrumentation/urllib/e2e-tests/docker-compose.yml new file mode 100644 index 0000000..c27ba21 --- /dev/null +++ b/drift/instrumentation/urllib/e2e-tests/docker-compose.yml @@ -0,0 +1,19 @@ +services: + app: + build: + context: ../../../.. + dockerfile: drift/instrumentation/urllib/e2e-tests/Dockerfile + args: + - TUSK_CLI_VERSION=${TUSK_CLI_VERSION:-latest} + environment: + - PORT=8000 + - TUSK_ANALYTICS_DISABLED=1 + - PYTHONUNBUFFERED=1 + working_dir: /app + volumes: + # Mount SDK source for hot reload (no rebuild needed for SDK changes) + - ../../../..:/sdk + # Mount app source for development + - ./src:/app/src + # Mount .tusk folder to persist traces + - ./.tusk:/app/.tusk diff --git a/drift/instrumentation/urllib/e2e-tests/entrypoint.py b/drift/instrumentation/urllib/e2e-tests/entrypoint.py new file mode 100644 index 0000000..b59c9af --- /dev/null +++ b/drift/instrumentation/urllib/e2e-tests/entrypoint.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python3 +""" +E2E Test Entrypoint for Urllib Instrumentation + +This script orchestrates the full e2e test lifecycle: +1. Setup: Install dependencies +2. Record: Start app in RECORD mode, execute requests +3. Test: Run Tusk CLI tests +4. Teardown: Cleanup and return exit code +""" + +import sys +from pathlib import Path + +# Add SDK to path for imports +sys.path.insert(0, "/sdk") + +from drift.instrumentation.e2e_common.base_runner import E2ETestRunnerBase + + +class UrllibE2ETestRunner(E2ETestRunnerBase): + """E2E test runner for Urllib instrumentation.""" + + def __init__(self): + import os + + port = int(os.getenv("PORT", "8000")) + super().__init__(app_port=port) + + +if __name__ == "__main__": + runner = UrllibE2ETestRunner() + exit_code = runner.run() + sys.exit(exit_code) diff --git a/drift/instrumentation/urllib/e2e-tests/requirements.txt b/drift/instrumentation/urllib/e2e-tests/requirements.txt new file mode 100644 index 0000000..584c5ae --- /dev/null +++ b/drift/instrumentation/urllib/e2e-tests/requirements.txt @@ -0,0 +1,2 @@ +-e /sdk +Flask>=3.1.2 diff --git a/drift/instrumentation/urllib/e2e-tests/run.sh b/drift/instrumentation/urllib/e2e-tests/run.sh new file mode 100755 index 0000000..d67bae1 --- /dev/null +++ b/drift/instrumentation/urllib/e2e-tests/run.sh @@ -0,0 +1,64 @@ +#!/bin/bash + +# Exit on error +set -e + +# Accept optional port parameter (default: 8000) +APP_PORT=${1:-8000} +export APP_PORT + +# Generate unique docker compose project name +# Get the instrumentation name (parent directory of e2e-tests) +TEST_NAME="$(basename "$(dirname "$(pwd)")")" +PROJECT_NAME="python-${TEST_NAME}-${APP_PORT}" + +# Colors for output +GREEN='\033[0;32m' +RED='\033[0;31m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +NC='\033[0m' + +echo -e "${BLUE}========================================${NC}" +echo -e "${BLUE}Running Python E2E Test: ${TEST_NAME}${NC}" +echo -e "${BLUE}Port: ${APP_PORT}${NC}" +echo -e "${BLUE}========================================${NC}" +echo "" + +# Cleanup function +cleanup() { + echo "" + echo -e "${YELLOW}Cleaning up containers...${NC}" + docker compose -p "$PROJECT_NAME" down -v 2>/dev/null || true +} + +# Register cleanup on exit +trap cleanup EXIT + +# Build containers +echo -e "${BLUE}Building containers...${NC}" +docker compose -p "$PROJECT_NAME" build --no-cache + +# Run the test container +echo -e "${BLUE}Starting test...${NC}" +echo "" + +# Run container and capture exit code (always use port 8000 inside container) +# Disable set -e temporarily to capture exit code +set +e +docker compose -p "$PROJECT_NAME" run --rm app +EXIT_CODE=$? +set -e + +echo "" +if [ $EXIT_CODE -eq 0 ]; then + echo -e "${GREEN}========================================${NC}" + echo -e "${GREEN}Test passed!${NC}" + echo -e "${GREEN}========================================${NC}" +else + echo -e "${RED}========================================${NC}" + echo -e "${RED}Test failed with exit code ${EXIT_CODE}${NC}" + echo -e "${RED}========================================${NC}" +fi + +exit $EXIT_CODE diff --git a/drift/instrumentation/urllib/e2e-tests/src/app.py b/drift/instrumentation/urllib/e2e-tests/src/app.py new file mode 100644 index 0000000..08d9a05 --- /dev/null +++ b/drift/instrumentation/urllib/e2e-tests/src/app.py @@ -0,0 +1,835 @@ +"""Flask test app for e2e tests - urllib.request instrumentation testing.""" + +import json +from concurrent.futures import ThreadPoolExecutor +from urllib.request import Request, build_opener, urlopen + +from flask import Flask, jsonify +from flask import request as flask_request +from opentelemetry import context as otel_context + +from drift import TuskDrift + +# Initialize SDK +sdk = TuskDrift.initialize( + api_key="tusk-test-key", + log_level="debug", +) + +app = Flask(__name__) + + +def _run_with_context(ctx, fn, *args, **kwargs): + """Helper to run a function with OpenTelemetry context in a thread pool.""" + token = otel_context.attach(ctx) + try: + return fn(*args, **kwargs) + finally: + otel_context.detach(token) + + +# Health check endpoint +@app.route("/health", methods=["GET"]) +def health(): + return jsonify({"status": "healthy"}) + + +# GET request - simple urlopen with string URL +@app.route("/api/get-json", methods=["GET"]) +def get_json(): + """Test basic GET request using urlopen with string URL.""" + try: + with urlopen("https://jsonplaceholder.typicode.com/posts/1", timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# GET request using Request object with custom headers +@app.route("/api/get-with-request-object", methods=["GET"]) +def get_with_request_object(): + """Test GET request using Request object with custom headers.""" + try: + req = Request( + "https://jsonplaceholder.typicode.com/posts/1", + headers={ + "Accept": "application/json", + "User-Agent": "urllib-test/1.0", + "X-Custom-Header": "test-value", + }, + ) + with urlopen(req, timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# GET request with query parameters in URL +@app.route("/api/get-with-params", methods=["GET"]) +def get_with_params(): + """Test GET request with query parameters.""" + try: + with urlopen("https://jsonplaceholder.typicode.com/comments?postId=1", timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify({"count": len(data), "first": data[0] if data else None}) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# POST request with JSON body +@app.route("/api/post-json", methods=["POST"]) +def post_json(): + """Test POST request with JSON body.""" + try: + post_data = flask_request.get_json() or {} + body = json.dumps( + { + "title": post_data.get("title", "Test Title"), + "body": post_data.get("body", "Test Body"), + "userId": post_data.get("userId", 1), + } + ).encode("utf-8") + + req = Request( + "https://jsonplaceholder.typicode.com/posts", + data=body, + headers={"Content-Type": "application/json"}, + ) + with urlopen(req, timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify(data), 201 + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# POST request with form-encoded data +@app.route("/api/post-form", methods=["POST"]) +def post_form(): + """Test POST request with form-encoded data.""" + try: + from urllib.parse import urlencode + + body = urlencode({"field1": "value1", "field2": "value2"}).encode("utf-8") + req = Request( + "https://httpbin.org/post", + data=body, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) + with urlopen(req, timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# PUT request +@app.route("/api/put-json", methods=["PUT"]) +def put_json(): + """Test PUT request with JSON body.""" + try: + body = json.dumps( + { + "id": 1, + "title": "Updated Title", + "body": "Updated Body", + "userId": 1, + } + ).encode("utf-8") + + req = Request( + "https://jsonplaceholder.typicode.com/posts/1", + data=body, + headers={"Content-Type": "application/json"}, + method="PUT", + ) + with urlopen(req, timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# PATCH request +@app.route("/api/patch-json", methods=["PATCH"]) +def patch_json(): + """Test PATCH request with partial JSON body.""" + try: + body = json.dumps({"title": "Patched Title"}).encode("utf-8") + + req = Request( + "https://jsonplaceholder.typicode.com/posts/1", + data=body, + headers={"Content-Type": "application/json"}, + method="PATCH", + ) + with urlopen(req, timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# DELETE request +@app.route("/api/delete", methods=["DELETE"]) +def delete_resource(): + """Test DELETE request.""" + try: + req = Request( + "https://jsonplaceholder.typicode.com/posts/1", + method="DELETE", + ) + with urlopen(req, timeout=10) as response: + return jsonify({"status": "deleted", "status_code": response.status}) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Sequential chained requests +@app.route("/api/chain", methods=["GET"]) +def chain_requests(): + """Test sequential chained requests.""" + try: + # First request: get a user + with urlopen("https://jsonplaceholder.typicode.com/users/1", timeout=10) as response: + user = json.loads(response.read().decode("utf-8")) + + # Second request: get posts by that user + with urlopen(f"https://jsonplaceholder.typicode.com/posts?userId={user['id']}", timeout=10) as response: + posts = json.loads(response.read().decode("utf-8")) + + # Third request: get comments on the first post + if posts: + with urlopen( + f"https://jsonplaceholder.typicode.com/posts/{posts[0]['id']}/comments", timeout=10 + ) as response: + comments = json.loads(response.read().decode("utf-8")) + else: + comments = [] + + return jsonify( + { + "user": user["name"], + "post_count": len(posts), + "first_post_comments": len(comments), + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Parallel requests with ThreadPoolExecutor +@app.route("/api/parallel", methods=["GET"]) +def parallel_requests(): + """Test parallel requests with context propagation.""" + ctx = otel_context.get_current() + + def fetch_url(url): + with urlopen(url, timeout=10) as response: + return json.loads(response.read().decode("utf-8")) + + with ThreadPoolExecutor(max_workers=3) as executor: + # Run three requests in parallel with context propagation + posts_future = executor.submit( + _run_with_context, + ctx, + fetch_url, + "https://jsonplaceholder.typicode.com/posts/1", + ) + users_future = executor.submit( + _run_with_context, + ctx, + fetch_url, + "https://jsonplaceholder.typicode.com/users/1", + ) + comments_future = executor.submit( + _run_with_context, + ctx, + fetch_url, + "https://jsonplaceholder.typicode.com/comments/1", + ) + + post = posts_future.result() + user = users_future.result() + comment = comments_future.result() + + return jsonify( + { + "post": post, + "user": user, + "comment": comment, + } + ) + + +# Request with explicit timeout +@app.route("/api/with-timeout", methods=["GET"]) +def with_timeout(): + """Test request with explicit timeout.""" + try: + with urlopen("https://jsonplaceholder.typicode.com/posts/1", timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify(data) + except TimeoutError: + return jsonify({"error": "Request timed out"}), 504 + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Custom opener usage via build_opener +@app.route("/api/custom-opener", methods=["GET"]) +def custom_opener(): + """Test custom opener created via build_opener().""" + try: + from urllib.request import HTTPHandler + + opener = build_opener(HTTPHandler()) + with opener.open("https://jsonplaceholder.typicode.com/posts/1", timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Text response handling +@app.route("/api/text-response", methods=["GET"]) +def text_response(): + """Test request that returns text/plain.""" + try: + with urlopen("https://httpbin.org/robots.txt", timeout=10) as response: + content = response.read().decode("utf-8") + headers = dict(response.info().items()) + return jsonify( + { + "content": content, + "content_type": headers.get("Content-Type"), + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Request with urlopen data parameter (implicit POST) +@app.route("/api/urlopen-with-data", methods=["POST"]) +def urlopen_with_data(): + """Test urlopen with data parameter (creates implicit POST).""" + try: + body = json.dumps({"test": "value"}).encode("utf-8") + # Using urlopen with data parameter makes it a POST request + req = Request( + "https://httpbin.org/post", + headers={"Content-Type": "application/json"}, + ) + with urlopen(req, data=body, timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify(data) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: HTTP 404 error handling (BUG-EXPOSING TEST) +@app.route("/test/http-404-error", methods=["GET"]) +def test_http_404_error(): + """Test handling of HTTP 404 error responses. + + When urlopen receives a 4xx/5xx response, it raises HTTPError which is + also a valid response object. This tests if the instrumentation correctly + handles this case. + """ + from urllib.error import HTTPError as UrllibHTTPError + + try: + with urlopen("https://httpbin.org/status/404", timeout=10) as response: + return jsonify({"status": response.status}) + except UrllibHTTPError as e: + # HTTPError is also a response object - we can read its body + body = e.read().decode("utf-8") if e.fp else "" + return jsonify( + { + "error": True, + "code": e.code, + "reason": e.reason, + "body": body, + } + ), 200 # Return 200 since we handled the error + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: HTTP redirect handling (BUG-EXPOSING TEST) +@app.route("/test/http-redirect", methods=["GET"]) +def test_http_redirect(): + """Test HTTP redirect handling (301, 302, etc.). + + urllib follows redirects automatically. This tests if the instrumentation + correctly handles redirect scenarios. + """ + try: + # httpbin.org/redirect/n redirects n times before returning 200 + with urlopen("https://httpbin.org/redirect/2", timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify( + { + "final_url": response.geturl(), + "status": response.status, + "data": data, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: Partial read with read(amt) parameter +@app.route("/test/partial-read", methods=["GET"]) +def test_partial_read(): + """Test reading response body in chunks using read(amt). + + Tests if the ResponseWrapper correctly handles partial reads and caches + the full body for instrumentation while still allowing incremental reading. + """ + try: + with urlopen("https://jsonplaceholder.typicode.com/posts/1", timeout=10) as response: + # Read in small chunks + chunks = [] + while True: + chunk = response.read(50) + if not chunk: + break + chunks.append(chunk) + full_body = b"".join(chunks) + data = json.loads(full_body.decode("utf-8")) + return jsonify( + { + "chunk_count": len(chunks), + "total_bytes": len(full_body), + "data": data, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: Response iteration using for loop +@app.route("/test/response-iteration", methods=["GET"]) +def test_response_iteration(): + """Test iterating over response lines using for loop. + + Tests if the ResponseWrapper correctly implements __iter__ and __next__ + for line-by-line iteration. + """ + try: + with urlopen("https://httpbin.org/robots.txt", timeout=10) as response: + lines = [] + for line in response: + lines.append(line.decode("utf-8").strip()) + return jsonify( + { + "line_count": len(lines), + "lines": lines, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: readline() method +@app.route("/test/readline", methods=["GET"]) +def test_readline(): + """Test reading response line by line using readline(). + + Tests if the ResponseWrapper correctly implements readline(). + """ + try: + with urlopen("https://httpbin.org/robots.txt", timeout=10) as response: + lines = [] + while True: + line = response.readline() + if not line: + break + lines.append(line.decode("utf-8").strip()) + return jsonify( + { + "line_count": len(lines), + "lines": lines, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: readlines() method +@app.route("/test/readlines", methods=["GET"]) +def test_readlines(): + """Test reading all response lines at once using readlines(). + + Tests if the ResponseWrapper correctly implements readlines(). + """ + try: + with urlopen("https://httpbin.org/robots.txt", timeout=10) as response: + lines = response.readlines() + decoded_lines = [line.decode("utf-8").strip() for line in lines] + return jsonify( + { + "line_count": len(decoded_lines), + "lines": decoded_lines, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: Multiple reads from the same response +@app.route("/test/multiple-reads", methods=["GET"]) +def test_multiple_reads(): + """Test reading from response multiple times. + + The ResponseWrapper should cache the body and allow multiple reads. + This tests if the first read() returns data and subsequent reads work correctly. + """ + try: + with urlopen("https://jsonplaceholder.typicode.com/posts/1", timeout=10) as response: + # First read - should get all data + first_read = response.read() + # Second read - should return empty (standard file-like behavior after EOF) + second_read = response.read() + data = json.loads(first_read.decode("utf-8")) + return jsonify( + { + "first_read_bytes": len(first_read), + "second_read_bytes": len(second_read), + "data": data, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: getheaders() method +@app.route("/test/getheaders", methods=["GET"]) +def test_getheaders(): + """Test getting response headers using getheaders(). + + Tests if the ResponseWrapper correctly implements getheaders(). + """ + try: + with urlopen("https://jsonplaceholder.typicode.com/posts/1", timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + headers = response.getheaders() + # Only check that we got headers and specific expected ones exist + # Don't include dynamic headers in the response + has_content_type = any(h[0].lower() == "content-type" for h in headers) + return jsonify( + { + "data": data, + "has_headers": len(headers) > 0, + "has_content_type": has_content_type, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: getheader() method +@app.route("/test/getheader", methods=["GET"]) +def test_getheader(): + """Test getting specific header using getheader(). + + Tests if the ResponseWrapper correctly implements getheader(). + """ + try: + with urlopen("https://jsonplaceholder.typicode.com/posts/1", timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + content_type = response.getheader("Content-Type") + missing_header = response.getheader("X-Missing-Header", "default-value") + return jsonify( + { + "data": data, + "content_type": content_type, + "missing_header": missing_header, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: getcode() method +@app.route("/test/getcode", methods=["GET"]) +def test_getcode(): + """Test getting status code using getcode(). + + Tests if the ResponseWrapper correctly implements getcode(). + """ + try: + with urlopen("https://jsonplaceholder.typicode.com/posts/1", timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + code = response.getcode() + status = response.status + return jsonify( + { + "data": data, + "getcode": code, + "status": status, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: urlretrieve function +@app.route("/test/urlretrieve", methods=["GET"]) +def test_urlretrieve(): + """Test the urlretrieve function. + + urlretrieve downloads a URL to a temporary file. Since it uses urlopen + internally, it should be instrumented. This tests if the trace is captured. + """ + import os + import tempfile + from urllib.request import urlretrieve + + try: + # Create a temp file + with tempfile.NamedTemporaryFile(delete=False) as tmp: + tmp_path = tmp.name + + # Download to temp file + filepath, headers = urlretrieve( + "https://jsonplaceholder.typicode.com/posts/1", + tmp_path, + ) + + # Read and parse the downloaded content + with open(filepath) as f: + content = f.read() + data = json.loads(content) + + # Cleanup + os.unlink(tmp_path) + + # Don't include filepath in response since it's non-deterministic + return jsonify( + { + "downloaded": True, + "data": data, + } + ) + except Exception as e: + # Cleanup on error + if "tmp_path" in locals() and os.path.exists(tmp_path): + os.unlink(tmp_path) + return jsonify({"error": str(e)}), 500 + + +# Test: Response without context manager (direct assignment) +@app.route("/test/no-context-manager", methods=["GET"]) +def test_no_context_manager(): + """Test using urlopen without context manager. + + Some code may not use the 'with' statement. Tests if the response + can be used directly without context manager issues. + """ + response = None + try: + response = urlopen("https://jsonplaceholder.typicode.com/posts/1", timeout=10) + data = json.loads(response.read().decode("utf-8")) + response.close() + return jsonify( + { + "data": data, + "closed_manually": True, + } + ) + except Exception as e: + if response: + response.close() + return jsonify({"error": str(e)}), 500 + + +# Test: SSL context parameter +@app.route("/test/ssl-context", methods=["GET"]) +def test_ssl_context(): + """Test urlopen with SSL context parameter. + + Tests if the instrumentation correctly handles requests made with + explicit SSL context configuration. + """ + import ssl + + try: + context = ssl.create_default_context() + # Relax verification for testing purposes + context.check_hostname = True + context.verify_mode = ssl.CERT_REQUIRED + + with urlopen("https://jsonplaceholder.typicode.com/posts/1", timeout=10, context=context) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify( + { + "data": data, + "ssl_version": context.protocol, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: Empty response body (204 No Content) +@app.route("/test/empty-response", methods=["GET"]) +def test_empty_response(): + """Test handling of empty response bodies. + + Tests if the instrumentation correctly handles 204 No Content + or other empty response scenarios. + """ + try: + # httpbin.org/status/204 returns no content + with urlopen("https://httpbin.org/status/204", timeout=10) as response: + body = response.read() + return jsonify( + { + "status": response.status, + "body_length": len(body), + "is_empty": len(body) == 0, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: HEAD request (no body in response) +@app.route("/test/head-request", methods=["GET"]) +def test_head_request(): + """Test HEAD request which returns no body. + + HEAD requests should not have a response body but should have headers. + Tests if the instrumentation handles this correctly. + """ + try: + req = Request( + "https://jsonplaceholder.typicode.com/posts/1", + method="HEAD", + ) + with urlopen(req, timeout=10) as response: + body = response.read() + return jsonify( + { + "status": response.status, + "body_length": len(body), + "has_content_type": response.getheader("Content-Type") is not None, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: OPTIONS request +@app.route("/test/options-request", methods=["GET"]) +def test_options_request(): + """Test OPTIONS request method. + + OPTIONS requests are used for CORS preflight checks. + Tests if the instrumentation handles this HTTP method. + """ + try: + req = Request( + "https://httpbin.org/get", + method="OPTIONS", + ) + with urlopen(req, timeout=10) as response: + body = response.read() + allow_header = response.getheader("Allow") + return jsonify( + { + "status": response.status, + "body_length": len(body), + "allow_header": allow_header, + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: Binary request body (bytes) +@app.route("/test/binary-request-body", methods=["POST"]) +def test_binary_request_body(): + """Test POST request with binary (non-JSON) request body. + + Tests if the instrumentation correctly captures and replays + binary request bodies. + """ + try: + # Send raw bytes + binary_data = b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09" + req = Request( + "https://httpbin.org/post", + data=binary_data, + headers={"Content-Type": "application/octet-stream"}, + ) + with urlopen(req, timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + # httpbin returns the base64-encoded data + return jsonify( + { + "status": response.status, + "data_received": data.get("data", ""), + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: HTTP 500 Internal Server Error +@app.route("/test/http-500-error", methods=["GET"]) +def test_http_500_error(): + """Test handling of HTTP 500 Internal Server Error. + + Tests if the instrumentation correctly handles and replays + server error responses. + """ + from urllib.error import HTTPError as UrllibHTTPError + + try: + with urlopen("https://httpbin.org/status/500", timeout=10) as response: + return jsonify({"status": response.status}) + except UrllibHTTPError as e: + body = e.read().decode("utf-8") if e.fp else "" + return jsonify( + { + "error": True, + "code": e.code, + "reason": e.reason, + } + ), 200 + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +# Test: Large query string +@app.route("/test/large-query-string", methods=["GET"]) +def test_large_query_string(): + """Test request with a large query string. + + Tests if the instrumentation correctly handles URLs with + many query parameters. + """ + try: + # Build a URL with many query parameters + params = "&".join([f"param{i}=value{i}" for i in range(20)]) + url = f"https://httpbin.org/get?{params}" + + with urlopen(url, timeout=10) as response: + data = json.loads(response.read().decode("utf-8")) + return jsonify( + { + "status": response.status, + "args_count": len(data.get("args", {})), + } + ) + except Exception as e: + return jsonify({"error": str(e)}), 500 + + +if __name__ == "__main__": + sdk.mark_app_as_ready() + app.run(host="0.0.0.0", port=8000, debug=False) diff --git a/drift/instrumentation/urllib/e2e-tests/src/test_requests.py b/drift/instrumentation/urllib/e2e-tests/src/test_requests.py new file mode 100644 index 0000000..b36e8e1 --- /dev/null +++ b/drift/instrumentation/urllib/e2e-tests/src/test_requests.py @@ -0,0 +1,116 @@ +"""Execute test requests against the Flask app to exercise the urllib instrumentation.""" + +from drift.instrumentation.e2e_common.test_utils import make_request, print_request_summary + +if __name__ == "__main__": + print("Starting test request sequence for urllib instrumentation...\n") + + # Health check + make_request("GET", "/health") + + # Basic GET request - urlopen with string URL + make_request("GET", "/api/get-json") + + # GET with Request object and custom headers + make_request("GET", "/api/get-with-request-object") + + # GET with query parameters + make_request("GET", "/api/get-with-params") + + # POST with JSON body + make_request( + "POST", + "/api/post-json", + json={"title": "Test Post", "body": "This is a test post body", "userId": 1}, + ) + + # POST with form data + make_request("POST", "/api/post-form") + + # PUT request + make_request("PUT", "/api/put-json") + + # PATCH request + make_request("PATCH", "/api/patch-json") + + # DELETE request + make_request("DELETE", "/api/delete") + + # Sequential chained requests + make_request("GET", "/api/chain") + + # Parallel requests with context propagation + make_request("GET", "/api/parallel") + + # Request with explicit timeout + make_request("GET", "/api/with-timeout") + + # Custom opener usage + make_request("GET", "/api/custom-opener") + + # Text response handling + make_request("GET", "/api/text-response") + + # urlopen with data parameter + make_request("POST", "/api/urlopen-with-data") + + # Bug-exposing tests (these tests expose bugs in the instrumentation) + # HTTP 404 error handling - tests HTTPError replay + make_request("GET", "/test/http-404-error") + + # HTTP redirect handling - tests geturl() after redirects + make_request("GET", "/test/http-redirect") + + # Additional edge case tests + # Partial read with read(amt) + make_request("GET", "/test/partial-read") + + # Response iteration using for loop + make_request("GET", "/test/response-iteration") + + # readline() method + make_request("GET", "/test/readline") + + # readlines() method + make_request("GET", "/test/readlines") + + # Multiple reads from same response + make_request("GET", "/test/multiple-reads") + + # getheaders() method + make_request("GET", "/test/getheaders") + + # getheader() method + make_request("GET", "/test/getheader") + + # getcode() method + make_request("GET", "/test/getcode") + + # urlretrieve function + make_request("GET", "/test/urlretrieve") + + # Response without context manager + make_request("GET", "/test/no-context-manager") + + # SSL context parameter + make_request("GET", "/test/ssl-context") + + # Empty response body (204 No Content) + make_request("GET", "/test/empty-response") + + # HEAD request + make_request("GET", "/test/head-request") + + # OPTIONS request + make_request("GET", "/test/options-request") + + # Binary request body + make_request("POST", "/test/binary-request-body") + + # HTTP 500 error + make_request("GET", "/test/http-500-error") + + # Large query string + make_request("GET", "/test/large-query-string") + + print_request_summary() diff --git a/drift/instrumentation/urllib/instrumentation.py b/drift/instrumentation/urllib/instrumentation.py new file mode 100644 index 0000000..66bbccf --- /dev/null +++ b/drift/instrumentation/urllib/instrumentation.py @@ -0,0 +1,1086 @@ +"""Instrumentation for urllib.request HTTP client library.""" + +from __future__ import annotations + +import base64 +import email +import json +import logging +import socket +from io import BytesIO +from typing import Any +from urllib.parse import parse_qs, urlparse + +# socket._GLOBAL_DEFAULT_TIMEOUT is a sentinel object used by urllib +# The type checker doesn't recognize it, so we access it via getattr +_GLOBAL_DEFAULT_TIMEOUT: object = getattr(socket, "_GLOBAL_DEFAULT_TIMEOUT") # noqa: B009 + +from opentelemetry.trace import Span, Status +from opentelemetry.trace import SpanKind as OTelSpanKind +from opentelemetry.trace import StatusCode as OTelStatusCode + + +class RequestDroppedByTransform(Exception): + """Exception raised when an outbound HTTP request is dropped by a transform rule. + + Attributes: + message: Error message explaining the drop + method: HTTP method (GET, POST, etc.) + url: Request URL that was dropped + """ + + def __init__(self, message: str, method: str, url: str): + self.message = message + self.method = method + self.url = url + super().__init__(message) + + +from ...core.data_normalization import create_mock_input_value, remove_none_values +from ...core.drift_sdk import TuskDrift +from ...core.json_schema_helper import DecodedType, EncodingType, SchemaMerge +from ...core.mode_utils import handle_record_mode, handle_replay_mode +from ...core.tracing import TdSpanAttributes +from ...core.tracing.span_utils import CreateSpanOptions, SpanUtils +from ...core.types import ( + PackageType, + SpanKind, + SpanStatus, + StatusCode, + TuskDriftMode, +) +from ..base import InstrumentationBase +from ..http import HttpSpanData, HttpTransformEngine + +logger = logging.getLogger(__name__) + +# Schema merge hints for headers (low match importance) +HEADER_SCHEMA_MERGES = { + "headers": SchemaMerge(match_importance=0.0), +} + + +class ResponseWrapper: + """Wrapper for urllib response that caches the body for re-reading. + + This wrapper reads and caches the entire response body on first access, + then provides the cached body to all subsequent reads. This allows the + instrumentation to capture the response body while still allowing the + application code to read the response normally. + """ + + def __init__(self, response: Any): + self._response = response + self._body: bytes | None = None + self._fp: BytesIO | None = None + + # Copy attributes from the original response + self.status = getattr(response, "status", getattr(response, "code", 200)) + self.code = self.status + self.reason = getattr(response, "reason", getattr(response, "msg", "OK")) + self.msg = self.reason + self.url = getattr(response, "url", "") + + def _ensure_body_cached(self) -> None: + """Read and cache the body from the original response if not already done.""" + if self._body is None: + self._body = self._response.read() + self._fp = BytesIO(self._body) + + def read(self, amt: int | None = None) -> bytes: + """Read response body from cache.""" + self._ensure_body_cached() + assert self._fp is not None # Guaranteed by _ensure_body_cached + if amt is None: + return self._fp.read() + return self._fp.read(amt) + + def readline(self) -> bytes: + """Read a line from response body.""" + self._ensure_body_cached() + assert self._fp is not None # Guaranteed by _ensure_body_cached + return self._fp.readline() + + def readlines(self) -> list[bytes]: + """Read all lines from response body.""" + self._ensure_body_cached() + assert self._fp is not None # Guaranteed by _ensure_body_cached + return self._fp.readlines() + + def info(self) -> Any: + """Return headers from original response.""" + return self._response.info() + + def geturl(self) -> str: + """Return the URL of the response.""" + return self.url + + def getcode(self) -> int: + """Return the HTTP status code.""" + return self.status + + def getheaders(self) -> list[tuple[str, str]]: + """Return headers as list of tuples.""" + if hasattr(self._response, "getheaders"): + return self._response.getheaders() + info = self.info() + if info: + return list(info.items()) + return [] + + def getheader(self, name: str, default: str | None = None) -> str | None: + """Get a specific header value.""" + if hasattr(self._response, "getheader"): + return self._response.getheader(name, default) + info = self.info() + if info: + return info.get(name, default) + return default + + def fileno(self) -> int: + """Return file descriptor.""" + if hasattr(self._response, "fileno"): + return self._response.fileno() + return -1 + + def get_cached_body(self) -> bytes: + """Get the cached body bytes (for instrumentation use).""" + self._ensure_body_cached() + assert self._body is not None # Guaranteed by _ensure_body_cached + return self._body + + def __enter__(self): + """Context manager entry.""" + return self + + def __exit__(self, *args): + """Context manager exit.""" + self.close() + + def close(self): + """Close the response.""" + if hasattr(self._response, "close"): + self._response.close() + + def __iter__(self): + """Iterate over response lines.""" + self._ensure_body_cached() + return self + + def __next__(self) -> bytes: + """Get next line.""" + self._ensure_body_cached() + assert self._fp is not None # Guaranteed by _ensure_body_cached + line = self._fp.readline() + if not line: + raise StopIteration + return line + + def __getattr__(self, name: str) -> Any: + """Forward any other attributes to the wrapped response.""" + return getattr(self._response, name) + + +class MockHTTPResponse: + """Mock HTTP response compatible with urllib.request expectations. + + This class mimics http.client.HTTPResponse with the modifications + that urllib.request makes (adding .url and .msg attributes). + """ + + def __init__( + self, + status_code: int, + reason: str, + headers: dict[str, str], + body: bytes, + url: str, + ): + self.status = status_code + self.code = status_code # urllib uses .code + self.reason = reason + self.msg = reason # urllib sets .msg = .reason + self.url = url + self._headers = headers + self._body = body + self._fp = BytesIO(body) + + def read(self, amt: int | None = None) -> bytes: + """Read response body.""" + if amt is None: + return self._fp.read() + return self._fp.read(amt) + + def readline(self) -> bytes: + """Read a line from response body.""" + return self._fp.readline() + + def readlines(self) -> list[bytes]: + """Read all lines from response body.""" + return self._fp.readlines() + + def info(self) -> email.message.Message: + """Return headers as email.message.Message (urllib convention).""" + header_str = "\r\n".join(f"{k}: {v}" for k, v in self._headers.items()) + return email.message_from_string(header_str) + + def geturl(self) -> str: + """Return the URL of the response.""" + return self.url + + def getcode(self) -> int: + """Return the HTTP status code.""" + return self.status + + def getheaders(self) -> list[tuple[str, str]]: + """Return headers as list of tuples.""" + return list(self._headers.items()) + + def getheader(self, name: str, default: str | None = None) -> str | None: + """Get a specific header value (case-insensitive).""" + name_lower = name.lower() + for key, value in self._headers.items(): + if key.lower() == name_lower: + return value + return default + + def fileno(self) -> int: + """Return -1 as there's no real file descriptor.""" + return -1 + + def __enter__(self): + """Context manager entry.""" + return self + + def __exit__(self, *args): + """Context manager exit.""" + self.close() + + def close(self): + """Close the response (no-op for mock).""" + pass + + def __iter__(self): + """Iterate over response lines.""" + return self + + def __next__(self) -> bytes: + """Get next line.""" + line = self._fp.readline() + if not line: + raise StopIteration + return line + + +class UrllibInstrumentation(InstrumentationBase): + """Instrumentation for the urllib.request HTTP client library. + + Patches OpenerDirector.open() to: + - Intercept HTTP requests in REPLAY mode and return mocked responses + - Capture request/response data as CLIENT spans in RECORD mode + + We patch OpenerDirector.open() instead of urlopen() because all HTTP calls + flow through OpenerDirector.open(), including custom opener usage via + build_opener(). This ensures complete coverage. + """ + + def __init__(self, enabled: bool = True, transforms: dict[str, Any] | None = None) -> None: + self._transform_engine = HttpTransformEngine(self._resolve_http_transforms(transforms)) + super().__init__( + name="UrllibInstrumentation", + module_name="urllib.request", + supported_versions="*", + enabled=enabled, + ) + + def _resolve_http_transforms( + self, provided: dict[str, Any] | list[dict[str, Any]] | None + ) -> list[dict[str, Any]] | None: + """Resolve HTTP transforms from provided config or SDK config.""" + if isinstance(provided, list): + return provided + if isinstance(provided, dict) and isinstance(provided.get("http"), list): + return provided["http"] + + sdk = TuskDrift.get_instance() + transforms = getattr(sdk.config, "transforms", None) + if isinstance(transforms, dict) and isinstance(transforms.get("http"), list): + return transforms["http"] + return None + + def patch(self, module: Any) -> None: + """Patch the urllib.request module. + + Patches OpenerDirector.open() to intercept all HTTP requests. + All urllib HTTP calls (urlopen(), custom openers, etc.) flow through + OpenerDirector.open(), making it the ideal patching point. + """ + if not hasattr(module, "OpenerDirector"): + logger.warning("urllib.request.OpenerDirector not found, skipping instrumentation") + return + + # Store original method + original_open = module.OpenerDirector.open + instrumentation_self = self + + def patched_open(opener_self, fullurl, data=None, timeout=_GLOBAL_DEFAULT_TIMEOUT): + """Patched OpenerDirector.open method. + + Args: + opener_self: OpenerDirector instance + fullurl: URL string or Request object + data: Optional request body data + timeout: Request timeout + """ + sdk = TuskDrift.get_instance() + + # Pass through if SDK is disabled + if sdk.mode == TuskDriftMode.DISABLED: + return original_open(opener_self, fullurl, data, timeout) + + # Set calling_library_context to suppress socket instrumentation warnings + # context_token = calling_library_context.set("urllib") + try: + # Extract URL for default response handler + if isinstance(fullurl, str): + url = fullurl + else: + url = fullurl.full_url + + def original_call(): + return original_open(opener_self, fullurl, data, timeout) + + # REPLAY mode: Use handle_replay_mode for proper background request handling + if sdk.mode == TuskDriftMode.REPLAY: + return handle_replay_mode( + replay_mode_handler=lambda: instrumentation_self._handle_replay_open( + sdk, fullurl, data, timeout + ), + no_op_request_handler=lambda: instrumentation_self._get_default_response(url), + is_server_request=False, + ) + + # RECORD mode: Use handle_record_mode for proper is_pre_app_start handling + return handle_record_mode( + original_function_call=original_call, + record_mode_handler=lambda is_pre_app_start: instrumentation_self._handle_record_open( + opener_self, fullurl, data, timeout, is_pre_app_start, original_open + ), + span_kind=OTelSpanKind.CLIENT, + ) + finally: + # calling_library_context.reset(context_token) + pass + + # Apply patch + module.OpenerDirector.open = patched_open + logger.info("urllib.request.OpenerDirector.open instrumented") + + def _extract_request_info(self, fullurl: Any, data: bytes | None) -> dict[str, Any]: + """Extract request information from urlopen arguments. + + Args: + fullurl: Either a URL string or a Request object + data: Optional request body + + Returns: + Dict with method, url, headers, body, etc. + """ + from urllib.request import Request + + if isinstance(fullurl, str): + req = Request(fullurl, data) + else: + req = fullurl + if data is not None: + req.data = data + + method = req.get_method() + url = req.full_url + # Combine both header dicts (unredirected_hdrs has precedence in urllib) + headers = {**req.headers, **req.unredirected_hdrs} + body = req.data + + return { + "method": method, + "url": url, + "headers": headers, + "body": body, + } + + def _get_default_response(self, url: str) -> MockHTTPResponse: + """Return default response for background requests in REPLAY mode. + + Background requests (health checks, metrics, etc.) that happen outside + of any trace context should return a default response instead of failing. + """ + logger.debug(f"[UrllibInstrumentation] Returning default response for background request to {url}") + return MockHTTPResponse( + status_code=200, + reason="OK", + headers={}, + body=b"", + url=url, + ) + + def _handle_record_open( + self, + opener_self: Any, + fullurl: Any, + data: bytes | None, + timeout: Any, + is_pre_app_start: bool, + original_open: Any, + ) -> Any: + """Handle OpenerDirector.open() in RECORD mode. + + Args: + opener_self: OpenerDirector instance + fullurl: URL string or Request object + data: Optional request body + timeout: Request timeout + is_pre_app_start: Whether this is before app start + original_open: Original OpenerDirector.open method + """ + request_info = self._extract_request_info(fullurl, data) + method = request_info["method"] + url = request_info["url"] + parsed_url = urlparse(url) + span_name = f"{method.upper()} {parsed_url.path or '/'}" + + # Create span using SpanUtils + span_info = SpanUtils.create_span( + CreateSpanOptions( + name=span_name, + kind=OTelSpanKind.CLIENT, + attributes={ + TdSpanAttributes.NAME: span_name, + TdSpanAttributes.PACKAGE_NAME: parsed_url.scheme, + TdSpanAttributes.INSTRUMENTATION_NAME: "UrllibInstrumentation", + TdSpanAttributes.SUBMODULE_NAME: method.upper(), + TdSpanAttributes.PACKAGE_TYPE: PackageType.HTTP.name, + TdSpanAttributes.IS_PRE_APP_START: is_pre_app_start, + }, + is_pre_app_start=is_pre_app_start, + ) + ) + + if not span_info: + # Span creation failed (trace blocked, etc.) - just make the request + return original_open(opener_self, fullurl, data, timeout) + + try: + with SpanUtils.with_span(span_info): + # Check drop transforms BEFORE making the request + headers = request_info.get("headers", {}) + if self._transform_engine and self._transform_engine.should_drop_outbound_request( + method.upper(), url, headers + ): + # Request should be dropped - mark span and raise exception + span_info.span.set_attribute( + TdSpanAttributes.OUTPUT_VALUE, + json.dumps({"bodyProcessingError": "dropped"}), + ) + span_info.span.set_status(Status(OTelStatusCode.ERROR, "Dropped by transform")) + raise RequestDroppedByTransform( + f"Outbound request to {url} was dropped by transform rule", + method.upper(), + url, + ) + + # Make the real request + error = None + response = None + wrapped_response = None + + try: + response = original_open(opener_self, fullurl, data, timeout) + # Wrap the response to allow body caching for instrumentation + # while still allowing the caller to read it + wrapped_response = ResponseWrapper(response) + return wrapped_response + except Exception as e: + error = e + raise + finally: + # Finalize span with request/response data + # Use wrapped_response if available (it caches the body) + self._finalize_span( + span_info.span, + method, + url, + wrapped_response if wrapped_response else response, + error, + request_info, + ) + finally: + span_info.span.end() + + def _handle_replay_open( + self, + sdk: TuskDrift, + fullurl: Any, + data: bytes | None, + timeout: Any, + ) -> MockHTTPResponse: + """Handle OpenerDirector.open() in REPLAY mode. + + Args: + sdk: TuskDrift instance + fullurl: URL string or Request object + data: Optional request body + timeout: Request timeout + """ + request_info = self._extract_request_info(fullurl, data) + method = request_info["method"] + url = request_info["url"] + parsed_url = urlparse(url) + span_name = f"{method.upper()} {parsed_url.path or '/'}" + + # Create span using SpanUtils + span_info = SpanUtils.create_span( + CreateSpanOptions( + name=span_name, + kind=OTelSpanKind.CLIENT, + attributes={ + TdSpanAttributes.NAME: span_name, + TdSpanAttributes.PACKAGE_NAME: parsed_url.scheme, + TdSpanAttributes.INSTRUMENTATION_NAME: "UrllibInstrumentation", + TdSpanAttributes.SUBMODULE_NAME: method.upper(), + TdSpanAttributes.PACKAGE_TYPE: PackageType.HTTP.name, + TdSpanAttributes.IS_PRE_APP_START: not sdk.app_ready, + }, + is_pre_app_start=not sdk.app_ready, + ) + ) + + if not span_info: + raise RuntimeError(f"Error creating span in replay mode for {method} {url}") + + try: + with SpanUtils.with_span(span_info): + # Use IDs from SpanInfo (already formatted) + mock_response = self._try_get_mock( + sdk, + method, + url, + span_info.trace_id, + span_info.span_id, + request_info, + ) + + if mock_response is not None: + return mock_response + + # No mock found - raise error in REPLAY mode + raise RuntimeError(f"No mock found for {method} {url} in REPLAY mode") + finally: + span_info.span.end() + + def _encode_body_to_base64(self, body_data: Any) -> tuple[str | None, int]: + """Encode body data to base64 string. + + Args: + body_data: Body data (str, bytes, dict, or other) + + Returns: + Tuple of (base64_encoded_string, original_byte_size) + """ + if body_data is None: + return None, 0 + + # Convert to bytes + if isinstance(body_data, bytes): + body_bytes = body_data + elif isinstance(body_data, str): + body_bytes = body_data.encode("utf-8") + elif isinstance(body_data, dict): + # JSON data + body_bytes = json.dumps(body_data).encode("utf-8") + else: + # Fallback: convert to string then encode + body_bytes = str(body_data).encode("utf-8") + + # Encode to base64 + base64_body = base64.b64encode(body_bytes).decode("ascii") + + return base64_body, len(body_bytes) + + def _get_decoded_type_from_content_type(self, content_type: str | None) -> DecodedType | None: + """Determine decoded type from Content-Type header. + + Args: + content_type: Content-Type header value + + Returns: + DecodedType enum value or None + """ + if not content_type: + return None + + # Extract main type (before semicolon) + main_type = content_type.lower().split(";")[0].strip() + + # Common content type mappings + CONTENT_TYPE_MAP = { + "application/json": DecodedType.JSON, + "text/plain": DecodedType.PLAIN_TEXT, + "text/html": DecodedType.HTML, + "application/x-www-form-urlencoded": DecodedType.FORM_DATA, + "multipart/form-data": DecodedType.MULTIPART_FORM, + "application/xml": DecodedType.XML, + "text/xml": DecodedType.XML, + } + + return CONTENT_TYPE_MAP.get(main_type) + + def _get_content_type_header(self, headers: dict) -> str | None: + """Get content-type header (case-insensitive lookup).""" + for key, value in headers.items(): + if key.lower() == "content-type": + return value + return None + + def _build_input_value( + self, + method: str, + url: str, + headers: dict[str, str], + body: bytes | None, + ) -> tuple[dict[str, Any], str | None, int]: + """Build the input value dictionary for HTTP requests. + + Args: + method: HTTP method (GET, POST, etc.) + url: Full request URL + headers: Request headers dictionary + body: Request body bytes (or None) + + Returns: + Tuple of (input_value dict, body_base64 string or None, body_size int) + """ + parsed_url = urlparse(url) + + # Parse query params from URL + params = {} + if parsed_url.query: + params = {k: v[0] if len(v) == 1 else v for k, v in parse_qs(parsed_url.query).items()} + + # Encode body to base64 + body_base64 = None + body_size = 0 + + if body is not None: + body_base64, body_size = self._encode_body_to_base64(body) + + input_value = { + "method": method.upper(), + "url": url, + "protocol": parsed_url.scheme, + "hostname": parsed_url.hostname, + "port": parsed_url.port, + "path": parsed_url.path or "/", + "headers": dict(headers), + "query": params, + } + + # Add body fields only if body exists + if body_base64 is not None: + input_value["body"] = body_base64 + input_value["bodySize"] = body_size + + return input_value, body_base64, body_size + + def _build_input_schema_merges( + self, + headers: dict[str, str], + body_base64: str | None, + ) -> dict[str, SchemaMerge]: + """Build schema merge hints for input value. + + Args: + headers: Request headers dictionary + body_base64: Base64-encoded body string (or None if no body) + + Returns: + Dictionary of schema merge hints + """ + input_schema_merges: dict[str, SchemaMerge] = { + "headers": SchemaMerge(match_importance=0.0), + } + + if body_base64 is not None: + request_content_type = self._get_content_type_header(headers) + input_schema_merges["body"] = SchemaMerge( + encoding=EncodingType.BASE64, + decoded_type=self._get_decoded_type_from_content_type(request_content_type), + ) + + return input_schema_merges + + def _try_get_mock( + self, + sdk: TuskDrift, + method: str, + url: str, + trace_id: str, + span_id: str, + request_info: dict[str, Any], + ) -> MockHTTPResponse | None: + """Try to get a mocked response from CLI. + + Returns: + Mocked response object if found, None otherwise + + Raises: + urllib.error.HTTPError: If the recorded response was an HTTPError + """ + try: + parsed_url = urlparse(url) + + # Extract request data + headers = request_info.get("headers", {}) + body = request_info.get("body") + + # Build input value using shared helper + raw_input_value, body_base64, _ = self._build_input_value(method, url, headers, body) + input_value = create_mock_input_value(raw_input_value) + + # Build schema merge hints using shared helper + input_schema_merges = self._build_input_schema_merges(headers, body_base64) + + # Use centralized mock finding utility + from ...core.mock_utils import find_mock_response_sync + + mock_response_output = find_mock_response_sync( + sdk=sdk, + trace_id=trace_id, + span_id=span_id, + name=f"{method.upper()} {parsed_url.path or '/'}", + package_name=parsed_url.scheme, + package_type=PackageType.HTTP, + instrumentation_name="UrllibInstrumentation", + submodule_name=method.upper(), + input_value=input_value, + kind=SpanKind.CLIENT, + input_schema_merges=input_schema_merges, + is_pre_app_start=not sdk.app_ready, + ) + + if not mock_response_output or not mock_response_output.found: + logger.debug(f"No mock found for {method} {url} (trace_id={trace_id})") + return None + + # Create mocked response object + if mock_response_output.response is None: + logger.debug(f"Mock found but response data is None for {method} {url}") + return None + + # Check if the recorded response was an error (HTTPError, URLError, etc.) + response_data = mock_response_output.response + error_name = response_data.get("errorName") + + if error_name == "HTTPError": + # The original request raised HTTPError - we need to raise it too + self._raise_http_error_from_mock(response_data, url) + + return self._create_mock_response(response_data, url) + + except Exception as e: + # Re-raise HTTPError (and other urllib errors) so they propagate correctly + from urllib.error import HTTPError, URLError + + if isinstance(e, (HTTPError, URLError)): + raise + logger.error(f"Error getting mock for {method} {url}: {e}") + return None + + def _create_mock_response(self, mock_data: dict[str, Any], url: str) -> MockHTTPResponse: + """Create a mocked urllib-compatible response object. + + Args: + mock_data: Mock response data from CLI + url: Request URL (original request URL) + + Returns: + MockHTTPResponse object + """ + status_code = mock_data.get("statusCode", 200) + reason = mock_data.get("statusMessage", "OK") + headers = dict(mock_data.get("headers", {})) + + # Use finalUrl from recorded response if present (indicates redirect occurred) + # Otherwise fall back to the original request URL + response_url = mock_data.get("finalUrl", url) + + # Remove content-encoding and transfer-encoding headers since the body + # was already decompressed when recorded + headers_to_remove = [] + for key in headers: + if key.lower() in ("content-encoding", "transfer-encoding"): + headers_to_remove.append(key) + for key in headers_to_remove: + del headers[key] + + # Decode body from base64 if needed + body = mock_data.get("body", "") + if isinstance(body, str): + # Try to decode as base64 first (expected format from CLI) + try: + decoded = base64.b64decode(body.encode("ascii"), validate=True) + # Verify round-trip works (confirms it's valid base64) + if base64.b64encode(decoded).decode("ascii") == body: + body_bytes = decoded + else: + body_bytes = body.encode("utf-8") + except Exception: + body_bytes = body.encode("utf-8") + elif isinstance(body, bytes): + body_bytes = body + else: + body_bytes = json.dumps(body).encode("utf-8") + + logger.debug(f"Created mock response: {status_code} for {response_url}") + return MockHTTPResponse( + status_code=status_code, + reason=reason, + headers=headers, + body=body_bytes, + url=response_url, + ) + + def _raise_http_error_from_mock(self, mock_data: dict[str, Any], url: str) -> None: + """Raise an HTTPError from mocked error response data. + + When the original request resulted in an HTTPError (4xx/5xx status codes), + we need to raise the same error during replay so application code that + catches HTTPError behaves the same way. + + Args: + mock_data: Mock response data containing errorName and errorMessage + url: Request URL + + Raises: + urllib.error.HTTPError: Always raises this exception + """ + from email.message import Message + from urllib.error import HTTPError + + error_message = mock_data.get("errorMessage", "") + + # Parse status code and reason from error message + # Format: "HTTP Error 404: NOT FOUND" + status_code = 500 # Default + reason = "Internal Server Error" + + if error_message.startswith("HTTP Error "): + try: + # Extract "404: NOT FOUND" part + parts = error_message[len("HTTP Error ") :].split(":", 1) + status_code = int(parts[0].strip()) + if len(parts) > 1: + reason = parts[1].strip() + except (ValueError, IndexError): + pass + + # Create empty headers (HTTPError requires headers object) + headers = Message() + + # Create empty body + body_fp = BytesIO(b"") + + logger.debug(f"Raising HTTPError {status_code} for {url} (replayed from recording)") + raise HTTPError(url, status_code, reason, headers, body_fp) + + def _finalize_span( + self, + span: Span, + method: str, + url: str, + response: Any, + error: Exception | None, + request_info: dict[str, Any], + ) -> None: + """Finalize span with request/response data. + + Args: + span: The OpenTelemetry span to finalize + method: HTTP method + url: Request URL + response: Response object (if successful) + error: Exception (if failed) + request_info: Original request info dict + """ + try: + # ===== BUILD INPUT VALUE ===== + headers = request_info.get("headers", {}) + body = request_info.get("body") + + # Build input value using shared helper + input_value, body_base64, _ = self._build_input_value(method, url, headers, body) + + # ===== BUILD OUTPUT VALUE ===== + output_value = {} + status = SpanStatus(code=StatusCode.OK, message="") + response_body_base64 = None + + if error: + output_value = { + "errorName": type(error).__name__, + "errorMessage": str(error), + } + status = SpanStatus(code=StatusCode.ERROR, message=str(error)) + elif response: + # Extract response data + # urllib responses are file-like, need to read carefully + response_status = getattr(response, "status", getattr(response, "code", 200)) + response_reason = getattr(response, "reason", getattr(response, "msg", "OK")) + + # Get headers - urllib uses info() which returns email.message.Message + response_headers = {} + try: + info = response.info() + if info: + response_headers = dict(info.items()) + except Exception: + pass + + response_body_size = 0 + + try: + # Read response content + # Use get_cached_body() for ResponseWrapper (which caches the body) + # This allows the caller to still read the response after we capture it + if hasattr(response, "get_cached_body"): + # ResponseWrapper - use cached body + response_bytes = response.get_cached_body() + else: + # Raw response - read directly (body will be consumed) + response_bytes = response.read() + + # Encode to base64 + response_body_base64, response_body_size = self._encode_body_to_base64(response_bytes) + except Exception: + response_body_base64 = None + response_body_size = 0 + + # Capture final URL (important for redirect scenarios) + # urllib sets response.url to the final URL after all redirects + final_url = getattr(response, "url", None) + if callable(final_url): + # Some response objects have geturl() method + final_url = final_url() + + output_value = { + "statusCode": response_status, + "statusMessage": response_reason, + "headers": response_headers, + } + + # Store final URL if different from request URL (indicates redirect) + if final_url and final_url != url: + output_value["finalUrl"] = final_url + + # Add body fields only if body exists + if response_body_base64 is not None: + output_value["body"] = response_body_base64 + output_value["bodySize"] = response_body_size + + if response_status >= 400: + status = SpanStatus( + code=StatusCode.ERROR, + message=f"HTTP {response_status}", + ) + + # Check if response content type should block the trace + from ...core.content_type_utils import get_decoded_type, should_block_content_type + from ...core.trace_blocking_manager import TraceBlockingManager + + response_content_type = response_headers.get("content-type") or response_headers.get("Content-Type") + decoded_type = get_decoded_type(response_content_type) + + if should_block_content_type(decoded_type): + # Block PARENT trace for outbound requests with binary responses + span_context = span.get_span_context() + trace_id = format(span_context.trace_id, "032x") + + blocking_mgr = TraceBlockingManager.get_instance() + blocking_mgr.block_trace( + trace_id, reason=f"outbound_binary:{decoded_type.name if decoded_type else 'unknown'}" + ) + logger.warning( + f"Blocking trace {trace_id} - outbound request returned binary response: {response_content_type} " + f"(decoded as {decoded_type.name if decoded_type else 'unknown'})" + ) + return # Skip finalizing span + else: + # No response and no error + output_value = {} + + # ===== APPLY TRANSFORMS ===== + transform_metadata = None + if self._transform_engine: + span_data = HttpSpanData( + kind=SpanKind.CLIENT, + input_value=input_value, + output_value=output_value, + ) + self._transform_engine.apply_transforms(span_data) + + # Update values with transformed data + input_value = span_data.input_value or input_value + output_value = span_data.output_value or output_value + transform_metadata = span_data.transform_metadata + + # ===== CREATE SCHEMA MERGE HINTS ===== + # Build input schema merges using shared helper + input_schema_merges = self._build_input_schema_merges(headers, body_base64) + + # Get response content type for output schema merges + response_content_type = None + if response: + try: + info = response.info() + if info: + response_content_type = info.get("content-type") + except Exception: + pass + + # Create schema merge hints for output + output_schema_merges: dict[str, SchemaMerge] = { + "headers": SchemaMerge(match_importance=0.0), + } + if response_body_base64 is not None: + output_schema_merges["body"] = SchemaMerge( + encoding=EncodingType.BASE64, + decoded_type=self._get_decoded_type_from_content_type(response_content_type), + ) + + # ===== SET SPAN ATTRIBUTES ===== + normalized_input = remove_none_values(input_value) + normalized_output = remove_none_values(output_value) + span.set_attribute(TdSpanAttributes.INPUT_VALUE, json.dumps(normalized_input)) + span.set_attribute(TdSpanAttributes.OUTPUT_VALUE, json.dumps(normalized_output)) + + # Set schema merges + from ..wsgi.utilities import _schema_merges_to_dict + + input_schema_merges_dict = _schema_merges_to_dict(input_schema_merges) + output_schema_merges_dict = _schema_merges_to_dict(output_schema_merges) + + span.set_attribute(TdSpanAttributes.INPUT_SCHEMA_MERGES, json.dumps(input_schema_merges_dict)) + span.set_attribute(TdSpanAttributes.OUTPUT_SCHEMA_MERGES, json.dumps(output_schema_merges_dict)) + + # Set transform metadata if present + if transform_metadata: + span.set_attribute(TdSpanAttributes.TRANSFORM_METADATA, json.dumps(transform_metadata)) + + # Set status + if status.code == StatusCode.ERROR: + span.set_status(Status(OTelStatusCode.ERROR, status.message)) + else: + span.set_status(Status(OTelStatusCode.OK)) + + except Exception as e: + logger.error(f"Error finalizing span for {method} {url}: {e}") + span.set_status(Status(OTelStatusCode.ERROR, str(e))) diff --git a/drift/instrumentation/urllib3/instrumentation.py b/drift/instrumentation/urllib3/instrumentation.py index 8ec7fe4..927711e 100644 --- a/drift/instrumentation/urllib3/instrumentation.py +++ b/drift/instrumentation/urllib3/instrumentation.py @@ -813,11 +813,14 @@ def _create_mock_response(self, urllib3_module: Any, mock_data: dict[str, Any], else: content = json.dumps(body).encode("utf-8") + final_url = mock_data.get("finalUrl") or url + response = urllib3_module.HTTPResponse( body=BytesIO(content), headers=headers, status=status_code, preload_content=True, + request_url=final_url, ) # Read the content to make it available via response.data @@ -923,6 +926,7 @@ def _finalize_span( "statusCode": status_code, "statusMessage": response.reason if hasattr(response, "reason") else "", "headers": response_headers, + "finalUrl": response.geturl() if hasattr(response, "geturl") else None, } # Add body fields only if body exists diff --git a/drift/instrumentation/wsgi/handler.py b/drift/instrumentation/wsgi/handler.py index 0a705dd..fdc95c3 100644 --- a/drift/instrumentation/wsgi/handler.py +++ b/drift/instrumentation/wsgi/handler.py @@ -413,8 +413,7 @@ def finalize_wsgi_span( span.set_attribute(TdSpanAttributes.TRANSFORM_METADATA, json.dumps(transform_metadata)) # Set status based on HTTP status code - # Match Node SDK: >= 300 is considered an error (redirects, client errors, server errors) - if status_code >= 300: + if status_code >= 400: span.set_status(Status(OTelStatusCode.ERROR, f"HTTP {status_code}")) else: span.set_status(Status(OTelStatusCode.OK)) diff --git a/pyproject.toml b/pyproject.toml index e9f3963..54e34cd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -128,6 +128,7 @@ include = [ "drift/instrumentation/django/**", "drift/instrumentation/psycopg/**", "drift/instrumentation/psycopg2/**", + "drift/instrumentation/pyjwt/**", "drift/instrumentation/redis/**", "drift/instrumentation/kinde/**", "drift/instrumentation/http/transform_engine.py",