diff --git a/posthog/__init__.py b/posthog/__init__.py index 98286dd7..b513d8de 100644 --- a/posthog/__init__.py +++ b/posthog/__init__.py @@ -22,6 +22,12 @@ InconclusiveMatchError as InconclusiveMatchError, RequiresServerEvaluation as RequiresServerEvaluation, ) +from posthog.request import ( + disable_connection_reuse as disable_connection_reuse, + enable_keep_alive as enable_keep_alive, + set_socket_options as set_socket_options, + SocketOptions as SocketOptions, +) from posthog.types import ( FeatureFlag, FlagsAndPayloads, diff --git a/posthog/request.py b/posthog/request.py index 2540f0e7..7199f3a8 100644 --- a/posthog/request.py +++ b/posthog/request.py @@ -1,19 +1,47 @@ import json import logging import re +import socket from dataclasses import dataclass from datetime import date, datetime from gzip import GzipFile from io import BytesIO -from typing import Any, Optional, Union +from typing import Any, List, Optional, Tuple, Union + import requests from dateutil.tz import tzutc +from requests.adapters import HTTPAdapter # type: ignore[import-untyped] +from urllib3.connection import HTTPConnection from urllib3.util.retry import Retry from posthog.utils import remove_trailing_slash from posthog.version import VERSION +SocketOptions = List[Tuple[int, int, Union[int, bytes]]] + +KEEPALIVE_IDLE_SECONDS = 60 +KEEPALIVE_INTERVAL_SECONDS = 60 +KEEPALIVE_PROBE_COUNT = 3 + +# TCP keepalive probes idle connections to prevent them from being dropped. +# SO_KEEPALIVE is cross-platform, but timing options vary: +# - Linux: TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT +# - macOS: only SO_KEEPALIVE (uses system defaults) +# - Windows: TCP_KEEPIDLE, TCP_KEEPINTVL (since Windows 10 1709) +KEEP_ALIVE_SOCKET_OPTIONS: SocketOptions = list( + HTTPConnection.default_socket_options +) + [ + (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), +] +for attr, value in [ + ("TCP_KEEPIDLE", KEEPALIVE_IDLE_SECONDS), + ("TCP_KEEPINTVL", KEEPALIVE_INTERVAL_SECONDS), + ("TCP_KEEPCNT", KEEPALIVE_PROBE_COUNT), +]: + if hasattr(socket, attr): + KEEP_ALIVE_SOCKET_OPTIONS.append((socket.SOL_TCP, getattr(socket, attr), value)) + def _mask_tokens_in_url(url: str) -> str: """Mask token values in URLs for safe logging, keeping first 10 chars visible.""" @@ -29,17 +57,69 @@ class GetResponse: not_modified: bool = False -# Retry on both connect and read errors -# by default read errors will only retry idempotent HTTP methods (so not POST) -adapter = requests.adapters.HTTPAdapter( - max_retries=Retry( - total=2, - connect=2, - read=2, +class HTTPAdapterWithSocketOptions(HTTPAdapter): + """HTTPAdapter with configurable socket options.""" + + def __init__(self, *args, socket_options: Optional[SocketOptions] = None, **kwargs): + self.socket_options = socket_options + super().__init__(*args, **kwargs) + + def init_poolmanager(self, *args, **kwargs): + if self.socket_options is not None: + kwargs["socket_options"] = self.socket_options + super().init_poolmanager(*args, **kwargs) + + +def _build_session(socket_options: Optional[SocketOptions] = None) -> requests.Session: + adapter = HTTPAdapterWithSocketOptions( + max_retries=Retry( + total=2, + connect=2, + read=2, + ), + socket_options=socket_options, ) -) -_session = requests.sessions.Session() -_session.mount("https://", adapter) + session = requests.sessions.Session() + session.mount("https://", adapter) + return session + + +_session = _build_session() +_socket_options: Optional[SocketOptions] = None +_pooling_enabled = True + + +def _get_session() -> requests.Session: + if _pooling_enabled: + return _session + return _build_session(_socket_options) + + +def set_socket_options(socket_options: Optional[SocketOptions]) -> None: + """ + Configure socket options for all HTTP connections. + + Example: + from posthog import set_socket_options + set_socket_options([(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)]) + """ + global _session, _socket_options + if socket_options == _socket_options: + return + _socket_options = socket_options + _session = _build_session(socket_options) + + +def enable_keep_alive() -> None: + """Enable TCP keepalive to prevent idle connections from being dropped.""" + set_socket_options(KEEP_ALIVE_SOCKET_OPTIONS) + + +def disable_connection_reuse() -> None: + """Disable connection reuse, creating a fresh connection for each request.""" + global _pooling_enabled + _pooling_enabled = False + US_INGESTION_ENDPOINT = "https://us.i.posthog.com" EU_INGESTION_ENDPOINT = "https://eu.i.posthog.com" @@ -85,7 +165,7 @@ def post( gz.write(data.encode("utf-8")) data = buf.getvalue() - res = _session.post(url, data=data, headers=headers, timeout=timeout) + res = _get_session().post(url, data=data, headers=headers, timeout=timeout) if res.status_code == 200: log.debug("data uploaded successfully") @@ -200,7 +280,7 @@ def get( if etag: headers["If-None-Match"] = etag - res = _session.get(full_url, headers=headers, timeout=timeout) + res = _get_session().get(full_url, headers=headers, timeout=timeout) masked_url = _mask_tokens_in_url(full_url) diff --git a/posthog/test/test_request.py b/posthog/test/test_request.py index 7eee835f..128123fe 100644 --- a/posthog/test/test_request.py +++ b/posthog/test/test_request.py @@ -6,16 +6,21 @@ import pytest import requests +import posthog.request as request_module from posthog.request import ( APIError, DatetimeSerializer, GetResponse, + KEEP_ALIVE_SOCKET_OPTIONS, QuotaLimitError, _mask_tokens_in_url, batch_post, decide, determine_server_host, + disable_connection_reuse, + enable_keep_alive, get, + set_socket_options, ) from posthog.test.test_utils import TEST_API_KEY @@ -344,3 +349,47 @@ def test_get_removes_trailing_slash_from_host(self, mock_get): ) def test_routing_to_custom_host(host, expected): assert determine_server_host(host) == expected + + +def test_enable_keep_alive_sets_socket_options(): + try: + enable_keep_alive() + from posthog.request import _session + + adapter = _session.get_adapter("https://example.com") + assert adapter.socket_options == KEEP_ALIVE_SOCKET_OPTIONS + finally: + set_socket_options(None) + + +def test_set_socket_options_clears_with_none(): + try: + enable_keep_alive() + set_socket_options(None) + from posthog.request import _session + + adapter = _session.get_adapter("https://example.com") + assert adapter.socket_options is None + finally: + set_socket_options(None) + + +def test_disable_connection_reuse_creates_fresh_sessions(): + try: + disable_connection_reuse() + session1 = request_module._get_session() + session2 = request_module._get_session() + assert session1 is not session2 + finally: + request_module._pooling_enabled = True + + +def test_set_socket_options_is_idempotent(): + try: + enable_keep_alive() + session1 = request_module._session + enable_keep_alive() + session2 = request_module._session + assert session1 is session2 + finally: + set_socket_options(None)