Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions posthog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@
InconclusiveMatchError as InconclusiveMatchError,
RequiresServerEvaluation as RequiresServerEvaluation,
)
from posthog.request import (
disable_connection_reuse as disable_connection_reuse,
enable_keep_alive as enable_keep_alive,
set_socket_options as set_socket_options,
SocketOptions as SocketOptions,
)
from posthog.types import (
FeatureFlag,
FlagsAndPayloads,
Expand Down
106 changes: 93 additions & 13 deletions posthog/request.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,47 @@
import json
import logging
import re
import socket
from dataclasses import dataclass
from datetime import date, datetime
from gzip import GzipFile
from io import BytesIO
from typing import Any, Optional, Union
from typing import Any, List, Optional, Tuple, Union


import requests
from dateutil.tz import tzutc
from requests.adapters import HTTPAdapter # type: ignore[import-untyped]
from urllib3.connection import HTTPConnection
from urllib3.util.retry import Retry

from posthog.utils import remove_trailing_slash
from posthog.version import VERSION

SocketOptions = List[Tuple[int, int, Union[int, bytes]]]

KEEPALIVE_IDLE_SECONDS = 60
KEEPALIVE_INTERVAL_SECONDS = 60
KEEPALIVE_PROBE_COUNT = 3

# TCP keepalive probes idle connections to prevent them from being dropped.
# SO_KEEPALIVE is cross-platform, but timing options vary:
# - Linux: TCP_KEEPIDLE, TCP_KEEPINTVL, TCP_KEEPCNT
# - macOS: only SO_KEEPALIVE (uses system defaults)
# - Windows: TCP_KEEPIDLE, TCP_KEEPINTVL (since Windows 10 1709)
KEEP_ALIVE_SOCKET_OPTIONS: SocketOptions = list(
HTTPConnection.default_socket_options
) + [
(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
]
for attr, value in [
("TCP_KEEPIDLE", KEEPALIVE_IDLE_SECONDS),
("TCP_KEEPINTVL", KEEPALIVE_INTERVAL_SECONDS),
("TCP_KEEPCNT", KEEPALIVE_PROBE_COUNT),
]:
if hasattr(socket, attr):
KEEP_ALIVE_SOCKET_OPTIONS.append((socket.SOL_TCP, getattr(socket, attr), value))


def _mask_tokens_in_url(url: str) -> str:
"""Mask token values in URLs for safe logging, keeping first 10 chars visible."""
Expand All @@ -29,17 +57,69 @@ class GetResponse:
not_modified: bool = False


# Retry on both connect and read errors
# by default read errors will only retry idempotent HTTP methods (so not POST)
adapter = requests.adapters.HTTPAdapter(
max_retries=Retry(
total=2,
connect=2,
read=2,
class HTTPAdapterWithSocketOptions(HTTPAdapter):
"""HTTPAdapter with configurable socket options."""

def __init__(self, *args, socket_options: Optional[SocketOptions] = None, **kwargs):
self.socket_options = socket_options
super().__init__(*args, **kwargs)

def init_poolmanager(self, *args, **kwargs):
if self.socket_options is not None:
kwargs["socket_options"] = self.socket_options
super().init_poolmanager(*args, **kwargs)


def _build_session(socket_options: Optional[SocketOptions] = None) -> requests.Session:
adapter = HTTPAdapterWithSocketOptions(
max_retries=Retry(
total=2,
connect=2,
read=2,
),
socket_options=socket_options,
)
)
_session = requests.sessions.Session()
_session.mount("https://", adapter)
session = requests.sessions.Session()
session.mount("https://", adapter)
return session


_session = _build_session()
_socket_options: Optional[SocketOptions] = None
_pooling_enabled = True


def _get_session() -> requests.Session:
if _pooling_enabled:
return _session
return _build_session(_socket_options)


def set_socket_options(socket_options: Optional[SocketOptions]) -> None:
"""
Configure socket options for all HTTP connections.

Example:
from posthog import set_socket_options
set_socket_options([(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)])
"""
global _session, _socket_options
if socket_options == _socket_options:
return
_socket_options = socket_options
_session = _build_session(socket_options)


def enable_keep_alive() -> None:
"""Enable TCP keepalive to prevent idle connections from being dropped."""
set_socket_options(KEEP_ALIVE_SOCKET_OPTIONS)


def disable_connection_reuse() -> None:
"""Disable connection reuse, creating a fresh connection for each request."""
global _pooling_enabled
_pooling_enabled = False


US_INGESTION_ENDPOINT = "https://us.i.posthog.com"
EU_INGESTION_ENDPOINT = "https://eu.i.posthog.com"
Expand Down Expand Up @@ -85,7 +165,7 @@ def post(
gz.write(data.encode("utf-8"))
data = buf.getvalue()

res = _session.post(url, data=data, headers=headers, timeout=timeout)
res = _get_session().post(url, data=data, headers=headers, timeout=timeout)

if res.status_code == 200:
log.debug("data uploaded successfully")
Expand Down Expand Up @@ -200,7 +280,7 @@ def get(
if etag:
headers["If-None-Match"] = etag

res = _session.get(full_url, headers=headers, timeout=timeout)
res = _get_session().get(full_url, headers=headers, timeout=timeout)

masked_url = _mask_tokens_in_url(full_url)

Expand Down
49 changes: 49 additions & 0 deletions posthog/test/test_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,21 @@
import pytest
import requests

import posthog.request as request_module
from posthog.request import (
APIError,
DatetimeSerializer,
GetResponse,
KEEP_ALIVE_SOCKET_OPTIONS,
QuotaLimitError,
_mask_tokens_in_url,
batch_post,
decide,
determine_server_host,
disable_connection_reuse,
enable_keep_alive,
get,
set_socket_options,
)
from posthog.test.test_utils import TEST_API_KEY

Expand Down Expand Up @@ -344,3 +349,47 @@ def test_get_removes_trailing_slash_from_host(self, mock_get):
)
def test_routing_to_custom_host(host, expected):
assert determine_server_host(host) == expected


def test_enable_keep_alive_sets_socket_options():
try:
enable_keep_alive()
from posthog.request import _session

adapter = _session.get_adapter("https://example.com")
assert adapter.socket_options == KEEP_ALIVE_SOCKET_OPTIONS
finally:
set_socket_options(None)


def test_set_socket_options_clears_with_none():
try:
enable_keep_alive()
set_socket_options(None)
from posthog.request import _session

adapter = _session.get_adapter("https://example.com")
assert adapter.socket_options is None
finally:
set_socket_options(None)


def test_disable_connection_reuse_creates_fresh_sessions():
try:
disable_connection_reuse()
session1 = request_module._get_session()
session2 = request_module._get_session()
assert session1 is not session2
finally:
request_module._pooling_enabled = True


def test_set_socket_options_is_idempotent():
try:
enable_keep_alive()
session1 = request_module._session
enable_keep_alive()
session2 = request_module._session
assert session1 is session2
finally:
set_socket_options(None)
Loading