From 21a84605489dfcc7ec5526172edcea3c42627c25 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Wed, 11 Feb 2026 19:15:29 -0500 Subject: [PATCH 01/18] Improve HTTP response handling and retry logic - Add Authorization header with Basic auth (base64 encoded write key) - Add X-Retry-Count header on all requests (starts at 0) - Implement Retry-After header support (capped at 300s) - Retry-After attempts don't count against backoff retry budget - Add granular status code classification: - Retryable 4xx: 408, 410, 429, 460 - Non-retryable 4xx: 400, 401, 403, 404, 413, 422 - Retryable 5xx: all except 501, 505 - Non-retryable 5xx: 501, 505 - Replace backoff decorator with custom retry loop - Exponential backoff with jitter (0.5s base, 60s cap) - Clear OAuth token on 511 Network Authentication Required - 413 Payload Too Large is non-retryable - Add 30 new comprehensive tests (106 total tests) Aligns with analytics-java and analytics-next retry behavior. Co-Authored-By: Claude Opus 4.6 --- segment/analytics/consumer.py | 140 +++++-- segment/analytics/request.py | 45 +- segment/analytics/test/test_consumer.py | 527 +++++++++++++++++++++++- segment/analytics/test/test_request.py | 153 ++++++- 4 files changed, 821 insertions(+), 44 deletions(-) diff --git a/segment/analytics/consumer.py b/segment/analytics/consumer.py index 157e3c9..2f939ac 100644 --- a/segment/analytics/consumer.py +++ b/segment/analytics/consumer.py @@ -1,10 +1,10 @@ import logging import time +import random from threading import Thread -import backoff import json -from segment.analytics.request import post, APIError, DatetimeSerializer +from segment.analytics.request import post, APIError, DatetimeSerializer, parse_retry_after from queue import Empty @@ -120,40 +120,108 @@ def next(self): return items def request(self, batch): - """Attempt to upload the batch and retry before raising an error """ - - def fatal_exception(exc): - if isinstance(exc, APIError): - # retry on server errors and 
client errors - # with 429 status code (rate limited), - # don't retry on other client errors - return (400 <= exc.status < 500) and exc.status != 429 - elif isinstance(exc, FatalError): - return True - else: - # retry on all other errors (eg. network) - return False - - attempt_count = 0 - - @backoff.on_exception( - backoff.expo, - Exception, - max_tries=self.retries + 1, - giveup=fatal_exception, - on_backoff=lambda details: self.log.debug( - f"Retry attempt {details['tries']}/{self.retries + 1} after {details['elapsed']:.2f}s" - )) - def send_request(): - nonlocal attempt_count - attempt_count += 1 + """Attempt to upload the batch and retry before raising an error""" + + def is_retryable_status(status): + """ + Determine if a status code is retryable. + Retryable 4xx: 408, 410, 429, 460 + Non-retryable 4xx: 400, 401, 403, 404, 413, 422, and all other 4xx + Retryable 5xx: All except 501, 505 + Non-retryable 5xx: 501, 505 + """ + if 400 <= status < 500: + return status in (408, 410, 429, 460) + elif 500 <= status < 600: + return status not in (501, 505) + return False + + def should_use_retry_after(status): + """Check if status code should respect Retry-After header""" + return status in (408, 429, 503) + + total_attempts = 0 + backoff_attempts = 0 + max_backoff_attempts = self.retries + 1 + + while True: try: - return post(self.write_key, self.host, gzip=self.gzip, - timeout=self.timeout, batch=batch, proxies=self.proxies, - oauth_manager=self.oauth_manager) - except Exception as e: - if attempt_count >= self.retries + 1: - self.log.error(f"All {self.retries} retries exhausted. 
Final error: {e}") + # Make the request with current retry count + response = post( + self.write_key, + self.host, + gzip=self.gzip, + timeout=self.timeout, + batch=batch, + proxies=self.proxies, + oauth_manager=self.oauth_manager, + retry_count=total_attempts + ) + # Success + return response + + except FatalError as e: + # Non-retryable error + self.log.error(f"Fatal error after {total_attempts} attempts: {e}") raise - send_request() + except APIError as e: + total_attempts += 1 + + # Check if we should use Retry-After header + if should_use_retry_after(e.status) and e.response: + retry_after = parse_retry_after(e.response) + if retry_after: + self.log.debug( + f"Retry-After header present: waiting {retry_after}s (attempt {total_attempts})" + ) + time.sleep(retry_after) + continue # Does not count against backoff budget + + # Check if status is retryable + if not is_retryable_status(e.status): + self.log.error( + f"Non-retryable error {e.status} after {total_attempts} attempts: {e}" + ) + raise + + # Count this against backoff attempts + backoff_attempts += 1 + if backoff_attempts >= max_backoff_attempts: + self.log.error( + f"All {self.retries} retries exhausted after {total_attempts} total attempts. Final error: {e}" + ) + raise + + # Calculate exponential backoff delay with jitter + base_delay = 0.5 * (2 ** (backoff_attempts - 1)) + jitter = random.uniform(0, 0.1 * base_delay) + delay = min(base_delay + jitter, 60) # Cap at 60 seconds + + self.log.debug( + f"Retry attempt {backoff_attempts}/{self.retries} (total attempts: {total_attempts}) " + f"after {delay:.2f}s for status {e.status}" + ) + time.sleep(delay) + + except Exception as e: + # Network errors or other exceptions - retry with backoff + total_attempts += 1 + backoff_attempts += 1 + + if backoff_attempts >= max_backoff_attempts: + self.log.error( + f"All {self.retries} retries exhausted after {total_attempts} total attempts. 
Final error: {e}" + ) + raise + + # Calculate exponential backoff delay with jitter + base_delay = 0.5 * (2 ** (backoff_attempts - 1)) + jitter = random.uniform(0, 0.1 * base_delay) + delay = min(base_delay + jitter, 60) # Cap at 60 seconds + + self.log.debug( + f"Network error retry {backoff_attempts}/{self.retries} (total attempts: {total_attempts}) " + f"after {delay:.2f}s: {e}" + ) + time.sleep(delay) diff --git a/segment/analytics/request.py b/segment/analytics/request.py index ab92b80..511a8a8 100644 --- a/segment/analytics/request.py +++ b/segment/analytics/request.py @@ -3,6 +3,7 @@ from gzip import GzipFile import logging import json +import base64 from dateutil.tz import tzutc from requests.auth import HTTPBasicAuth from requests import sessions @@ -12,8 +13,31 @@ _session = sessions.Session() +# Maximum Retry-After delay to respect (5 minutes) +MAX_RETRY_AFTER_SECONDS = 300 -def post(write_key, host=None, gzip=False, timeout=15, proxies=None, oauth_manager=None, **kwargs): + +def parse_retry_after(response): + """ + Parse Retry-After header from response. + Returns the delay in seconds, or None if header is not present or invalid. + Caps the value at MAX_RETRY_AFTER_SECONDS. 
+ """ + retry_after = response.headers.get('Retry-After') + if not retry_after: + return None + + try: + # Try parsing as integer (delay in seconds) + delay = int(retry_after) + return min(delay, MAX_RETRY_AFTER_SECONDS) + except ValueError: + # Could be HTTP-date format, but for simplicity we'll skip that + # Most APIs use integer seconds + return None + + +def post(write_key, host=None, gzip=False, timeout=15, proxies=None, oauth_manager=None, retry_count=0, **kwargs): """Post the `kwargs` to the API""" log = logging.getLogger('segment') body = kwargs @@ -28,10 +52,18 @@ def post(write_key, host=None, gzip=False, timeout=15, proxies=None, oauth_manag log.debug('making request: %s', data) headers = { 'Content-Type': 'application/json', - 'User-Agent': 'analytics-python/' + VERSION + 'User-Agent': 'analytics-python/' + VERSION, + 'X-Retry-Count': str(retry_count) } + + # Add Authorization header - prefer OAuth Bearer token, fallback to Basic auth if auth: headers['Authorization'] = 'Bearer {}'.format(auth) + else: + # Basic auth with write key (format: "writeKey:" encoded in base64) + credentials = '{}:'.format(write_key) + encoded = base64.b64encode(credentials.encode('utf-8')).decode('utf-8') + headers['Authorization'] = 'Basic {}'.format(encoded) if gzip: headers['Content-Encoding'] = 'gzip' @@ -60,24 +92,25 @@ def post(write_key, host=None, gzip=False, timeout=15, proxies=None, oauth_manag log.debug('data uploaded successfully') return res - if oauth_manager and res.status_code in [400, 401, 403]: + if oauth_manager and res.status_code in [400, 401, 403, 511]: oauth_manager.clear_token() try: payload = res.json() log.debug('received response: %s', payload) - raise APIError(res.status_code, payload['code'], payload['message']) + raise APIError(res.status_code, payload['code'], payload['message'], res) except ValueError: log.error('Unknown error: [%s] %s', res.status_code, res.reason) - raise APIError(res.status_code, 'unknown', res.text) + raise 
APIError(res.status_code, 'unknown', res.text, res) class APIError(Exception): - def __init__(self, status, code, message): + def __init__(self, status, code, message, response=None): self.message = message self.status = status self.code = code + self.response = response def __str__(self): msg = "[Segment] {0}: {1} ({2})" diff --git a/segment/analytics/test/test_consumer.py b/segment/analytics/test/test_consumer.py index 8371726..1b9718b 100644 --- a/segment/analytics/test/test_consumer.py +++ b/segment/analytics/test/test_consumer.py @@ -8,7 +8,7 @@ except ImportError: from Queue import Queue -from segment.analytics.consumer import Consumer, MAX_MSG_SIZE +from segment.analytics.consumer import Consumer, MAX_MSG_SIZE, FatalError from segment.analytics.request import APIError @@ -220,3 +220,528 @@ def mock_post_fn(*args, **kwargs): args, kwargs = mock_post.call_args cls().assertIn('proxies', kwargs) cls().assertEqual(kwargs['proxies'], proxies) + + def test_retry_count_header_increments(self): + """Test that X-Retry-Count header increments on each retry""" + consumer = Consumer(None, 'testsecret', retries=3) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + retry_counts = [] + + def mock_post_fn(*args, **kwargs): + retry_counts.append(kwargs.get('retry_count', 0)) + if len(retry_counts) < 3: + raise APIError(500, 'error', 'Server Error') + # Success on third attempt + return mock.Mock(status_code=200) + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + consumer.request([track]) + + # Should have been called 3 times with retry counts 0, 1, 2 + self.assertEqual(retry_counts, [0, 1, 2]) + + def test_non_retryable_4xx_status_codes(self): + """Test that non-retryable 4xx errors are not retried""" + consumer = Consumer(None, 'testsecret', retries=3) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + non_retryable_codes = [400, 401, 403, 404, 413, 422] + + for status_code in 
non_retryable_codes: + call_count = 0 + + def mock_post_fn(*args, **kwargs): + nonlocal call_count + call_count += 1 + raise APIError(status_code, 'error', f'Client Error {status_code}') + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + try: + consumer.request([track]) + except APIError as e: + self.assertEqual(e.status, status_code) + + # Should only be called once (no retries) + self.assertEqual(call_count, 1, f'Status {status_code} should not be retried') + + def test_retryable_4xx_status_codes(self): + """Test that retryable 4xx errors are retried""" + consumer = Consumer(None, 'testsecret', retries=3) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + retryable_codes = [408, 410, 429, 460] + + for status_code in retryable_codes: + call_count = 0 + + def mock_post_fn(*args, **kwargs): + nonlocal call_count + call_count += 1 + if call_count < 3: + raise APIError(status_code, 'error', f'Retryable Error {status_code}') + # Success on third attempt + return mock.Mock(status_code=200) + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with mock.patch('time.sleep'): # Mock sleep to speed up test + consumer.request([track]) + + # Should have been called 3 times + self.assertEqual(call_count, 3, f'Status {status_code} should be retried') + + def test_non_retryable_5xx_status_codes(self): + """Test that non-retryable 5xx errors are not retried""" + consumer = Consumer(None, 'testsecret', retries=3) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + non_retryable_codes = [501, 505] + + for status_code in non_retryable_codes: + call_count = 0 + + def mock_post_fn(*args, **kwargs): + nonlocal call_count + call_count += 1 + raise APIError(status_code, 'error', f'Server Error {status_code}') + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + try: + consumer.request([track]) + except APIError as e: + self.assertEqual(e.status, 
status_code) + + # Should only be called once (no retries) + self.assertEqual(call_count, 1, f'Status {status_code} should not be retried') + + def test_retryable_5xx_status_codes(self): + """Test that retryable 5xx errors are retried""" + consumer = Consumer(None, 'testsecret', retries=3) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + retryable_codes = [500, 502, 503, 504] + + for status_code in retryable_codes: + call_count = 0 + + def mock_post_fn(*args, **kwargs): + nonlocal call_count + call_count += 1 + if call_count < 3: + raise APIError(status_code, 'error', f'Server Error {status_code}') + # Success on third attempt + return mock.Mock(status_code=200) + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with mock.patch('time.sleep'): # Mock sleep to speed up test + consumer.request([track]) + + # Should have been called 3 times + self.assertEqual(call_count, 3, f'Status {status_code} should be retried') + + def test_retry_after_header_support(self): + """Test that Retry-After header is respected and doesn't count against retry budget""" + consumer = Consumer(None, 'testsecret', retries=2) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + call_count = 0 + sleep_durations = [] + + def mock_post_fn(*args, **kwargs): + nonlocal call_count + call_count += 1 + + if call_count <= 3: + # Return 429 with Retry-After for first 3 attempts + response = mock.Mock() + response.headers = {'Retry-After': '10'} + error = APIError(429, 'rate_limit', 'Too Many Requests') + error.response = response + raise error + + # Success on 4th attempt + return mock.Mock(status_code=200) + + def mock_sleep(duration): + sleep_durations.append(duration) + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with mock.patch('time.sleep', side_effect=mock_sleep): + consumer.request([track]) + + # Should succeed after 4 attempts (3 Retry-After, then success) + 
self.assertEqual(call_count, 4) + + # First 3 sleeps should be for Retry-After (10 seconds each) + self.assertEqual(sleep_durations[:3], [10, 10, 10]) + + def test_retry_after_capped_at_300_seconds(self): + """Test that Retry-After delay is capped at 300 seconds""" + consumer = Consumer(None, 'testsecret', retries=2) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + call_count = 0 + sleep_duration = None + + def mock_post_fn(*args, **kwargs): + nonlocal call_count + call_count += 1 + + if call_count == 1: + # Return 429 with large Retry-After + response = mock.Mock() + response.headers = {'Retry-After': '600'} # 10 minutes + error = APIError(429, 'rate_limit', 'Too Many Requests') + error.response = response + raise error + + # Success on 2nd attempt + return mock.Mock(status_code=200) + + def mock_sleep(duration): + nonlocal sleep_duration + sleep_duration = duration + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with mock.patch('time.sleep', side_effect=mock_sleep): + consumer.request([track]) + + # Sleep should be capped at 300 seconds + self.assertEqual(sleep_duration, 300) + + def test_retry_after_for_408_and_503(self): + """Test that Retry-After is respected for 408 and 503 status codes""" + consumer = Consumer(None, 'testsecret', retries=2) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + for status_code in [408, 503]: + call_count = 0 + sleep_duration = None + + def mock_post_fn(*args, **kwargs): + nonlocal call_count + call_count += 1 + + if call_count == 1: + response = mock.Mock() + response.headers = {'Retry-After': '5'} + error = APIError(status_code, 'error', 'Error') + error.response = response + raise error + + return mock.Mock(status_code=200) + + def mock_sleep(duration): + nonlocal sleep_duration + sleep_duration = duration + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with mock.patch('time.sleep', 
side_effect=mock_sleep): + consumer.request([track]) + + self.assertEqual(sleep_duration, 5, f'Retry-After should be respected for {status_code}') + + def test_exponential_backoff_with_jitter(self): + """Test that exponential backoff is used for retries without Retry-After""" + consumer = Consumer(None, 'testsecret', retries=4) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + call_count = 0 + sleep_durations = [] + + def mock_post_fn(*args, **kwargs): + nonlocal call_count + call_count += 1 + + if call_count <= 3: + raise APIError(500, 'error', 'Server Error') + + return mock.Mock(status_code=200) + + def mock_sleep(duration): + sleep_durations.append(duration) + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with mock.patch('time.sleep', side_effect=mock_sleep): + consumer.request([track]) + + # Should have 3 backoff delays + self.assertEqual(len(sleep_durations), 3) + + # Delays should be increasing (exponential) + # First: ~0.5s, Second: ~1s, Third: ~2s (with jitter) + self.assertGreater(sleep_durations[0], 0.4) + self.assertLess(sleep_durations[0], 1.0) + self.assertGreater(sleep_durations[1], 0.9) + self.assertLess(sleep_durations[1], 2.0) + self.assertGreater(sleep_durations[2], 1.8) + self.assertLess(sleep_durations[2], 4.0) + + def test_fatal_error_not_retried(self): + """Test that FatalError is not retried""" + consumer = Consumer(None, 'testsecret', retries=3) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + call_count = 0 + + def mock_post_fn(*args, **kwargs): + nonlocal call_count + call_count += 1 + raise FatalError('Fatal error occurred') + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + try: + consumer.request([track]) + except FatalError: + pass + + # Should only be called once (no retries) + self.assertEqual(call_count, 1) + + def test_max_retries_exhausted(self): + """Test that request fails after max retries exhausted""" + 
consumer = Consumer(None, 'testsecret', retries=2) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + call_count = 0 + + def mock_post_fn(*args, **kwargs): + nonlocal call_count + call_count += 1 + # Always fail with retryable error + raise APIError(500, 'error', 'Server Error') + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with mock.patch('time.sleep'): # Mock sleep to speed up test + try: + consumer.request([track]) + except APIError as e: + self.assertEqual(e.status, 500) + + # Should be called 3 times (initial + 2 retries) + self.assertEqual(call_count, 3) + + def test_first_request_has_retry_count_zero(self): + """T01: First successful request includes X-Retry-Count=0""" + consumer = Consumer(None, 'testsecret') + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + retry_count = None + + def mock_post_fn(*args, **kwargs): + nonlocal retry_count + retry_count = kwargs.get('retry_count') + return mock.Mock(status_code=200) + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + consumer.request([track]) + + # First request should have retry_count=0 + self.assertEqual(retry_count, 0) + + def test_429_without_retry_after_uses_backoff(self): + """T09: 429 without Retry-After header uses backoff retry""" + consumer = Consumer(None, 'testsecret', retries=3) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + call_count = 0 + retry_counts = [] + sleep_duration = None + + def mock_post_fn(*args, **kwargs): + nonlocal call_count + call_count += 1 + retry_counts.append(kwargs.get('retry_count', 0)) + + if call_count == 1: + # 429 without Retry-After header + error = APIError(429, 'rate_limit', 'Too Many Requests') + error.response = mock.Mock() + error.response.headers = {} # No Retry-After + raise error + + return mock.Mock(status_code=200) + + def mock_sleep(duration): + nonlocal sleep_duration + sleep_duration = duration + + with 
mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with mock.patch('time.sleep', side_effect=mock_sleep): + consumer.request([track]) + + # Should have two attempts + self.assertEqual(call_count, 2) + self.assertEqual(retry_counts, [0, 1]) + + # Should use backoff delay (around 0.5s with jitter) + self.assertIsNotNone(sleep_duration) + if sleep_duration is not None: + self.assertGreater(sleep_duration, 0.4) + self.assertLess(sleep_duration, 1.0) + + def test_408_without_retry_after_uses_backoff(self): + """T10: 408 without Retry-After header uses backoff retry""" + consumer = Consumer(None, 'testsecret', retries=3) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + call_count = 0 + retry_counts = [] + sleep_duration = None + + def mock_post_fn(*args, **kwargs): + nonlocal call_count + call_count += 1 + retry_counts.append(kwargs.get('retry_count', 0)) + + if call_count == 1: + # 408 without Retry-After header + error = APIError(408, 'timeout', 'Request Timeout') + error.response = mock.Mock() + error.response.headers = {} # No Retry-After + raise error + + return mock.Mock(status_code=200) + + def mock_sleep(duration): + nonlocal sleep_duration + sleep_duration = duration + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with mock.patch('time.sleep', side_effect=mock_sleep): + consumer.request([track]) + + # Should have two attempts + self.assertEqual(call_count, 2) + self.assertEqual(retry_counts, [0, 1]) + + # Should use backoff delay + self.assertIsNotNone(sleep_duration) + if sleep_duration is not None: + self.assertGreater(sleep_duration, 0.4) + self.assertLess(sleep_duration, 1.0) + + def test_network_error_retried_with_backoff(self): + """T15: Network/IO error is retried with backoff""" + consumer = Consumer(None, 'testsecret', retries=3) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + call_count = 0 + retry_counts = [] + sleep_duration = None + + 
def mock_post_fn(*args, **kwargs): + nonlocal call_count + call_count += 1 + retry_counts.append(kwargs.get('retry_count', 0)) + + if call_count == 1: + # Network error + raise ConnectionError('Network connection failed') + + return mock.Mock(status_code=200) + + def mock_sleep(duration): + nonlocal sleep_duration + sleep_duration = duration + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with mock.patch('time.sleep', side_effect=mock_sleep): + consumer.request([track]) + + # Should have two attempts + self.assertEqual(call_count, 2) + self.assertEqual(retry_counts, [0, 1]) + + # Should use backoff delay + self.assertIsNotNone(sleep_duration) + if sleep_duration is not None: + self.assertGreater(sleep_duration, 0.4) + self.assertLess(sleep_duration, 1.0) + + def test_511_is_retryable(self): + """T05: 511 status code is retryable (part of 5xx family, not in non-retryable list)""" + consumer = Consumer(None, 'testsecret', retries=3) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + call_count = 0 + retry_counts = [] + + def mock_post_fn(*args, **kwargs): + nonlocal call_count + call_count += 1 + retry_counts.append(kwargs.get('retry_count', 0)) + + if call_count < 3: + raise APIError(511, 'auth_required', 'Network Authentication Required') + + return mock.Mock(status_code=200) + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with mock.patch('time.sleep'): # Mock sleep to speed up test + consumer.request([track]) + + # Should have been called 3 times (511 is retryable) + self.assertEqual(call_count, 3) + self.assertEqual(retry_counts, [0, 1, 2]) + + def test_retry_after_not_counted_against_backoff_budget(self): + """T17: Retry-After attempts don't consume backoff retry budget""" + consumer = Consumer(None, 'testsecret', retries=1) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + call_count = 0 + retry_counts = [] + + def mock_post_fn(*args, 
**kwargs): + nonlocal call_count + call_count += 1 + retry_counts.append(kwargs.get('retry_count', 0)) + + if call_count <= 2: + # First two: 429 with Retry-After (shouldn't count against budget) + response = mock.Mock() + response.headers = {'Retry-After': '1'} + error = APIError(429, 'rate_limit', 'Too Many Requests') + error.response = response + raise error + elif call_count == 3: + # Third: 500 without Retry-After (counts against budget) + raise APIError(500, 'error', 'Server Error') + + # Success on 4th attempt + return mock.Mock(status_code=200) + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with mock.patch('time.sleep'): # Mock sleep to speed up test + consumer.request([track]) + + # Should succeed after 4 attempts: + # - 2 Retry-After attempts (don't count against budget) + # - 1 backoff attempt (counts against budget = 1) + # - 1 final backoff attempt (counts against budget = 1, limit reached) + # Actually wait, with retries=1, we have max_backoff_attempts=2 + # So: 2 Retry-After + 2 backoff attempts = 4 total + self.assertEqual(call_count, 4) + self.assertEqual(retry_counts, [0, 1, 2, 3]) + + def test_413_payload_too_large_not_retried(self): + """T12: 413 Payload Too Large is non-retryable (won't succeed on retry)""" + consumer = Consumer(None, 'testsecret', retries=3) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + call_count = 0 + + def mock_post_fn(*args, **kwargs): + nonlocal call_count + call_count += 1 + raise APIError(413, 'payload_too_large', 'Payload Too Large') + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + try: + consumer.request([track]) + except APIError as e: + self.assertEqual(e.status, 413) + + # Should only be called once (no retries) + self.assertEqual(call_count, 1) diff --git a/segment/analytics/test/test_request.py b/segment/analytics/test/test_request.py index 5ffca00..54b48be 100644 --- a/segment/analytics/test/test_request.py +++ 
b/segment/analytics/test/test_request.py @@ -2,9 +2,10 @@ import unittest import json import requests +import base64 from unittest import mock -from segment.analytics.request import post, DatetimeSerializer +from segment.analytics.request import post, DatetimeSerializer, parse_retry_after, APIError class TestRequests(unittest.TestCase): @@ -72,3 +73,153 @@ def mock_post_fn(*args, **kwargs): args, kwargs = mock_post.call_args self.assertIn('proxies', kwargs) self.assertEqual(kwargs['proxies'], proxies) + + def test_authorization_header_basic_auth(self): + """Test that Basic Authorization header is added when no OAuth manager""" + def mock_post_fn(*args, **kwargs): + res = mock.Mock() + res.status_code = 200 + return res + + with mock.patch('segment.analytics.request._session.post', side_effect=mock_post_fn) as mock_post: + post('testsecret', batch=[{ + 'userId': 'userId', + 'event': 'python event', + 'type': 'track' + }]) + + args, kwargs = mock_post.call_args + headers = kwargs['headers'] + self.assertIn('Authorization', headers) + + # Verify it's Basic auth with correct encoding + expected_credentials = base64.b64encode(b'testsecret:').decode('utf-8') + expected_auth = f'Basic {expected_credentials}' + self.assertEqual(headers['Authorization'], expected_auth) + + def test_authorization_header_oauth(self): + """Test that Bearer Authorization header is used with OAuth manager""" + oauth_manager = mock.Mock() + oauth_manager.get_token.return_value = 'test_token_123' + + def mock_post_fn(*args, **kwargs): + res = mock.Mock() + res.status_code = 200 + return res + + with mock.patch('segment.analytics.request._session.post', side_effect=mock_post_fn) as mock_post: + post('testsecret', oauth_manager=oauth_manager, batch=[{ + 'userId': 'userId', + 'event': 'python event', + 'type': 'track' + }]) + + args, kwargs = mock_post.call_args + headers = kwargs['headers'] + self.assertIn('Authorization', headers) + self.assertEqual(headers['Authorization'], 'Bearer 
test_token_123') + + def test_x_retry_count_header(self): + """Test that X-Retry-Count header is included""" + def mock_post_fn(*args, **kwargs): + res = mock.Mock() + res.status_code = 200 + return res + + with mock.patch('segment.analytics.request._session.post', side_effect=mock_post_fn) as mock_post: + # Test with retry_count=0 (first attempt) + post('testsecret', retry_count=0, batch=[{ + 'userId': 'userId', + 'event': 'python event', + 'type': 'track' + }]) + + args, kwargs = mock_post.call_args + headers = kwargs['headers'] + self.assertIn('X-Retry-Count', headers) + self.assertEqual(headers['X-Retry-Count'], '0') + + with mock.patch('segment.analytics.request._session.post', side_effect=mock_post_fn) as mock_post: + # Test with retry_count=5 + post('testsecret', retry_count=5, batch=[{ + 'userId': 'userId', + 'event': 'python event', + 'type': 'track' + }]) + + args, kwargs = mock_post.call_args + headers = kwargs['headers'] + self.assertEqual(headers['X-Retry-Count'], '5') + + def test_parse_retry_after_integer(self): + """Test parsing Retry-After header with integer seconds""" + response = mock.Mock() + response.headers = {'Retry-After': '30'} + result = parse_retry_after(response) + self.assertEqual(result, 30) + + def test_parse_retry_after_capped(self): + """Test that Retry-After is capped at 300 seconds""" + response = mock.Mock() + response.headers = {'Retry-After': '600'} + result = parse_retry_after(response) + self.assertEqual(result, 300) + + def test_parse_retry_after_missing(self): + """Test parsing when Retry-After header is missing""" + response = mock.Mock() + response.headers = {} + result = parse_retry_after(response) + self.assertIsNone(result) + + def test_parse_retry_after_invalid(self): + """Test parsing with invalid Retry-After header""" + response = mock.Mock() + response.headers = {'Retry-After': 'invalid'} + result = parse_retry_after(response) + self.assertIsNone(result) + + def test_oauth_token_cleared_on_511(self): + """Test 
that OAuth token is cleared on 511 status""" + oauth_manager = mock.Mock() + oauth_manager.get_token.return_value = 'test_token' + + def mock_post_fn(*args, **kwargs): + res = mock.Mock() + res.status_code = 511 + res.json.return_value = {'code': 'error', 'message': 'Network Authentication Required'} + return res + + with mock.patch('segment.analytics.request._session.post', side_effect=mock_post_fn): + try: + post('testsecret', oauth_manager=oauth_manager, batch=[{ + 'userId': 'userId', + 'event': 'python event', + 'type': 'track' + }]) + except APIError: + pass + + # Verify clear_token was called + oauth_manager.clear_token.assert_called_once() + + def test_api_error_includes_response(self): + """Test that APIError includes the response object""" + def mock_post_fn(*args, **kwargs): + res = mock.Mock() + res.status_code = 429 + res.json.return_value = {'code': 'rate_limit', 'message': 'Too Many Requests'} + return res + + with mock.patch('segment.analytics.request._session.post', side_effect=mock_post_fn): + try: + post('testsecret', batch=[{ + 'userId': 'userId', + 'event': 'python event', + 'type': 'track' + }]) + except APIError as e: + self.assertEqual(e.status, 429) + self.assertIsNotNone(e.response) + else: + self.fail('Expected APIError to be raised') From e4f34d0d377bb608b42537f31aeb061f6ec5d5b5 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Wed, 11 Feb 2026 20:06:09 -0500 Subject: [PATCH 02/18] Increase max retries from 10 to 1000 Aligns with analytics-java change to accommodate shorter backoff periods (0.5s base, 60s cap). With faster retries, a higher retry limit allows for better resilience during extended outages. 
Co-Authored-By: Claude Opus 4.6 --- segment/analytics/client.py | 2 +- segment/analytics/consumer.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/segment/analytics/client.py b/segment/analytics/client.py index 0f8015c..79262e9 100644 --- a/segment/analytics/client.py +++ b/segment/analytics/client.py @@ -30,7 +30,7 @@ class DefaultConfig(object): max_queue_size = 10000 gzip = False timeout = 15 - max_retries = 10 + max_retries = 1000 proxies = None thread = 1 upload_interval = 0.5 diff --git a/segment/analytics/consumer.py b/segment/analytics/consumer.py index 2f939ac..db0fe9d 100644 --- a/segment/analytics/consumer.py +++ b/segment/analytics/consumer.py @@ -29,7 +29,7 @@ class Consumer(Thread): log = logging.getLogger('segment') def __init__(self, queue, write_key, upload_size=100, host=None, - on_error=None, upload_interval=0.5, gzip=False, retries=10, + on_error=None, upload_interval=0.5, gzip=False, retries=1000, timeout=15, proxies=None, oauth_manager=None): """Create a consumer thread.""" Thread.__init__(self) From 8bd4163e6d470454545b90cc782bba8902b806c2 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Thu, 12 Feb 2026 18:07:48 -0500 Subject: [PATCH 03/18] Update segment/analytics/test/test_request.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- segment/analytics/test/test_request.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/segment/analytics/test/test_request.py b/segment/analytics/test/test_request.py index 54b48be..e0206dd 100644 --- a/segment/analytics/test/test_request.py +++ b/segment/analytics/test/test_request.py @@ -191,14 +191,12 @@ def mock_post_fn(*args, **kwargs): return res with mock.patch('segment.analytics.request._session.post', side_effect=mock_post_fn): - try: + with self.assertRaises(APIError): post('testsecret', oauth_manager=oauth_manager, batch=[{ 'userId': 'userId', 'event': 'python event', 'type': 'track' }]) - except APIError: - pass # 
Verify clear_token was called oauth_manager.clear_token.assert_called_once() From e1ac2143dff29adcba581564f758c284038568ea Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Wed, 18 Feb 2026 10:46:10 -0500 Subject: [PATCH 04/18] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- segment/analytics/request.py | 5 +++-- segment/analytics/test/test_consumer.py | 4 +--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/segment/analytics/request.py b/segment/analytics/request.py index 511a8a8..38a1be9 100644 --- a/segment/analytics/request.py +++ b/segment/analytics/request.py @@ -5,7 +5,7 @@ import json import base64 from dateutil.tz import tzutc -from requests.auth import HTTPBasicAuth + from requests import sessions from segment.analytics.version import VERSION @@ -30,7 +30,8 @@ def parse_retry_after(response): try: # Try parsing as integer (delay in seconds) delay = int(retry_after) - return min(delay, MAX_RETRY_AFTER_SECONDS) + # Ensure delay is non-negative before applying upper bound + return min(max(delay, 0), MAX_RETRY_AFTER_SECONDS) except ValueError: # Could be HTTP-date format, but for simplicity we'll skip that # Most APIs use integer seconds diff --git a/segment/analytics/test/test_consumer.py b/segment/analytics/test/test_consumer.py index 1b9718b..3e8a8eb 100644 --- a/segment/analytics/test/test_consumer.py +++ b/segment/analytics/test/test_consumer.py @@ -490,10 +490,8 @@ def mock_post_fn(*args, **kwargs): raise FatalError('Fatal error occurred') with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): - try: + with self.assertRaises(FatalError): consumer.request([track]) - except FatalError: - pass # Should only be called once (no retries) self.assertEqual(call_count, 1) From 6a10b7c314451cfadb216cb31454d3f4aab264ab Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Wed, 18 Feb 2026 21:36:20 -0500 Subject: [PATCH 05/18] Address PR 
review feedback - Extract duplicate exponential backoff calculation into helper function - Add upper bound (max_total_attempts) to prevent infinite retry loops with Retry-After - Improves code maintainability and prevents edge case of continuous Retry-After responses Co-Authored-By: Claude Opus 4.6 --- segment/analytics/consumer.py | 35 ++++++++++++++++++++----- segment/analytics/test/test_consumer.py | 28 +++++++++----------- 2 files changed, 41 insertions(+), 22 deletions(-) diff --git a/segment/analytics/consumer.py b/segment/analytics/consumer.py index db0fe9d..1d82201 100644 --- a/segment/analytics/consumer.py +++ b/segment/analytics/consumer.py @@ -140,9 +140,22 @@ def should_use_retry_after(status): """Check if status code should respect Retry-After header""" return status in (408, 429, 503) + def calculate_backoff_delay(attempt): + """ + Calculate exponential backoff delay with jitter. + First retry is immediate, then 0.5s, 1s, 2s, 4s, etc. + """ + if attempt == 1: + return 0 # First retry is immediate + base_delay = 0.5 * (2 ** (attempt - 2)) + jitter = random.uniform(0, 0.1 * base_delay) + return min(base_delay + jitter, 60) # Cap at 60 seconds + total_attempts = 0 backoff_attempts = 0 max_backoff_attempts = self.retries + 1 + # Prevent infinite retry loops even with Retry-After + max_total_attempts = max_backoff_attempts * 10 while True: try: @@ -168,6 +181,13 @@ def should_use_retry_after(status): except APIError as e: total_attempts += 1 + # Prevent infinite retry loops + if total_attempts >= max_total_attempts: + self.log.error( + f"Maximum total attempts ({max_total_attempts}) reached after {total_attempts} attempts. 
Final error: {e}" + ) + raise + # Check if we should use Retry-After header if should_use_retry_after(e.status) and e.response: retry_after = parse_retry_after(e.response) @@ -194,9 +214,7 @@ def should_use_retry_after(status): raise # Calculate exponential backoff delay with jitter - base_delay = 0.5 * (2 ** (backoff_attempts - 1)) - jitter = random.uniform(0, 0.1 * base_delay) - delay = min(base_delay + jitter, 60) # Cap at 60 seconds + delay = calculate_backoff_delay(backoff_attempts) self.log.debug( f"Retry attempt {backoff_attempts}/{self.retries} (total attempts: {total_attempts}) " @@ -209,6 +227,13 @@ def should_use_retry_after(status): total_attempts += 1 backoff_attempts += 1 + # Prevent infinite retry loops + if total_attempts >= max_total_attempts: + self.log.error( + f"Maximum total attempts ({max_total_attempts}) reached after {total_attempts} attempts. Final error: {e}" + ) + raise + if backoff_attempts >= max_backoff_attempts: self.log.error( f"All {self.retries} retries exhausted after {total_attempts} total attempts. 
Final error: {e}" @@ -216,9 +241,7 @@ def should_use_retry_after(status): raise # Calculate exponential backoff delay with jitter - base_delay = 0.5 * (2 ** (backoff_attempts - 1)) - jitter = random.uniform(0, 0.1 * base_delay) - delay = min(base_delay + jitter, 60) # Cap at 60 seconds + delay = calculate_backoff_delay(backoff_attempts) self.log.debug( f"Network error retry {backoff_attempts}/{self.retries} (total attempts: {total_attempts}) " diff --git a/segment/analytics/test/test_consumer.py b/segment/analytics/test/test_consumer.py index 3e8a8eb..df4b531 100644 --- a/segment/analytics/test/test_consumer.py +++ b/segment/analytics/test/test_consumer.py @@ -469,13 +469,12 @@ def mock_sleep(duration): self.assertEqual(len(sleep_durations), 3) # Delays should be increasing (exponential) - # First: ~0.5s, Second: ~1s, Third: ~2s (with jitter) - self.assertGreater(sleep_durations[0], 0.4) - self.assertLess(sleep_durations[0], 1.0) - self.assertGreater(sleep_durations[1], 0.9) - self.assertLess(sleep_durations[1], 2.0) - self.assertGreater(sleep_durations[2], 1.8) - self.assertLess(sleep_durations[2], 4.0) + # First: 0s (immediate), Second: ~0.5s, Third: ~1s (with jitter) + self.assertEqual(sleep_durations[0], 0) # First retry is immediate + self.assertGreater(sleep_durations[1], 0.4) + self.assertLess(sleep_durations[1], 0.6) + self.assertGreater(sleep_durations[2], 0.9) + self.assertLess(sleep_durations[2], 1.2) def test_fatal_error_not_retried(self): """Test that FatalError is not retried""" @@ -572,11 +571,10 @@ def mock_sleep(duration): self.assertEqual(call_count, 2) self.assertEqual(retry_counts, [0, 1]) - # Should use backoff delay (around 0.5s with jitter) + # First retry should be immediate (0s delay) self.assertIsNotNone(sleep_duration) if sleep_duration is not None: - self.assertGreater(sleep_duration, 0.4) - self.assertLess(sleep_duration, 1.0) + self.assertEqual(sleep_duration, 0) def test_408_without_retry_after_uses_backoff(self): """T10: 408 without 
Retry-After header uses backoff retry""" @@ -613,11 +611,10 @@ def mock_sleep(duration): self.assertEqual(call_count, 2) self.assertEqual(retry_counts, [0, 1]) - # Should use backoff delay + # First retry should be immediate (0s delay) self.assertIsNotNone(sleep_duration) if sleep_duration is not None: - self.assertGreater(sleep_duration, 0.4) - self.assertLess(sleep_duration, 1.0) + self.assertEqual(sleep_duration, 0) def test_network_error_retried_with_backoff(self): """T15: Network/IO error is retried with backoff""" @@ -651,11 +648,10 @@ def mock_sleep(duration): self.assertEqual(call_count, 2) self.assertEqual(retry_counts, [0, 1]) - # Should use backoff delay + # First retry should be immediate (0s delay) self.assertIsNotNone(sleep_duration) if sleep_duration is not None: - self.assertGreater(sleep_duration, 0.4) - self.assertLess(sleep_duration, 1.0) + self.assertEqual(sleep_duration, 0) def test_511_is_retryable(self): """T05: 511 status code is retryable (part of 5xx family, not in non-retryable list)""" From 6ed8ef6a005d23590c0908fa28c933c5762cddd4 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Wed, 25 Feb 2026 11:16:33 -0500 Subject: [PATCH 06/18] Implement unified HTTP response handling per SDD - Remove 408/503 from Retry-After eligibility (only 429 uses Retry-After) - Add rate-limit state to Consumer (rate_limited_until, rate_limit_start_time) - 429 with Retry-After: set rate-limit state, raise to caller for requeue - 429 without Retry-After: counted backoff (not pipeline blocking) - Add maxTotalBackoffDuration / maxRateLimitDuration config (default 43200s) - upload() checks rate-limit state before request(), enforces duration limit - 511 OAuth gating: only retry when OauthManager is configured - Add tests: T04, T17, T19, T20; update 429/408/503 behavior tests Co-Authored-By: Claude Opus 4.6 --- segment/analytics/client.py | 15 +- segment/analytics/consumer.py | 145 ++++++++--- segment/analytics/test/test_consumer.py | 328 
+++++++++++++++--------- 3 files changed, 332 insertions(+), 156 deletions(-) diff --git a/segment/analytics/client.py b/segment/analytics/client.py index 79262e9..5cb0973 100644 --- a/segment/analytics/client.py +++ b/segment/analytics/client.py @@ -31,6 +31,8 @@ class DefaultConfig(object): gzip = False timeout = 15 max_retries = 1000 + max_total_backoff_duration = 43200 + max_rate_limit_duration = 43200 proxies = None thread = 1 upload_interval = 0.5 @@ -65,9 +67,16 @@ def __init__(self, oauth_client_key=DefaultConfig.oauth_client_key, oauth_key_id=DefaultConfig.oauth_key_id, oauth_auth_server=DefaultConfig.oauth_auth_server, - oauth_scope=DefaultConfig.oauth_scope,): + oauth_scope=DefaultConfig.oauth_scope, + max_total_backoff_duration=DefaultConfig.max_total_backoff_duration, + max_rate_limit_duration=DefaultConfig.max_rate_limit_duration,): require('write_key', write_key, str) + if max_total_backoff_duration is not None and max_total_backoff_duration < 0: + raise ValueError('max_total_backoff_duration must be non-negative') + if max_rate_limit_duration is not None and max_rate_limit_duration < 0: + raise ValueError('max_rate_limit_duration must be non-negative') + self.queue = queue.Queue(max_queue_size) self.write_key = write_key self.on_error = on_error @@ -78,6 +87,8 @@ def __init__(self, self.gzip = gzip self.timeout = timeout self.proxies = proxies + self.max_total_backoff_duration = max_total_backoff_duration + self.max_rate_limit_duration = max_rate_limit_duration self.oauth_manager = None if(oauth_client_id and oauth_client_key and oauth_key_id): self.oauth_manager = OauthManager(oauth_client_id, oauth_client_key, oauth_key_id, @@ -110,6 +121,8 @@ def __init__(self, upload_size=upload_size, upload_interval=upload_interval, gzip=gzip, retries=max_retries, timeout=timeout, proxies=proxies, oauth_manager=self.oauth_manager, + max_total_backoff_duration=max_total_backoff_duration, + max_rate_limit_duration=max_rate_limit_duration, ) 
self.consumers.append(consumer) diff --git a/segment/analytics/consumer.py b/segment/analytics/consumer.py index 1d82201..e9a9ef7 100644 --- a/segment/analytics/consumer.py +++ b/segment/analytics/consumer.py @@ -14,6 +14,10 @@ # lower to leave space for extra data that will be added later, eg. "sentAt". BATCH_SIZE_LIMIT = 475000 +# Default duration limits (12 hours in seconds) +DEFAULT_MAX_TOTAL_BACKOFF_DURATION = 43200 +DEFAULT_MAX_RATE_LIMIT_DURATION = 43200 + class FatalError(Exception): def __init__(self, message): @@ -30,7 +34,9 @@ class Consumer(Thread): def __init__(self, queue, write_key, upload_size=100, host=None, on_error=None, upload_interval=0.5, gzip=False, retries=1000, - timeout=15, proxies=None, oauth_manager=None): + timeout=15, proxies=None, oauth_manager=None, + max_total_backoff_duration=DEFAULT_MAX_TOTAL_BACKOFF_DURATION, + max_rate_limit_duration=DEFAULT_MAX_RATE_LIMIT_DURATION): """Create a consumer thread.""" Thread.__init__(self) # Make consumer a daemon thread so that it doesn't block program exit @@ -51,6 +57,12 @@ def __init__(self, queue, write_key, upload_size=100, host=None, self.timeout = timeout self.proxies = proxies self.oauth_manager = oauth_manager + self.max_total_backoff_duration = max_total_backoff_duration + self.max_rate_limit_duration = max_rate_limit_duration + + # Rate-limit state + self.rate_limited_until = None + self.rate_limit_start_time = None def run(self): """Runs the consumer.""" @@ -64,6 +76,19 @@ def pause(self): """Pause the consumer.""" self.running = False + def set_rate_limit_state(self, response): + """Set rate-limit state from a 429 response with a valid Retry-After header.""" + retry_after = parse_retry_after(response) if response else None + if retry_after: + self.rate_limited_until = time.time() + retry_after + if self.rate_limit_start_time is None: + self.rate_limit_start_time = time.time() + + def clear_rate_limit_state(self): + """Clear rate-limit state after successful request or duration 
exceeded.""" + self.rate_limited_until = None + self.rate_limit_start_time = None + def upload(self): """Upload the next batch of items, return whether successful.""" success = False @@ -71,9 +96,57 @@ def upload(self): if len(batch) == 0: return False + # Check rate-limit state before attempting upload + if self.rate_limited_until is not None: + now = time.time() + + # Check if maxRateLimitDuration has been exceeded + if (self.rate_limit_start_time is not None and + now - self.rate_limit_start_time > self.max_rate_limit_duration): + self.log.error( + 'Rate limit duration exceeded (%ds). Clearing rate-limit state and dropping batch.', + self.max_rate_limit_duration + ) + self.clear_rate_limit_state() + # Drop the batch by marking items as done + if self.on_error: + self.on_error( + Exception('Rate limit duration exceeded, batch dropped'), + batch + ) + for _ in batch: + self.queue.task_done() + return False + + # Still rate-limited; wait until the rate limit expires + wait_time = self.rate_limited_until - now + if wait_time > 0: + self.log.debug( + 'Rate-limited. Waiting %.2fs before next upload attempt.', + wait_time + ) + time.sleep(wait_time) + try: self.request(batch) + # Success — clear rate-limit state + self.clear_rate_limit_state() success = True + except APIError as e: + if e.status == 429: + # 429: rate-limit state already set by request(). Re-queue batch. + self.log.debug('429 received. 
Re-queuing batch and halting upload iteration.') + for item in batch: + try: + self.queue.put(item, block=False) + except Exception: + pass # Queue full, item lost + success = False + else: + self.log.error('error uploading: %s', e) + success = False + if self.on_error: + self.on_error(e, batch) except Exception as e: self.log.error('error uploading: %s', e) success = False @@ -128,18 +201,19 @@ def is_retryable_status(status): Retryable 4xx: 408, 410, 429, 460 Non-retryable 4xx: 400, 401, 403, 404, 413, 422, and all other 4xx Retryable 5xx: All except 501, 505 + - 511 is only retryable when OauthManager is configured Non-retryable 5xx: 501, 505 """ if 400 <= status < 500: return status in (408, 410, 429, 460) elif 500 <= status < 600: - return status not in (501, 505) + if status in (501, 505): + return False + if status == 511: + return self.oauth_manager is not None + return True return False - def should_use_retry_after(status): - """Check if status code should respect Retry-After header""" - return status in (408, 429, 503) - def calculate_backoff_delay(attempt): """ Calculate exponential backoff delay with jitter. @@ -153,11 +227,11 @@ def calculate_backoff_delay(attempt): total_attempts = 0 backoff_attempts = 0 - max_backoff_attempts = self.retries + 1 - # Prevent infinite retry loops even with Retry-After - max_total_attempts = max_backoff_attempts * 10 + first_failure_time = None while True: + total_attempts += 1 + try: # Make the request with current retry count response = post( @@ -168,7 +242,7 @@ def calculate_backoff_delay(attempt): batch=batch, proxies=self.proxies, oauth_manager=self.oauth_manager, - retry_count=total_attempts + retry_count=total_attempts - 1 ) # Success return response @@ -179,35 +253,35 @@ def calculate_backoff_delay(attempt): raise except APIError as e: - total_attempts += 1 + # 429 with valid Retry-After: set rate-limit state and raise + # to caller (pipeline blocking). 
Without Retry-After, fall + # through to counted backoff like any other retryable error. + if e.status == 429: + retry_after = parse_retry_after(e.response) if e.response else None + if retry_after is not None: + self.set_rate_limit_state(e.response) + raise - # Prevent infinite retry loops - if total_attempts >= max_total_attempts: + # Check if status is retryable + if not is_retryable_status(e.status): self.log.error( - f"Maximum total attempts ({max_total_attempts}) reached after {total_attempts} attempts. Final error: {e}" + f"Non-retryable error {e.status} after {total_attempts} attempts: {e}" ) raise - # Check if we should use Retry-After header - if should_use_retry_after(e.status) and e.response: - retry_after = parse_retry_after(e.response) - if retry_after: - self.log.debug( - f"Retry-After header present: waiting {retry_after}s (attempt {total_attempts})" - ) - time.sleep(retry_after) - continue # Does not count against backoff budget - - # Check if status is retryable - if not is_retryable_status(e.status): + # Transient error -- per-batch backoff + if first_failure_time is None: + first_failure_time = time.time() + if time.time() - first_failure_time > self.max_total_backoff_duration: self.log.error( - f"Non-retryable error {e.status} after {total_attempts} attempts: {e}" + f"Max total backoff duration ({self.max_total_backoff_duration}s) exceeded " + f"after {total_attempts} attempts. Final error: {e}" ) raise # Count this against backoff attempts backoff_attempts += 1 - if backoff_attempts >= max_backoff_attempts: + if backoff_attempts >= self.retries + 1: self.log.error( f"All {self.retries} retries exhausted after {total_attempts} total attempts. 
Final error: {e}" ) @@ -224,17 +298,18 @@ def calculate_backoff_delay(attempt): except Exception as e: # Network errors or other exceptions - retry with backoff - total_attempts += 1 - backoff_attempts += 1 - - # Prevent infinite retry loops - if total_attempts >= max_total_attempts: + if first_failure_time is None: + first_failure_time = time.time() + if time.time() - first_failure_time > self.max_total_backoff_duration: self.log.error( - f"Maximum total attempts ({max_total_attempts}) reached after {total_attempts} attempts. Final error: {e}" + f"Max total backoff duration ({self.max_total_backoff_duration}s) exceeded " + f"after {total_attempts} attempts. Final error: {e}" ) raise - if backoff_attempts >= max_backoff_attempts: + backoff_attempts += 1 + + if backoff_attempts >= self.retries + 1: self.log.error( f"All {self.retries} retries exhausted after {total_attempts} total attempts. Final error: {e}" ) diff --git a/segment/analytics/test/test_consumer.py b/segment/analytics/test/test_consumer.py index df4b531..aa9c094 100644 --- a/segment/analytics/test/test_consumer.py +++ b/segment/analytics/test/test_consumer.py @@ -144,7 +144,7 @@ def test_request_retry(self): self._test_request_retry(consumer, APIError( 500, 'code', 'Internal Server Error'), 2) - # we should retry on HTTP 429 errors + # 429 without Retry-After uses counted backoff (like other retryable errors) consumer = Consumer(None, 'testsecret') self._test_request_retry(consumer, APIError( 429, 'code', 'Too Many Requests'), 2) @@ -266,7 +266,7 @@ def mock_post_fn(*args, **kwargs): self.assertEqual(call_count, 1, f'Status {status_code} should not be retried') def test_retryable_4xx_status_codes(self): - """Test that retryable 4xx errors are retried""" + """Test that retryable 4xx errors are retried (429 without Retry-After uses backoff too)""" consumer = Consumer(None, 'testsecret', retries=3) track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} @@ -339,107 +339,82 @@ def 
mock_post_fn(*args, **kwargs): # Should have been called 3 times self.assertEqual(call_count, 3, f'Status {status_code} should be retried') - def test_retry_after_header_support(self): - """Test that Retry-After header is respected and doesn't count against retry budget""" + def test_429_sets_rate_limit_state_with_retry_after(self): + """Test that 429 with Retry-After sets rate_limited_until on consumer""" consumer = Consumer(None, 'testsecret', retries=2) track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} - call_count = 0 - sleep_durations = [] - def mock_post_fn(*args, **kwargs): - nonlocal call_count - call_count += 1 - - if call_count <= 3: - # Return 429 with Retry-After for first 3 attempts - response = mock.Mock() - response.headers = {'Retry-After': '10'} - error = APIError(429, 'rate_limit', 'Too Many Requests') - error.response = response - raise error - - # Success on 4th attempt - return mock.Mock(status_code=200) - - def mock_sleep(duration): - sleep_durations.append(duration) + response = mock.Mock() + response.headers = {'Retry-After': '10'} + error = APIError(429, 'rate_limit', 'Too Many Requests') + error.response = response + raise error with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): - with mock.patch('time.sleep', side_effect=mock_sleep): + with self.assertRaises(APIError) as ctx: consumer.request([track]) + self.assertEqual(ctx.exception.status, 429) - # Should succeed after 4 attempts (3 Retry-After, then success) - self.assertEqual(call_count, 4) - - # First 3 sleeps should be for Retry-After (10 seconds each) - self.assertEqual(sleep_durations[:3], [10, 10, 10]) + # Rate-limit state should be set + self.assertIsNotNone(consumer.rate_limited_until) + self.assertIsNotNone(consumer.rate_limit_start_time) + # rate_limited_until should be ~10 seconds in the future + self.assertGreater(consumer.rate_limited_until, time.time() + 5) def test_retry_after_capped_at_300_seconds(self): - """Test that 
Retry-After delay is capped at 300 seconds""" + """Test that Retry-After delay is capped at 300 seconds when setting rate-limit state""" consumer = Consumer(None, 'testsecret', retries=2) track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} - call_count = 0 - sleep_duration = None - def mock_post_fn(*args, **kwargs): - nonlocal call_count - call_count += 1 - - if call_count == 1: - # Return 429 with large Retry-After - response = mock.Mock() - response.headers = {'Retry-After': '600'} # 10 minutes - error = APIError(429, 'rate_limit', 'Too Many Requests') - error.response = response - raise error - - # Success on 2nd attempt - return mock.Mock(status_code=200) - - def mock_sleep(duration): - nonlocal sleep_duration - sleep_duration = duration + response = mock.Mock() + response.headers = {'Retry-After': '600'} # 10 minutes + error = APIError(429, 'rate_limit', 'Too Many Requests') + error.response = response + raise error + now = time.time() with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): - with mock.patch('time.sleep', side_effect=mock_sleep): + with self.assertRaises(APIError): consumer.request([track]) - # Sleep should be capped at 300 seconds - self.assertEqual(sleep_duration, 300) + # rate_limited_until should be capped at ~300s from now (not 600s) + self.assertIsNotNone(consumer.rate_limited_until) + self.assertLessEqual(consumer.rate_limited_until, now + 310) + self.assertGreater(consumer.rate_limited_until, now + 290) - def test_retry_after_for_408_and_503(self): - """Test that Retry-After is respected for 408 and 503 status codes""" - consumer = Consumer(None, 'testsecret', retries=2) + def test_408_and_503_use_backoff(self): + """Test that 408 and 503 use exponential backoff""" track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} for status_code in [408, 503]: + consumer = Consumer(None, 'testsecret', retries=2) call_count = 0 - sleep_duration = None + sleep_durations = [] def 
mock_post_fn(*args, **kwargs): nonlocal call_count call_count += 1 - if call_count == 1: response = mock.Mock() response.headers = {'Retry-After': '5'} error = APIError(status_code, 'error', 'Error') error.response = response raise error - return mock.Mock(status_code=200) def mock_sleep(duration): - nonlocal sleep_duration - sleep_duration = duration + sleep_durations.append(duration) with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): with mock.patch('time.sleep', side_effect=mock_sleep): consumer.request([track]) - self.assertEqual(sleep_duration, 5, f'Retry-After should be respected for {status_code}') + # Should use backoff delay (0 for first retry), NOT the Retry-After value of 5 + self.assertEqual(call_count, 2) + self.assertEqual(len(sleep_durations), 1) + self.assertEqual(sleep_durations[0], 0, f'{status_code} should use backoff, not Retry-After') def test_exponential_backoff_with_jitter(self): """Test that exponential backoff is used for retries without Retry-After""" @@ -536,45 +511,31 @@ def mock_post_fn(*args, **kwargs): # First request should have retry_count=0 self.assertEqual(retry_count, 0) - def test_429_without_retry_after_uses_backoff(self): - """T09: 429 without Retry-After header uses backoff retry""" - consumer = Consumer(None, 'testsecret', retries=3) + def test_429_without_retry_after_uses_counted_backoff(self): + """429 without Retry-After uses counted backoff (not pipeline blocking)""" + consumer = Consumer(None, 'testsecret', retries=2) track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} call_count = 0 - retry_counts = [] - sleep_duration = None def mock_post_fn(*args, **kwargs): nonlocal call_count call_count += 1 - retry_counts.append(kwargs.get('retry_count', 0)) - - if call_count == 1: - # 429 without Retry-After header + if call_count < 3: error = APIError(429, 'rate_limit', 'Too Many Requests') error.response = mock.Mock() error.response.headers = {} # No Retry-After raise error - return 
mock.Mock(status_code=200) - def mock_sleep(duration): - nonlocal sleep_duration - sleep_duration = duration - with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): - with mock.patch('time.sleep', side_effect=mock_sleep): + with mock.patch('time.sleep'): consumer.request([track]) - # Should have two attempts - self.assertEqual(call_count, 2) - self.assertEqual(retry_counts, [0, 1]) - - # First retry should be immediate (0s delay) - self.assertIsNotNone(sleep_duration) - if sleep_duration is not None: - self.assertEqual(sleep_duration, 0) + # Should retry with backoff (3 calls: initial + 2 retries) + self.assertEqual(call_count, 3) + # Rate-limit state should NOT be set (no pipeline blocking) + self.assertIsNone(consumer.rate_limited_until) def test_408_without_retry_after_uses_backoff(self): """T10: 408 without Retry-After header uses backoff retry""" @@ -653,71 +614,70 @@ def mock_sleep(duration): if sleep_duration is not None: self.assertEqual(sleep_duration, 0) - def test_511_is_retryable(self): - """T05: 511 status code is retryable (part of 5xx family, not in non-retryable list)""" + def test_511_not_retryable_without_oauth(self): + """T17: 511 is NOT retried when OauthManager is not configured""" consumer = Consumer(None, 'testsecret', retries=3) track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} call_count = 0 - retry_counts = [] def mock_post_fn(*args, **kwargs): nonlocal call_count call_count += 1 - retry_counts.append(kwargs.get('retry_count', 0)) + raise APIError(511, 'auth_required', 'Network Authentication Required') + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with self.assertRaises(APIError) as ctx: + consumer.request([track]) + self.assertEqual(ctx.exception.status, 511) + + # Should only be called once (not retried without OAuth) + self.assertEqual(call_count, 1) + + def test_511_retryable_with_oauth(self): + """T17: 511 IS retried when OauthManager is configured""" + 
oauth_manager = mock.Mock() + consumer = Consumer(None, 'testsecret', retries=3, oauth_manager=oauth_manager) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + call_count = 0 + + def mock_post_fn(*args, **kwargs): + nonlocal call_count + call_count += 1 if call_count < 3: raise APIError(511, 'auth_required', 'Network Authentication Required') - return mock.Mock(status_code=200) with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): - with mock.patch('time.sleep'): # Mock sleep to speed up test + with mock.patch('time.sleep'): consumer.request([track]) - # Should have been called 3 times (511 is retryable) + # Should have been called 3 times (511 is retryable with OAuth) self.assertEqual(call_count, 3) - self.assertEqual(retry_counts, [0, 1, 2]) - def test_retry_after_not_counted_against_backoff_budget(self): - """T17: Retry-After attempts don't consume backoff retry budget""" + def test_429_with_retry_after_does_not_count_against_backoff_budget(self): + """429 with Retry-After raises immediately (pipeline blocking) without consuming backoff budget""" consumer = Consumer(None, 'testsecret', retries=1) track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} call_count = 0 - retry_counts = [] def mock_post_fn(*args, **kwargs): nonlocal call_count call_count += 1 - retry_counts.append(kwargs.get('retry_count', 0)) - - if call_count <= 2: - # First two: 429 with Retry-After (shouldn't count against budget) - response = mock.Mock() - response.headers = {'Retry-After': '1'} - error = APIError(429, 'rate_limit', 'Too Many Requests') - error.response = response - raise error - elif call_count == 3: - # Third: 500 without Retry-After (counts against budget) - raise APIError(500, 'error', 'Server Error') - - # Success on 4th attempt - return mock.Mock(status_code=200) + error = APIError(429, 'rate_limit', 'Too Many Requests') + error.response = mock.Mock() + error.response.headers = {'Retry-After': '1'} + raise 
error with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): - with mock.patch('time.sleep'): # Mock sleep to speed up test + with self.assertRaises(APIError) as ctx: consumer.request([track]) + self.assertEqual(ctx.exception.status, 429) - # Should succeed after 4 attempts: - # - 2 Retry-After attempts (don't count against budget) - # - 1 backoff attempt (counts against budget = 1) - # - 1 final backoff attempt (counts against budget = 1, limit reached) - # Actually wait, with retries=1, we have max_backoff_attempts=2 - # So: 2 Retry-After + 2 backoff attempts = 4 total - self.assertEqual(call_count, 4) - self.assertEqual(retry_counts, [0, 1, 2, 3]) + # 429 with Retry-After raises on first attempt (pipeline blocking) + self.assertEqual(call_count, 1) def test_413_payload_too_large_not_retried(self): """T12: 413 Payload Too Large is non-retryable (won't succeed on retry)""" @@ -739,3 +699,131 @@ def mock_post_fn(*args, **kwargs): # Should only be called once (no retries) self.assertEqual(call_count, 1) + + def test_t04_429_halts_upload_iteration(self): + """T04: 429 halts current upload iteration — batch is re-queued, not dropped""" + q = Queue() + consumer = Consumer(q, 'testsecret', retries=3) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + # Put a message in the queue + q.put(track) + + call_count = 0 + + def mock_post_fn(*args, **kwargs): + nonlocal call_count + call_count += 1 + response = mock.Mock() + response.headers = {'Retry-After': '10'} + error = APIError(429, 'rate_limit', 'Too Many Requests') + error.response = response + raise error + + on_error_called = [] + + def on_error(e, batch): + on_error_called.append((e, batch)) + + consumer.on_error = on_error + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with mock.patch('time.sleep'): + result = consumer.upload() + + # upload() should return False (not successful) + self.assertFalse(result) + # request() should have 
been called exactly once + self.assertEqual(call_count, 1) + # on_error should NOT have been called (batch was re-queued, not dropped) + self.assertEqual(len(on_error_called), 0) + # Rate-limit state should be set + self.assertIsNotNone(consumer.rate_limited_until) + self.assertIsNotNone(consumer.rate_limit_start_time) + + def test_t19_max_total_backoff_duration(self): + """T19: Gives up after maxTotalBackoffDuration elapsed""" + consumer = Consumer(None, 'testsecret', retries=1000, + max_total_backoff_duration=5) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + call_count = 0 + fake_time = [100.0] # Start time + + def mock_post_fn(*args, **kwargs): + nonlocal call_count + call_count += 1 + raise APIError(500, 'error', 'Server Error') + + original_time = time.time + + def mock_time(): + # Advance time by 3 seconds on each call after the first + result = fake_time[0] + fake_time[0] += 3.0 + return result + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with mock.patch('time.sleep'): + with mock.patch('time.time', side_effect=mock_time): + with self.assertRaises(APIError) as ctx: + consumer.request([track]) + self.assertEqual(ctx.exception.status, 500) + + # With max_total_backoff_duration=5 and time advancing 3s per call: + # Attempt 1: fails, first_failure_time set at 100, time now 103 + # Attempt 2: fails, time is 106, 106-100=6 > 5, exceeds duration + # So should be called exactly 2 times + self.assertEqual(call_count, 2) + + def test_t20_max_rate_limit_duration(self): + """T20: Rate-limited state clears and batch is dropped after maxRateLimitDuration""" + q = Queue() + consumer = Consumer(q, 'testsecret', retries=3, + max_rate_limit_duration=10) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + # Pre-set rate-limit state as if we entered it 15 seconds ago + now = time.time() + consumer.rate_limit_start_time = now - 15 # 15s ago, exceeds 10s limit + 
consumer.rate_limited_until = now + 5 # Would still be rate-limited + + # Put a message in the queue + q.put(track) + + on_error_called = [] + + def on_error(e, batch): + on_error_called.append((e, batch)) + + consumer.on_error = on_error + + # upload() should detect duration exceeded, clear state, drop batch + result = consumer.upload() + + self.assertFalse(result) + # Rate-limit state should be cleared + self.assertIsNone(consumer.rate_limited_until) + self.assertIsNone(consumer.rate_limit_start_time) + # on_error should have been called (batch was dropped) + self.assertEqual(len(on_error_called), 1) + + def test_rate_limit_state_cleared_on_success(self): + """Rate-limit state is cleared after a successful request""" + q = Queue() + consumer = Consumer(q, 'testsecret', retries=3) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + # Set rate-limit state + consumer.rate_limited_until = time.time() - 1 # Already expired + consumer.rate_limit_start_time = time.time() - 10 + + q.put(track) + + with mock.patch('segment.analytics.consumer.post', return_value=mock.Mock(status_code=200)): + result = consumer.upload() + + self.assertTrue(result) + # Rate-limit state should be cleared on success + self.assertIsNone(consumer.rate_limited_until) + self.assertIsNone(consumer.rate_limit_start_time) From a28102fabe4b2ead37662fbb4d22c54c3b98bc79 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Wed, 25 Feb 2026 14:01:22 -0500 Subject: [PATCH 07/18] Fix Retry-After: 0 handling and 429 re-queue guard Handle Retry-After: 0 correctly by checking 'is not None' instead of truthiness. Prevent silent batch re-queue on 429 without Retry-After by gating the upload() re-queue path on rate_limited_until being set. 
Co-Authored-By: Claude Opus 4.6 --- segment/analytics/consumer.py | 4 +- segment/analytics/test/test_consumer.py | 53 +++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/segment/analytics/consumer.py b/segment/analytics/consumer.py index e9a9ef7..b2143f8 100644 --- a/segment/analytics/consumer.py +++ b/segment/analytics/consumer.py @@ -79,7 +79,7 @@ def pause(self): def set_rate_limit_state(self, response): """Set rate-limit state from a 429 response with a valid Retry-After header.""" retry_after = parse_retry_after(response) if response else None - if retry_after: + if retry_after is not None: self.rate_limited_until = time.time() + retry_after if self.rate_limit_start_time is None: self.rate_limit_start_time = time.time() @@ -133,7 +133,7 @@ def upload(self): self.clear_rate_limit_state() success = True except APIError as e: - if e.status == 429: + if e.status == 429 and self.rate_limited_until is not None: # 429: rate-limit state already set by request(). Re-queue batch. self.log.debug('429 received. 
Re-queuing batch and halting upload iteration.') for item in batch: diff --git a/segment/analytics/test/test_consumer.py b/segment/analytics/test/test_consumer.py index aa9c094..6fbe9c0 100644 --- a/segment/analytics/test/test_consumer.py +++ b/segment/analytics/test/test_consumer.py @@ -741,6 +741,59 @@ def on_error(e, batch): self.assertIsNotNone(consumer.rate_limited_until) self.assertIsNotNone(consumer.rate_limit_start_time) + def test_429_without_retry_after_does_not_requeue_batch(self): + """429 without Retry-After is treated as normal failure in upload() and is not re-queued""" + q = Queue() + consumer = Consumer(q, 'testsecret', retries=0) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + q.put(track) + + def mock_post_fn(*args, **kwargs): + error = APIError(429, 'rate_limit', 'Too Many Requests') + error.response = mock.Mock() + error.response.headers = {} + raise error + + on_error_called = [] + + def on_error(e, batch): + on_error_called.append((e, batch)) + + consumer.on_error = on_error + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with mock.patch('time.sleep'): + result = consumer.upload() + + self.assertFalse(result) + self.assertEqual(len(on_error_called), 1) + self.assertIsNone(consumer.rate_limited_until) + self.assertEqual(q.qsize(), 0) + + def test_retry_after_zero_sets_rate_limit_state(self): + """429 with Retry-After: 0 still sets rate-limit state for consistent pipeline handling""" + consumer = Consumer(None, 'testsecret', retries=1) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + def mock_post_fn(*args, **kwargs): + response = mock.Mock() + response.headers = {'Retry-After': '0'} + error = APIError(429, 'rate_limit', 'Too Many Requests') + error.response = response + raise error + + before = time.time() + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with self.assertRaises(APIError) as ctx: + consumer.request([track]) 
+ self.assertEqual(ctx.exception.status, 429) + after = time.time() + + self.assertIsNotNone(consumer.rate_limited_until) + self.assertIsNotNone(consumer.rate_limit_start_time) + self.assertGreaterEqual(consumer.rate_limited_until, before) + self.assertLessEqual(consumer.rate_limited_until, after + 0.1) + def test_t19_max_total_backoff_duration(self): """T19: Gives up after maxTotalBackoffDuration elapsed""" consumer = Consumer(None, 'testsecret', retries=1000, From 115d124455a70fb2ce719d470c80ae346b42cb39 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Wed, 25 Feb 2026 16:32:53 -0500 Subject: [PATCH 08/18] Address PR review: catch KeyError in response parsing - Add KeyError to except clause when parsing JSON response to handle missing 'code' or 'message' keys - Add explanatory comment on pre-existing except-pass pattern Co-Authored-By: Claude Opus 4.6 --- segment/analytics/request.py | 2 +- segment/analytics/test/test_consumer.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/segment/analytics/request.py b/segment/analytics/request.py index 38a1be9..600fda3 100644 --- a/segment/analytics/request.py +++ b/segment/analytics/request.py @@ -100,7 +100,7 @@ def post(write_key, host=None, gzip=False, timeout=15, proxies=None, oauth_manag payload = res.json() log.debug('received response: %s', payload) raise APIError(res.status_code, payload['code'], payload['message'], res) - except ValueError: + except (ValueError, KeyError): log.error('Unknown error: [%s] %s', res.status_code, res.reason) raise APIError(res.status_code, 'unknown', res.text, res) diff --git a/segment/analytics/test/test_consumer.py b/segment/analytics/test/test_consumer.py index 6fbe9c0..a197843 100644 --- a/segment/analytics/test/test_consumer.py +++ b/segment/analytics/test/test_consumer.py @@ -155,7 +155,7 @@ def test_request_retry(self): try: self._test_request_retry(consumer, api_error, 1) except APIError: - pass + pass # Expected: 400 is non-retryable, so 
the error propagates here
         else:
             self.fail('request() should not retry on client errors')

From 2474b4692c8dcded711ade76e6979e66fdf9f8ab Mon Sep 17 00:00:00 2001
From: Michael Grosse Huelsewiesche
Date: Wed, 25 Feb 2026 17:16:54 -0500
Subject: [PATCH 09/18] Enabling retry e2e test set

---
 e2e-cli/e2e-config.json | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/e2e-cli/e2e-config.json b/e2e-cli/e2e-config.json
index b0ccf30..e1a02d5 100644
--- a/e2e-cli/e2e-config.json
+++ b/e2e-cli/e2e-config.json
@@ -1,6 +1,6 @@
 {
   "sdk": "python",
-  "test_suites": "basic",
+  "test_suites": "basic,retry",
   "auto_settings": false,
   "patch": null,
   "env": {}

From 98df1bcea0d6e82a1761de7361b5258ae4686d48 Mon Sep 17 00:00:00 2001
From: Michael Grosse Huelsewiesche
Date: Thu, 26 Feb 2026 16:48:22 -0500
Subject: [PATCH 10/18] Fix Retry-After header never parsed on 4xx/5xx responses

requests.Response.__bool__() returns False for 4xx and 5xx status codes
(it proxies Response.ok, which is True only for status codes below 400).
The checks `if e.response` and `if response` evaluated to False for 429
responses, so parse_retry_after() was never called and the SDK fell back
to normal backoff instead of respecting Retry-After.

Changed both checks to explicit `is not None` comparisons.

Co-Authored-By: Claude Opus 4.6 --- segment/analytics/consumer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/segment/analytics/consumer.py b/segment/analytics/consumer.py index b2143f8..adf0565 100644 --- a/segment/analytics/consumer.py +++ b/segment/analytics/consumer.py @@ -78,7 +78,7 @@ def pause(self): def set_rate_limit_state(self, response): """Set rate-limit state from a 429 response with a valid Retry-After header.""" - retry_after = parse_retry_after(response) if response else None + retry_after = parse_retry_after(response) if response is not None else None if retry_after is not None: self.rate_limited_until = time.time() + retry_after if self.rate_limit_start_time is None: @@ -257,7 +257,7 @@ def calculate_backoff_delay(attempt): # to caller (pipeline blocking). Without Retry-After, fall # through to counted backoff like any other retryable error. if e.status == 429: - retry_after = parse_retry_after(e.response) if e.response else None + retry_after = parse_retry_after(e.response) if e.response is not None else None if retry_after is not None: self.set_rate_limit_state(e.response) raise From ec0481fb7ee9a3b7ba3927a89e1988e80e00b63b Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Thu, 26 Feb 2026 18:20:02 -0500 Subject: [PATCH 11/18] Wire on_error callback in e2e-cli for failure reporting Add on_error handler to capture delivery failures from the SDK. Reports success=false with the first error message when any batch fails (non-retryable error or retries exhausted). 
Co-Authored-By: Claude Opus 4.6 --- e2e-cli/src/cli.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/e2e-cli/src/cli.py b/e2e-cli/src/cli.py index ef242f6..d6501c8 100644 --- a/e2e-cli/src/cli.py +++ b/e2e-cli/src/cli.py @@ -70,6 +70,10 @@ def run(input_json: str, debug: bool): """Run the E2E CLI with the given input configuration.""" logger = setup_logging(debug) output = {"success": False, "sentBatches": 0, "error": None} + delivery_errors = [] + + def on_error(error, batch): + delivery_errors.append(str(error)) try: data = json.loads(input_json) @@ -96,6 +100,7 @@ def run(input_json: str, debug: bool): write_key=write_key, host=api_host, debug=debug, + on_error=on_error, upload_size=flush_at, upload_interval=flush_interval, max_retries=max_retries, @@ -120,10 +125,12 @@ def run(input_json: str, debug: bool): client.flush() client.join() - output["success"] = True - # Note: We don't have easy access to batch count from the SDK internals - # This could be enhanced if needed - output["sentBatches"] = 1 # Placeholder + if delivery_errors: + output["success"] = False + output["error"] = delivery_errors[0] + else: + output["success"] = True + output["sentBatches"] = 1 except json.JSONDecodeError as e: output["error"] = f"Invalid JSON input: {e}" From ea257f8963ea2503d0bc6160daf70a3e00def539 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Tue, 3 Mar 2026 20:12:52 -0500 Subject: [PATCH 12/18] Consolidate backoff parameters: max retries 1000 -> 10 Align retry configuration with cross-library defaults. Base backoff (500ms) and max backoff (60s) already matched. 
Co-Authored-By: Claude Opus 4.6 --- segment/analytics/client.py | 2 +- segment/analytics/consumer.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/segment/analytics/client.py b/segment/analytics/client.py index 5cb0973..6bb0ddd 100644 --- a/segment/analytics/client.py +++ b/segment/analytics/client.py @@ -30,7 +30,7 @@ class DefaultConfig(object): max_queue_size = 10000 gzip = False timeout = 15 - max_retries = 1000 + max_retries = 10 max_total_backoff_duration = 43200 max_rate_limit_duration = 43200 proxies = None diff --git a/segment/analytics/consumer.py b/segment/analytics/consumer.py index adf0565..1b84bfa 100644 --- a/segment/analytics/consumer.py +++ b/segment/analytics/consumer.py @@ -33,7 +33,7 @@ class Consumer(Thread): log = logging.getLogger('segment') def __init__(self, queue, write_key, upload_size=100, host=None, - on_error=None, upload_interval=0.5, gzip=False, retries=1000, + on_error=None, upload_interval=0.5, gzip=False, retries=10, timeout=15, proxies=None, oauth_manager=None, max_total_backoff_duration=DEFAULT_MAX_TOTAL_BACKOFF_DURATION, max_rate_limit_duration=DEFAULT_MAX_RATE_LIMIT_DURATION): From 12d2f085411ac4ed5fceee1f2f9c49666f71eb4c Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Fri, 20 Mar 2026 12:21:05 -0400 Subject: [PATCH 13/18] Omit X-Retry-Count header on first attempt, send only on retries First request (retry_count=0) no longer includes the header. Retries with retry_count > 0 continue to send X-Retry-Count: 1, 2, 3, etc. 
Co-Authored-By: Claude Opus 4.6 --- segment/analytics/request.py | 3 ++- segment/analytics/test/test_request.py | 7 +++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/segment/analytics/request.py b/segment/analytics/request.py index 600fda3..2c7916b 100644 --- a/segment/analytics/request.py +++ b/segment/analytics/request.py @@ -54,8 +54,9 @@ def post(write_key, host=None, gzip=False, timeout=15, proxies=None, oauth_manag headers = { 'Content-Type': 'application/json', 'User-Agent': 'analytics-python/' + VERSION, - 'X-Retry-Count': str(retry_count) } + if retry_count > 0: + headers['X-Retry-Count'] = str(retry_count) # Add Authorization header - prefer OAuth Bearer token, fallback to Basic auth if auth: diff --git a/segment/analytics/test/test_request.py b/segment/analytics/test/test_request.py index e0206dd..c5d6b9d 100644 --- a/segment/analytics/test/test_request.py +++ b/segment/analytics/test/test_request.py @@ -120,14 +120,14 @@ def mock_post_fn(*args, **kwargs): self.assertEqual(headers['Authorization'], 'Bearer test_token_123') def test_x_retry_count_header(self): - """Test that X-Retry-Count header is included""" + """Test that X-Retry-Count header is omitted on first attempt and included on retries""" def mock_post_fn(*args, **kwargs): res = mock.Mock() res.status_code = 200 return res with mock.patch('segment.analytics.request._session.post', side_effect=mock_post_fn) as mock_post: - # Test with retry_count=0 (first attempt) + # Test with retry_count=0 (first attempt) — header should be absent post('testsecret', retry_count=0, batch=[{ 'userId': 'userId', 'event': 'python event', @@ -136,8 +136,7 @@ def mock_post_fn(*args, **kwargs): args, kwargs = mock_post.call_args headers = kwargs['headers'] - self.assertIn('X-Retry-Count', headers) - self.assertEqual(headers['X-Retry-Count'], '0') + self.assertNotIn('X-Retry-Count', headers) with mock.patch('segment.analytics.request._session.post', side_effect=mock_post_fn) as mock_post: # Test 
with retry_count=5 From a2fd03be786e1422a5ba42eec98ee8f75002bbf3 Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Fri, 1 May 2026 14:03:34 -0400 Subject: [PATCH 14/18] Treat 2xx and 3xx status codes as success, not just 200 Aligns with the analytics-next Node reference implementation which treats all 200-399 responses as successful delivery. Previously only exact 200 was treated as success, causing 201/204/3xx to be misreported as errors. Co-Authored-By: Claude Opus 4.6 --- segment/analytics/request.py | 2 +- segment/analytics/test/test_request.py | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/segment/analytics/request.py b/segment/analytics/request.py index 2c7916b..098b536 100644 --- a/segment/analytics/request.py +++ b/segment/analytics/request.py @@ -90,7 +90,7 @@ def post(write_key, host=None, gzip=False, timeout=15, proxies=None, oauth_manag except Exception as e: raise e - if res.status_code == 200: + if 200 <= res.status_code < 400: log.debug('data uploaded successfully') return res diff --git a/segment/analytics/test/test_request.py b/segment/analytics/test/test_request.py index c5d6b9d..4451dd4 100644 --- a/segment/analytics/test/test_request.py +++ b/segment/analytics/test/test_request.py @@ -150,6 +150,22 @@ def mock_post_fn(*args, **kwargs): headers = kwargs['headers'] self.assertEqual(headers['X-Retry-Count'], '5') + def test_non_200_2xx_treated_as_success(self): + """Test that 2xx and 3xx status codes are treated as success""" + for status_code in [200, 201, 204, 301, 302]: + def mock_post_fn(*args, **kwargs): + res = mock.Mock() + res.status_code = status_code + return res + + with mock.patch('segment.analytics.request._session.post', side_effect=mock_post_fn): + res = post('testsecret', batch=[{ + 'userId': 'userId', + 'event': 'python event', + 'type': 'track' + }]) + self.assertEqual(res.status_code, status_code) + def test_parse_retry_after_integer(self): """Test parsing Retry-After header 
with integer seconds""" response = mock.Mock() From 8c5680a2c43da8b339a56f74a71a140221c2871d Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Thu, 7 May 2026 13:08:04 -0400 Subject: [PATCH 15/18] Improve e2e-cli setup: Python resolution and devbox docs - Resolve python via activated venv/devbox, fall back to python3 - Use $PYTHON -m pip instead of bare pip (fixes macOS/nix where pip is not on PATH) - Add devbox setup as recommended path in README, with venv fallback instructions --- e2e-cli/README.md | 30 ++++++++++++++++++++++++------ e2e-cli/run-e2e.sh | 18 +++++++++++++++--- 2 files changed, 39 insertions(+), 9 deletions(-) diff --git a/e2e-cli/README.md b/e2e-cli/README.md index 28c602a..1a47b61 100644 --- a/e2e-cli/README.md +++ b/e2e-cli/README.md @@ -2,17 +2,35 @@ E2E test CLI for the [analytics-python](https://github.com/segmentio/analytics-python) SDK. Accepts a JSON input describing events and SDK configuration, sends them through the real SDK, and outputs results as JSON. -## Setup +## Running E2E tests + +### With devbox (recommended) + +```bash +# From repo root — activates Python 3.12 and installs deps automatically +devbox shell + +# Then from e2e-cli dir: +./run-e2e.sh +``` + +### Without devbox + +Requires Python 3.9+ and Node.js 18+. Using a virtualenv is strongly recommended since macOS system Python is externally managed. ```bash -cd e2e-cli python3 -m venv .venv source .venv/bin/activate -pip install -r requirements.txt -pip install -e . 
+./run-e2e.sh +``` + +### Override sdk-e2e-tests location + +```bash +E2E_TESTS_DIR=../my-e2e-tests ./run-e2e.sh ``` -## Usage +## Manual CLI usage ```bash e2e-cli --input '{"writeKey":"...", ...}' @@ -21,7 +39,7 @@ e2e-cli --input '{"writeKey":"...", ...}' Or without installing: ```bash -python3 -m src.cli --input '{"writeKey":"...", ...}' +python3 src/cli.py --input '{"writeKey":"...", ...}' ``` ## Input Format diff --git a/e2e-cli/run-e2e.sh b/e2e-cli/run-e2e.sh index 533dba9..a90b520 100755 --- a/e2e-cli/run-e2e.sh +++ b/e2e-cli/run-e2e.sh @@ -2,7 +2,9 @@ # # Run E2E tests for analytics-python # -# Prerequisites: Python 3, pip, Node.js 18+ +# Prerequisites: Node.js 18+ and one of: +# - devbox (recommended): run `devbox shell` first, then ./run-e2e.sh +# - Python 3.9+ with a virtualenv already activated # # Usage: # ./run-e2e.sh [extra args passed to run-tests.sh] @@ -17,15 +19,25 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" SDK_ROOT="$SCRIPT_DIR/.." E2E_DIR="${E2E_TESTS_DIR:-$SDK_ROOT/../sdk-e2e-tests}" +# Resolve python and pip — prefer activated venv/devbox python, fall back to python3 +PYTHON="${PYTHON:-$(command -v python || command -v python3)}" +PIP="$PYTHON -m pip" + +if [[ -z "$PYTHON" ]]; then + echo "Error: Python not found. Run 'devbox shell' first or activate a virtualenv." + exit 1 +fi + echo "=== Building analytics-python e2e-cli ===" +echo "Using Python: $PYTHON" # Install SDK cd "$SDK_ROOT" -pip install -e . +$PIP install -e . -q # Install e2e-cli cd "$SCRIPT_DIR" -pip install -e . +$PIP install -e . 
-q echo "" From ad198ef8e60295d84bc3d71e4d20f56c67c4a37b Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Mon, 11 May 2026 16:27:47 -0400 Subject: [PATCH 16/18] Address all issues from deep code review - Remove unused backoff dependency from setup.py and requirements.txt - Reject None for max_total_backoff_duration/max_rate_limit_duration in client.py - Fix off-by-one: use >= for max_total_backoff_duration check - Fix Retry-After: 0 tight loop: fall through to counted backoff instead of pipeline-blocking - Log and call on_error when queue is full during 429 re-queue - Document flush() blocking behavior in docstring - Add comment explaining 410 retryable parity with Node SDK - Extract duplicate backoff logic into apply_backoff() helper - Warn on unrecognized Retry-After format (HTTP-date) instead of silently ignoring - Add comments for task_done() invariant and FatalError origin - Add 8 new tests covering all previously missing coverage gaps --- requirements.txt | 1 - segment/analytics/client.py | 15 +- segment/analytics/consumer.py | 125 +++++++--------- segment/analytics/request.py | 8 +- segment/analytics/test/test_consumer.py | 185 ++++++++++++++++++++++-- setup.py | 1 - status-response-updates-deep-review.md | 112 ++++++++++++++ 7 files changed, 346 insertions(+), 101 deletions(-) create mode 100644 status-response-updates-deep-review.md diff --git a/requirements.txt b/requirements.txt index 596c10d..912848b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ -backoff==2.2.1 cryptography==44.0.0 flake8==7.1.1 mock==2.0.0 diff --git a/segment/analytics/client.py b/segment/analytics/client.py index 6bb0ddd..9a8dcdb 100644 --- a/segment/analytics/client.py +++ b/segment/analytics/client.py @@ -72,10 +72,10 @@ def __init__(self, max_rate_limit_duration=DefaultConfig.max_rate_limit_duration,): require('write_key', write_key, str) - if max_total_backoff_duration is not None and max_total_backoff_duration < 0: - raise 
ValueError('max_total_backoff_duration must be non-negative') - if max_rate_limit_duration is not None and max_rate_limit_duration < 0: - raise ValueError('max_rate_limit_duration must be non-negative') + if max_total_backoff_duration is None or max_total_backoff_duration < 0: + raise ValueError('max_total_backoff_duration must be a non-negative number') + if max_rate_limit_duration is None or max_rate_limit_duration < 0: + raise ValueError('max_rate_limit_duration must be a non-negative number') self.queue = queue.Queue(max_queue_size) self.write_key = write_key @@ -331,7 +331,12 @@ def _enqueue(self, msg): return False, msg def flush(self): - """Forces a flush from the internal queue to the server""" + """Forces a flush from the internal queue to the server. + + Warning: if the consumer is currently rate-limited, this call will + block until the rate limit clears or max_rate_limit_duration elapses + (up to 12 hours by default). + """ queue = self.queue size = queue.qsize() queue.join() diff --git a/segment/analytics/consumer.py b/segment/analytics/consumer.py index 1b84bfa..3748ba1 100644 --- a/segment/analytics/consumer.py +++ b/segment/analytics/consumer.py @@ -136,11 +136,16 @@ def upload(self): if e.status == 429 and self.rate_limited_until is not None: # 429: rate-limit state already set by request(). Re-queue batch. self.log.debug('429 received. Re-queuing batch and halting upload iteration.') + dropped = [] for item in batch: try: self.queue.put(item, block=False) except Exception: - pass # Queue full, item lost + dropped.append(item) + if dropped: + self.log.error('Queue full during 429 re-queue. 
Dropping %d item(s).', len(dropped)) + if self.on_error: + self.on_error(Exception('Queue full, items dropped during 429 re-queue'), dropped) success = False else: self.log.error('error uploading: %s', e) @@ -153,7 +158,9 @@ def upload(self): if self.on_error: self.on_error(e, batch) finally: - # mark items as acknowledged from queue + # Each item in batch was obtained via queue.get() and must have + # exactly one matching task_done() call — including re-queued items, + # which will produce a new task_done() obligation on their next get(). for _ in batch: self.queue.task_done() return success @@ -196,14 +203,13 @@ def request(self, batch): """Attempt to upload the batch and retry before raising an error""" def is_retryable_status(status): - """ - Determine if a status code is retryable. - Retryable 4xx: 408, 410, 429, 460 - Non-retryable 4xx: 400, 401, 403, 404, 413, 422, and all other 4xx - Retryable 5xx: All except 501, 505 - - 511 is only retryable when OauthManager is configured - Non-retryable 5xx: 501, 505 - """ + # Retryable 4xx: 408, 429, 460 + # 410 Gone: permanently removed, but included for parity with the + # Node.js SDK. Retrying is harmless since the server will keep + # returning 410, and the retry budget caps total attempts. + # Non-retryable 4xx: 400, 401, 403, 404, 413, 422, and all other 4xx + # Retryable 5xx: all except 501, 505 + # 511: only retryable when OauthManager is configured if 400 <= status < 500: return status in (408, 410, 429, 460) elif 500 <= status < 600: @@ -215,15 +221,36 @@ def is_retryable_status(status): return False def calculate_backoff_delay(attempt): - """ - Calculate exponential backoff delay with jitter. - First retry is immediate, then 0.5s, 1s, 2s, 4s, etc. 
- """ + # First retry is immediate; thereafter 0.5s, 1s, 2s, 4s… capped at 60s if attempt == 1: - return 0 # First retry is immediate + return 0 base_delay = 0.5 * (2 ** (attempt - 2)) jitter = random.uniform(0, 0.1 * base_delay) - return min(base_delay + jitter, 60) # Cap at 60 seconds + return min(base_delay + jitter, 60) + + def apply_backoff(e, label): + """Apply retry backoff logic. Returns delay if should retry, raises if exhausted.""" + nonlocal first_failure_time, backoff_attempts + if first_failure_time is None: + first_failure_time = time.time() + if time.time() - first_failure_time >= self.max_total_backoff_duration: + self.log.error( + f"Max total backoff duration ({self.max_total_backoff_duration}s) exceeded " + f"after {total_attempts} attempts. Final error: {e}" + ) + raise e + backoff_attempts += 1 + if backoff_attempts >= self.retries + 1: + self.log.error( + f"All {self.retries} retries exhausted after {total_attempts} total attempts. Final error: {e}" + ) + raise e + delay = calculate_backoff_delay(backoff_attempts) + self.log.debug( + f"{label} {backoff_attempts}/{self.retries} (total attempts: {total_attempts}) " + f"after {delay:.2f}s: {e}" + ) + return delay total_attempts = 0 backoff_attempts = 0 @@ -233,7 +260,6 @@ def calculate_backoff_delay(attempt): total_attempts += 1 try: - # Make the request with current retry count response = post( self.write_key, self.host, @@ -244,82 +270,33 @@ def calculate_backoff_delay(attempt): oauth_manager=self.oauth_manager, retry_count=total_attempts - 1 ) - # Success return response except FatalError as e: - # Non-retryable error + # Raised by oauth_manager when token refresh fails permanently; + # not safe to retry. self.log.error(f"Fatal error after {total_attempts} attempts: {e}") raise except APIError as e: - # 429 with valid Retry-After: set rate-limit state and raise - # to caller (pipeline blocking). Without Retry-After, fall - # through to counted backoff like any other retryable error. 
+ # 429 with valid Retry-After > 0: block the pipeline and let + # upload() re-queue the batch. Retry-After: 0 or missing falls + # through to counted backoff to avoid a tight re-queue loop. if e.status == 429: retry_after = parse_retry_after(e.response) if e.response is not None else None - if retry_after is not None: + if retry_after is not None and retry_after > 0: self.set_rate_limit_state(e.response) raise - # Check if status is retryable if not is_retryable_status(e.status): self.log.error( f"Non-retryable error {e.status} after {total_attempts} attempts: {e}" ) raise - # Transient error -- per-batch backoff - if first_failure_time is None: - first_failure_time = time.time() - if time.time() - first_failure_time > self.max_total_backoff_duration: - self.log.error( - f"Max total backoff duration ({self.max_total_backoff_duration}s) exceeded " - f"after {total_attempts} attempts. Final error: {e}" - ) - raise - - # Count this against backoff attempts - backoff_attempts += 1 - if backoff_attempts >= self.retries + 1: - self.log.error( - f"All {self.retries} retries exhausted after {total_attempts} total attempts. Final error: {e}" - ) - raise - - # Calculate exponential backoff delay with jitter - delay = calculate_backoff_delay(backoff_attempts) - - self.log.debug( - f"Retry attempt {backoff_attempts}/{self.retries} (total attempts: {total_attempts}) " - f"after {delay:.2f}s for status {e.status}" - ) + delay = apply_backoff(e, f"Retry attempt (status {e.status})") time.sleep(delay) except Exception as e: - # Network errors or other exceptions - retry with backoff - if first_failure_time is None: - first_failure_time = time.time() - if time.time() - first_failure_time > self.max_total_backoff_duration: - self.log.error( - f"Max total backoff duration ({self.max_total_backoff_duration}s) exceeded " - f"after {total_attempts} attempts. 
Final error: {e}" - ) - raise - - backoff_attempts += 1 - - if backoff_attempts >= self.retries + 1: - self.log.error( - f"All {self.retries} retries exhausted after {total_attempts} total attempts. Final error: {e}" - ) - raise - - # Calculate exponential backoff delay with jitter - delay = calculate_backoff_delay(backoff_attempts) - - self.log.debug( - f"Network error retry {backoff_attempts}/{self.retries} (total attempts: {total_attempts}) " - f"after {delay:.2f}s: {e}" - ) + delay = apply_backoff(e, "Network error retry") time.sleep(delay) diff --git a/segment/analytics/request.py b/segment/analytics/request.py index 098b536..8fb538d 100644 --- a/segment/analytics/request.py +++ b/segment/analytics/request.py @@ -28,13 +28,13 @@ def parse_retry_after(response): return None try: - # Try parsing as integer (delay in seconds) delay = int(retry_after) - # Ensure delay is non-negative before applying upper bound return min(max(delay, 0), MAX_RETRY_AFTER_SECONDS) except ValueError: - # Could be HTTP-date format, but for simplicity we'll skip that - # Most APIs use integer seconds + # RFC 7231 allows HTTP-date format (e.g. "Wed, 21 Oct 2015 07:28:00 GMT") + # but we don't parse it; fall back to counted backoff. 
+ log = logging.getLogger('segment') + log.warning('Unrecognized Retry-After format %r; ignoring header.', retry_after) return None diff --git a/segment/analytics/test/test_consumer.py b/segment/analytics/test/test_consumer.py index a197843..3419bb0 100644 --- a/segment/analytics/test/test_consumer.py +++ b/segment/analytics/test/test_consumer.py @@ -770,29 +770,33 @@ def on_error(e, batch): self.assertIsNone(consumer.rate_limited_until) self.assertEqual(q.qsize(), 0) - def test_retry_after_zero_sets_rate_limit_state(self): - """429 with Retry-After: 0 still sets rate-limit state for consistent pipeline handling""" - consumer = Consumer(None, 'testsecret', retries=1) + def test_retry_after_zero_uses_counted_backoff(self): + """429 with Retry-After: 0 falls through to counted backoff (not pipeline blocking). + Prevents a tight re-queue loop when the server says retry immediately.""" + consumer = Consumer(None, 'testsecret', retries=2) track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + call_count = 0 + def mock_post_fn(*args, **kwargs): - response = mock.Mock() - response.headers = {'Retry-After': '0'} - error = APIError(429, 'rate_limit', 'Too Many Requests') - error.response = response - raise error + nonlocal call_count + call_count += 1 + if call_count < 3: + response = mock.Mock() + response.headers = {'Retry-After': '0'} + error = APIError(429, 'rate_limit', 'Too Many Requests') + error.response = response + raise error + return mock.Mock(status_code=200) - before = time.time() with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): - with self.assertRaises(APIError) as ctx: + with mock.patch('time.sleep'): consumer.request([track]) - self.assertEqual(ctx.exception.status, 429) - after = time.time() - self.assertIsNotNone(consumer.rate_limited_until) - self.assertIsNotNone(consumer.rate_limit_start_time) - self.assertGreaterEqual(consumer.rate_limited_until, before) - self.assertLessEqual(consumer.rate_limited_until, 
after + 0.1) + # Should retry with backoff (3 calls: initial + 2 retries) + self.assertEqual(call_count, 3) + # Rate-limit state must NOT be set (no pipeline blocking for Retry-After: 0) + self.assertIsNone(consumer.rate_limited_until) def test_t19_max_total_backoff_duration(self): """T19: Gives up after maxTotalBackoffDuration elapsed""" @@ -880,3 +884,152 @@ def test_rate_limit_state_cleared_on_success(self): # Rate-limit state should be cleared on success self.assertIsNone(consumer.rate_limited_until) self.assertIsNone(consumer.rate_limit_start_time) + + def test_retry_after_zero_does_not_trigger_pipeline_blocking(self): + """Retry-After: 0 must not cause tight re-queue loop; falls through to counted backoff""" + q = Queue() + consumer = Consumer(q, 'testsecret', retries=2) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + q.put(track) + + call_count = 0 + + def mock_post_fn(*args, **kwargs): + nonlocal call_count + call_count += 1 + if call_count == 1: + response = mock.Mock() + response.headers = {'Retry-After': '0'} + error = APIError(429, 'rate_limit', 'Too Many Requests') + error.response = response + raise error + return mock.Mock(status_code=200) + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with mock.patch('time.sleep'): + result = consumer.upload() + + # Should succeed on retry (counted backoff path, not pipeline-blocking) + self.assertTrue(result) + # Rate-limit state must NOT be set (no pipeline blocking) + self.assertIsNone(consumer.rate_limited_until) + + def test_queue_full_during_429_requeue_calls_on_error(self): + """Queue-full during 429 re-queue calls on_error with dropped items""" + from queue import Full + q = Queue() + consumer = Consumer(q, 'testsecret', retries=3) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + q.put(track) + + dropped_batches = [] + + def on_error(e, batch): + dropped_batches.append(batch) + + consumer.on_error = on_error + + def 
mock_post_fn(*args, **kwargs): + response = mock.Mock() + response.headers = {'Retry-After': '5'} + error = APIError(429, 'rate_limit', 'Too Many Requests') + error.response = response + raise error + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with mock.patch('time.sleep'): + # Make queue.put raise Full to simulate a full queue + with mock.patch.object(q, 'put', side_effect=Full): + result = consumer.upload() + + self.assertFalse(result) + # on_error should have been called with the dropped item + self.assertEqual(len(dropped_batches), 1) + self.assertEqual(len(dropped_batches[0]), 1) + + def test_none_max_total_backoff_duration_rejected_by_client(self): + """Client rejects None for max_total_backoff_duration""" + from segment.analytics.client import Client + with self.assertRaises(ValueError): + Client('testsecret', max_total_backoff_duration=None) + + def test_none_max_rate_limit_duration_rejected_by_client(self): + """Client rejects None for max_rate_limit_duration""" + from segment.analytics.client import Client + with self.assertRaises(ValueError): + Client('testsecret', max_rate_limit_duration=None) + + def test_max_total_backoff_duration_zero_prevents_retry(self): + """max_total_backoff_duration=0 prevents any retry attempt""" + consumer = Consumer(None, 'testsecret', retries=1000, + max_total_backoff_duration=0) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + + call_count = 0 + + def mock_post_fn(*args, **kwargs): + nonlocal call_count + call_count += 1 + raise APIError(500, 'error', 'Server Error') + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with mock.patch('time.sleep'): + with self.assertRaises(APIError): + consumer.request([track]) + + # With duration=0 and >= check, first failure sets first_failure_time + # and immediately satisfies time.time() - first_failure_time >= 0, + # so it raises on the very first failure (1 attempt total). 
+ self.assertEqual(call_count, 1) + + def test_parse_retry_after_http_date_logs_warning(self): + """parse_retry_after logs a warning for HTTP-date format and returns None""" + import logging + from segment.analytics.request import parse_retry_after + + response = mock.Mock() + response.headers = {'Retry-After': 'Wed, 21 Oct 2015 07:28:00 GMT'} + + with self.assertLogs('segment', level=logging.WARNING) as cm: + result = parse_retry_after(response) + + self.assertIsNone(result) + self.assertTrue(any('Unrecognized Retry-After' in line for line in cm.output)) + + def test_410_and_460_retried(self): + """410 and 460 are retryable status codes""" + for status_code in [410, 460]: + consumer = Consumer(None, 'testsecret', retries=2) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + call_count = 0 + + def mock_post_fn(*args, _status=status_code, **kwargs): + nonlocal call_count + call_count += 1 + if call_count < 2: + raise APIError(_status, 'error', f'Error {_status}') + return mock.Mock(status_code=200) + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with mock.patch('time.sleep'): + consumer.request([track]) + + self.assertEqual(call_count, 2, f'{status_code} should be retried') + + def test_505_not_retried(self): + """505 HTTP Version Not Supported is non-retryable""" + consumer = Consumer(None, 'testsecret', retries=3) + track = {'type': 'track', 'event': 'python event', 'userId': 'userId'} + call_count = 0 + + def mock_post_fn(*args, **kwargs): + nonlocal call_count + call_count += 1 + raise APIError(505, 'error', 'HTTP Version Not Supported') + + with mock.patch('segment.analytics.consumer.post', side_effect=mock_post_fn): + with self.assertRaises(APIError) as ctx: + consumer.request([track]) + self.assertEqual(ctx.exception.status, 505) + + self.assertEqual(call_count, 1) diff --git a/setup.py b/setup.py index c8ab0a3..c8bb56c 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,6 @@ install_requires = [ 
"requests~=2.7", - "backoff~=2.1", "python-dateutil~=2.2", "PyJWT~=2.12" ] diff --git a/status-response-updates-deep-review.md b/status-response-updates-deep-review.md new file mode 100644 index 0000000..4ffdef9 --- /dev/null +++ b/status-response-updates-deep-review.md @@ -0,0 +1,112 @@ +# Deep Code Review — analytics-python `response-status-updates` + +PR: https://github.com/segmentio/analytics-python/pull/520 + +--- + +## 🔴 Critical / Must-Fix + +**1. `backoff` dependency not removed** + +`setup.py` and `requirements.txt` still declare `backoff~=2.1` / `backoff==2.2.1` even though the import was removed from `consumer.py`. Every user still gets an unused transitive dependency shipped to them. + +> ✅ **Resolved**: Removed `backoff~=2.1` from `setup.py` `install_requires` and `backoff==2.2.1` from `requirements.txt`. + +--- + +**2. `None` values for `max_total_backoff_duration` / `max_rate_limit_duration` cause `TypeError`** + +`client.py` accepts `None` (only rejects negative values), but `consumer.py` does unguarded arithmetic comparisons against these values. Passing `None` will raise a `TypeError` at runtime. + +> ✅ **Resolved**: `client.py` validation now rejects `None` with a `ValueError` (same branch as the negative-value check). Tests added: `test_none_max_total_backoff_duration_rejected_by_client` and `test_none_max_rate_limit_duration_rejected_by_client`. + +--- + +## 🟠 Important / Should-Fix + +**3. `max_total_backoff_duration=0` allows one extra attempt** + +The duration check uses `>` rather than `>=`, and `first_failure_time` is set on the same line as the check, creating a race where the first failure always slips through even with a duration of 0. + +> ✅ **Resolved**: Changed `>` to `>=` in both `APIError` and generic `Exception` backoff branches. With `duration=0`, the first failure sets `first_failure_time` and immediately satisfies the check (`0 >= 0`), so the request raises after 1 attempt. 
Test added: `test_max_total_backoff_duration_zero_prevents_retry`. + +--- + +**4. `Retry-After: 0` causes a tight re-queue loop** + +`parse_retry_after` returns `0`, which sets `rate_limited_until` to "now already expired", causing upload to immediately re-attempt with no delay, re-queue on 429, and loop tight. + +> ✅ **Resolved**: `request()` now only triggers pipeline-blocking (set rate-limit state + raise) when `retry_after > 0`. A `Retry-After: 0` header falls through to counted backoff. Tests updated: `test_retry_after_zero_sets_rate_limit_state` renamed and rewritten as `test_retry_after_zero_uses_counted_backoff`; `test_retry_after_zero_does_not_trigger_pipeline_blocking` added for the `upload()` path. + +--- + +**5. Queue full during 429 re-queue silently drops items** + +`except Exception: pass` in the re-queue loop swallows queue-full errors with no log and no `on_error` callback. Items are dropped silently. + +> ✅ **Resolved**: Collects dropped items, logs an error with the count, and calls `on_error` if configured. Test added: `test_queue_full_during_429_requeue_calls_on_error`. + +--- + +**6. `flush()` can block for up to 12 hours** + +When rate-limited with pipeline-blocking 429s, `queue.join()` in `flush()` waits for all pending `task_done()` calls, which can take up to `max_rate_limit_duration`. This is intentional but undocumented. + +> ✅ **Resolved**: Added a docstring warning on `flush()` documenting the blocking behavior and worst-case duration. + +--- + +**7. `410 Gone` is marked retryable** + +HTTP 410 means permanently removed; retrying will never succeed with the same payload. The Node reference does it too, but there should be a code comment explaining the rationale, or it should be removed. + +> ✅ **Resolved**: Added an inline comment in `is_retryable_status` explaining that 410 is included for parity with the Node.js SDK, and that the retry budget caps total attempts. Test added: `test_410_and_460_retried`. 
+ +--- + +## 🟡 Minor / Nitpick + +**8. Duplicate backoff code in two `except` branches** + +The backoff delay calculation is duplicated in two `except` branches of `request()`. Should be extracted to a helper to prevent drift. + +> ✅ **Resolved**: Extracted shared duration-check + retry-count-check + delay-calc + log into an `apply_backoff(e, label)` inner function. Both `APIError` and generic `Exception` branches now call it. + +--- + +**9. `Retry-After` HTTP-date format silently falls back with no warning** + +RFC 7231 allows `Retry-After` to be either a delay-seconds integer or an HTTP-date string. When an HTTP-date is received, `parse_retry_after` silently falls back to counted backoff with no warning log. No test covers this path. + +> ✅ **Resolved**: `parse_retry_after` now logs `WARNING: Unrecognized Retry-After format ...; ignoring header.` on `ValueError`. Test added: `test_parse_retry_after_http_date_logs_warning`. + +--- + +**10. `task_done()` separation between early-return and `finally` is fragile** + +The placement of `task_done()` calls relative to early returns needs a comment explaining the invariant, otherwise it's easy to introduce a double-call or missed-call on future edits. + +> ✅ **Resolved**: Added a comment above the `finally` block explaining the invariant: each item obtained via `queue.get()` must have exactly one `task_done()`, including re-queued items (which incur a new obligation on their next `get()`). + +--- + +**11. `FatalError` catch is non-obvious** + +The `FatalError` catch in `consumer.py` is non-obvious without reading `oauth_manager.py`. Add a comment explaining what raises it and why it should be terminal. + +> ✅ **Resolved**: Added an inline comment: "Raised by oauth_manager when token refresh fails permanently; not safe to retry." 
+ +--- + +## Test Coverage Gaps + +| Status | Gap | +|--------|-----| +| ✅ Added | `Retry-After: 0` tight-loop behavior | +| ✅ Added | Queue-full during 429 re-queue (silent drop) | +| ✅ Added | `None` passed for `max_total_backoff_duration` / `max_rate_limit_duration` | +| ✅ Added | `parse_retry_after` with HTTP-date format input | +| ✅ Added | `max_total_backoff_duration=0` off-by-one (first failure always passes) | +| ✅ Added | 410 and 460 retryable | +| ✅ Added | 505 non-retryable 5xx | +| ⚠️ Untested | `flush()` blocking behavior during active rate-limit | From 211f094a75e8c666989e78fd31c4fbcfd95eca0c Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Mon, 11 May 2026 16:34:45 -0400 Subject: [PATCH 17/18] Refine documentation in flush method Removed unnecessary line in flush method documentation. --- segment/analytics/client.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/segment/analytics/client.py b/segment/analytics/client.py index 9a8dcdb..e1ffb98 100644 --- a/segment/analytics/client.py +++ b/segment/analytics/client.py @@ -334,8 +334,7 @@ def flush(self): """Forces a flush from the internal queue to the server. Warning: if the consumer is currently rate-limited, this call will - block until the rate limit clears or max_rate_limit_duration elapses - (up to 12 hours by default). + block until the rate limit clears or max_rate_limit_duration elapses. 
""" queue = self.queue size = queue.qsize() From a1314f8bc8efd160b7504e4b870a035c124afc7f Mon Sep 17 00:00:00 2001 From: Michael Grosse Huelsewiesche Date: Mon, 11 May 2026 16:38:51 -0400 Subject: [PATCH 18/18] Delete status-response-updates-deep-review.md --- status-response-updates-deep-review.md | 112 ------------------------- 1 file changed, 112 deletions(-) delete mode 100644 status-response-updates-deep-review.md diff --git a/status-response-updates-deep-review.md b/status-response-updates-deep-review.md deleted file mode 100644 index 4ffdef9..0000000 --- a/status-response-updates-deep-review.md +++ /dev/null @@ -1,112 +0,0 @@ -# Deep Code Review — analytics-python `response-status-updates` - -PR: https://github.com/segmentio/analytics-python/pull/520 - ---- - -## 🔴 Critical / Must-Fix - -**1. `backoff` dependency not removed** - -`setup.py` and `requirements.txt` still declare `backoff~=2.1` / `backoff==2.2.1` even though the import was removed from `consumer.py`. Every user still gets an unused transitive dependency shipped to them. - -> ✅ **Resolved**: Removed `backoff~=2.1` from `setup.py` `install_requires` and `backoff==2.2.1` from `requirements.txt`. - ---- - -**2. `None` values for `max_total_backoff_duration` / `max_rate_limit_duration` cause `TypeError`** - -`client.py` accepts `None` (only rejects negative values), but `consumer.py` does unguarded arithmetic comparisons against these values. Passing `None` will raise a `TypeError` at runtime. - -> ✅ **Resolved**: `client.py` validation now rejects `None` with a `ValueError` (same branch as the negative-value check). Tests added: `test_none_max_total_backoff_duration_rejected_by_client` and `test_none_max_rate_limit_duration_rejected_by_client`. - ---- - -## 🟠 Important / Should-Fix - -**3. 
`max_total_backoff_duration=0` allows one extra attempt** - -The duration check uses `>` rather than `>=`, and `first_failure_time` is set on the same line as the check, creating a race where the first failure always slips through even with a duration of 0. - -> ✅ **Resolved**: Changed `>` to `>=` in both `APIError` and generic `Exception` backoff branches. With `duration=0`, the first failure sets `first_failure_time` and immediately satisfies the check (`0 >= 0`), so the request raises after 1 attempt. Test added: `test_max_total_backoff_duration_zero_prevents_retry`. - ---- - -**4. `Retry-After: 0` causes a tight re-queue loop** - -`parse_retry_after` returns `0`, which sets `rate_limited_until` to "now already expired", causing upload to immediately re-attempt with no delay, re-queue on 429, and loop tight. - -> ✅ **Resolved**: `request()` now only triggers pipeline-blocking (set rate-limit state + raise) when `retry_after > 0`. A `Retry-After: 0` header falls through to counted backoff. Tests updated: `test_retry_after_zero_sets_rate_limit_state` renamed and rewritten as `test_retry_after_zero_uses_counted_backoff`; `test_retry_after_zero_does_not_trigger_pipeline_blocking` added for the `upload()` path. - ---- - -**5. Queue full during 429 re-queue silently drops items** - -`except Exception: pass` in the re-queue loop swallows queue-full errors with no log and no `on_error` callback. Items are dropped silently. - -> ✅ **Resolved**: Collects dropped items, logs an error with the count, and calls `on_error` if configured. Test added: `test_queue_full_during_429_requeue_calls_on_error`. - ---- - -**6. `flush()` can block for up to 12 hours** - -When rate-limited with pipeline-blocking 429s, `queue.join()` in `flush()` waits for all pending `task_done()` calls, which can take up to `max_rate_limit_duration`. This is intentional but undocumented. - -> ✅ **Resolved**: Added a docstring warning on `flush()` documenting the blocking behavior and worst-case duration. 
- ---- - -**7. `410 Gone` is marked retryable** - -HTTP 410 means permanently removed; retrying will never succeed with the same payload. The Node reference does it too, but there should be a code comment explaining the rationale, or it should be removed. - -> ✅ **Resolved**: Added an inline comment in `is_retryable_status` explaining that 410 is included for parity with the Node.js SDK, and that the retry budget caps total attempts. Test added: `test_410_and_460_retried`. - ---- - -## 🟡 Minor / Nitpick - -**8. Duplicate backoff code in two `except` branches** - -The backoff delay calculation is duplicated in two `except` branches of `request()`. Should be extracted to a helper to prevent drift. - -> ✅ **Resolved**: Extracted shared duration-check + retry-count-check + delay-calc + log into an `apply_backoff(e, label)` inner function. Both `APIError` and generic `Exception` branches now call it. - ---- - -**9. `Retry-After` HTTP-date format silently falls back with no warning** - -RFC 7231 allows `Retry-After` to be either a delay-seconds integer or an HTTP-date string. When an HTTP-date is received, `parse_retry_after` silently falls back to counted backoff with no warning log. No test covers this path. - -> ✅ **Resolved**: `parse_retry_after` now logs `WARNING: Unrecognized Retry-After format ...; ignoring header.` on `ValueError`. Test added: `test_parse_retry_after_http_date_logs_warning`. - ---- - -**10. `task_done()` separation between early-return and `finally` is fragile** - -The placement of `task_done()` calls relative to early returns needs a comment explaining the invariant, otherwise it's easy to introduce a double-call or missed-call on future edits. - -> ✅ **Resolved**: Added a comment above the `finally` block explaining the invariant: each item obtained via `queue.get()` must have exactly one `task_done()`, including re-queued items (which incur a new obligation on their next `get()`). - ---- - -**11. 
`FatalError` catch is non-obvious** - -The `FatalError` catch in `consumer.py` is non-obvious without reading `oauth_manager.py`. Add a comment explaining what raises it and why it should be terminal. - -> ✅ **Resolved**: Added an inline comment: "Raised by oauth_manager when token refresh fails permanently; not safe to retry." - ---- - -## Test Coverage Gaps - -| Status | Gap | -|--------|-----| -| ✅ Added | `Retry-After: 0` tight-loop behavior | -| ✅ Added | Queue-full during 429 re-queue (silent drop) | -| ✅ Added | `None` passed for `max_total_backoff_duration` / `max_rate_limit_duration` | -| ✅ Added | `parse_retry_after` with HTTP-date format input | -| ✅ Added | `max_total_backoff_duration=0` off-by-one (first failure always passes) | -| ✅ Added | 410 and 460 retryable | -| ✅ Added | 505 non-retryable 5xx | -| ⚠️ Untested | `flush()` blocking behavior during active rate-limit |