Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions crazy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import apache_beam as beam
from apache_beam.testing.util import assert_that, equal_to

# Minimal repro pipeline: create one (0, 0) element, batch it by key, then
# reshuffle.  (Indentation reconstructed — the captured copy had it stripped.)
with beam.Pipeline() as p:
  output_titles = (
      p
      | "Create input" >> beam.Create([(0, 0)])
      | "Batch in groups" >> beam.GroupIntoBatches(5)
      | beam.Reshuffle())
  output_titles | beam.Map(print)
  # NOTE(review): GroupIntoBatches emits (key, batch-of-values); whether the
  # batch compares equal to the tuple (0,) depends on the runner's coder
  # round-trip — confirm the expected element type before relying on this.
  assert_that(output_titles, equal_to([(0, (0,))]))
46 changes: 46 additions & 0 deletions isort.diff
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
--- /runner/_work/beam/beam/sdks/python/test-suites/tox/pycommon/build/srcs/sdks/python/apache_beam/metrics/metric_test.py:before 2025-10-16 13:27:21.916235
1560.31user 33.00system 6:49.10elapsed 389%CPU (0avgtext+0avgdata 925408maxresident)k
11760inputs+2912outputs (13major+2158492minor)pagefaults 0swaps
+++ /runner/_work/beam/beam/sdks/python/test-suites/tox/pycommon/build/srcs/sdks/python/apache_beam/metrics/metric_test.py:after 2025-10-16 13:42:50.772388
@@ -17,10 +17,11 @@

# pytype: skip-file

+import re
+import unittest
+
import hamcrest as hc
import pytest
-import re
-import unittest

import apache_beam as beam
from apache_beam import metrics
ERROR: /runner/_work/beam/beam/sdks/python/test-suites/tox/pycommon/build/srcs/sdks/python/apache_beam/io/gcp/bigquery_tools.py Imports are incorrectly sorted.
--- /runner/_work/beam/beam/sdks/python/test-suites/tox/pycommon/build/srcs/sdks/python/apache_beam/io/gcp/bigquery_tools.py:before 2025-10-16 13:27:22.059248
+++ /runner/_work/beam/beam/sdks/python/test-suites/tox/pycommon/build/srcs/sdks/python/apache_beam/io/gcp/bigquery_tools.py:after 2025-10-16 13:42:57.634626
@@ -30,13 +30,10 @@

import datetime
import decimal
-import fastavro
import io
import json
import logging
-import numpy as np
import re
-import regex
import sys
import time
import uuid
@@ -46,6 +43,10 @@
from typing import Tuple
from typing import TypeVar
from typing import Union
+
+import fastavro
+import numpy as np
+import regex

import apache_beam
from apache_beam import coders
11 changes: 11 additions & 0 deletions sdks/python/.claude/settings.local.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"permissions": {
"allow": [
"Bash(git add:*)",
"Bash(git commit:*)",
"Bash(python -m pytest:*)",
"Bash(python:*)",
"Bash(BEAM_RUNNER_ENABLE_PRISM=0 python:*)"
]
}
}
130 changes: 130 additions & 0 deletions sdks/python/TDIGEST_IMPLEMENTATION.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# TDigest Distribution Enhancement - Session Refresh Document

## Goal
Extend Beam's existing `Distribution` metric to internally use TDigest (via `fastdigest` package), enabling percentile queries (p50, p90, p95, p99) **without any user code changes**.

Users continue calling:
```python
dist = Metrics.distribution("namespace", "name")
dist.update(value)
```

But can now query:
```python
result.p50, result.p95, result.p99, result.quantile(0.75)
```

## Key Design Decisions
- **Package**: Use `fastdigest` (Rust-based, fast, compatible API)
- **Compression factor**: Use default (100)
- **Backwards compatible**: Old payloads decode with `tdigest=None`; percentile methods return `None` when unavailable
- **Conditional import**: Graceful fallback if fastdigest not installed

## Files to Modify

| File | Purpose |
|------|---------|
| `setup.py` | Add `fastdigest>=0.6.0,<1` to install_requires |
| `apache_beam/metrics/cells.py` | Core changes: DistributionData, DistributionCell, DistributionResult |
| `apache_beam/metrics/monitoring_infos.py` | Serialization: _encode/_decode_distribution |
| `apache_beam/metrics/cells_test.py` | Unit tests for cells changes |
| `apache_beam/metrics/monitoring_infos_test.py` | Unit tests for serialization |
| `apache_beam/metrics/metric_test.py` | Integration tests |

## Files to Read for Context

**Essential** (read these first):
1. `apache_beam/metrics/cells.py` lines 164-220 (DistributionCell), 349-402 (DistributionResult), 496-563 (DistributionData)
2. `apache_beam/metrics/monitoring_infos.py` lines 249-281 (int64_user_distribution), 501-510 (distribution_payload_combiner), 561-581 (_encode/_decode_distribution)

**Detailed plan**:
- `/Users/jtran/.claude/plans/tdigest-distribution-plan.md` (machine-local path; this file is not checked into the repository)

## fastdigest API Reference
```python
from fastdigest import TDigest

# Create
t = TDigest() # Empty
t = TDigest.from_values([1, 2, 3]) # From values

# Update
t += TDigest.from_values([4, 5]) # Merge with +

# Query
t.quantile(0.5) # Value at percentile
t.cdf(value) # Percentile of value

# Serialize
d = t.to_dict() # To dict
t2 = TDigest.from_dict(d) # From dict
```

## Implementation Status: COMPLETE

### Phase 1: Dependencies
- [x] Add fastdigest to setup.py
- [x] Verify installation

### Phase 2: DistributionData (cells.py)
- [x] Add TDigest import (conditional)
- [x] Extend __init__ with tdigest param
- [x] Extend __eq__, __hash__, __repr__
- [x] Extend get_cumulative (copy tdigest)
- [x] Extend combine (merge tdigests)
- [x] Extend singleton, identity_element
- [x] Add tests (7 tests)

### Phase 3: DistributionCell (cells.py)
- [x] Update _update to feed tdigest
- [x] Add tests (2 tests)

### Phase 4: DistributionResult (cells.py)
- [x] Add p50, p90, p95, p99 properties
- [x] Add quantile(q) method
- [x] Update __repr__
- [x] Add tests (4 tests)

### Phase 5: Serialization (monitoring_infos.py)
- [x] Add TDigest import
- [x] Extend _encode_distribution (append tdigest bytes)
- [x] Extend _decode_distribution (read tdigest bytes)
- [x] Update int64_user_distribution, int64_distribution
- [x] Update extract_metric_result_map_value
- [x] Update distribution_payload_combiner
- [x] Add tests (4 tests)

### Phase 6-7: Integration & Full Tests
- [x] All cells_test.py tests pass (42 tests)
- [x] All monitoring_infos_test.py tests pass (15 tests)

### Demo Script
- [x] Created `tdigest_demo.py` to visualize TDigest quantiles
- Generates normal, bimodal, and longtail distributions
- Shows percentile comparison table and ASCII visualization
- Compares TDigest results with numpy ground truth

## Known Limitations

### Prism Runner Issue
As of 2024, DirectRunner uses PrismRunner by default, which is a Go-based portable runner.
Prism does not properly preserve TDigest data in distribution metric payloads. The Python SDK correctly encodes TDigest data (verified with 2000+ byte payloads), but Prism truncates the payload to only 4-5 bytes (basic count/sum/min/max).

**Root Cause**: The issue is in the Go Prism implementation, not the Python SDK. The Python worker correctly creates and serializes MonitoringInfo protobufs with full TDigest payloads, but Prism does not properly handle the extended distribution format.

**Workaround**: Use `BundleBasedDirectRunner` for local testing:
```python
with beam.Pipeline(runner='BundleBasedDirectRunner') as p:
...
```

**Investigation Status**:
- ✅ Python SDK creates MonitoringInfos with TDigest correctly (290+ bytes)
- ✅ Protobuf serialization/deserialization preserves TDigest
- ✅ BundleBasedDirectRunner works perfectly
- ❌ PrismRunner (DirectRunner default) truncates TDigest payloads

**Next Steps**: This requires a fix in the Go Prism codebase to properly handle extended distribution payloads. The portable runner protocol and protobuf schema support the extended format, but Prism's implementation needs to be updated.

## Current Branch
`tdigestdistribution`
117 changes: 107 additions & 10 deletions sdks/python/apache_beam/metrics/cells.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@
from apache_beam.portability.api import metrics_pb2
from apache_beam.utils.histogram import Histogram

try:
from fastdigest import TDigest
_TDIGEST_AVAILABLE = True
except ImportError:
_TDIGEST_AVAILABLE = False
TDigest = None # type: ignore

try:
import cython
except ImportError:
Expand Down Expand Up @@ -205,6 +212,9 @@ def _update(self, value):
self.data.min = ivalue
if ivalue > self.data.max:
self.data.max = ivalue
# Also update tdigest for percentile tracking
if self.data.tdigest is not None and _TDIGEST_AVAILABLE:
self.data.tdigest += TDigest.from_values([float(ivalue)])

def get_cumulative(self):
# type: () -> DistributionData
Expand Down Expand Up @@ -365,9 +375,13 @@ def __hash__(self):

def __repr__(self):
# type: () -> str
return (
base = (
'DistributionResult(sum={}, count={}, min={}, max={}, '
'mean={})'.format(self.sum, self.count, self.min, self.max, self.mean))
'mean={}'.format(self.sum, self.count, self.min, self.max, self.mean))
if self.data.tdigest is not None and self.data.count > 0:
base += ', p50={:.2f}, p95={:.2f}, p99={:.2f}'.format(
self.p50, self.p95, self.p99)
return base + ')'

@property
def max(self):
Expand Down Expand Up @@ -401,6 +415,58 @@ def mean(self):
return None
return self.data.sum / self.data.count

@property
def p50(self):
  # type: () -> Optional[float]

  """The median (50th percentile), or None when no tdigest data exists."""
  digest = self.data.tdigest
  if digest is None or not self.data.count:
    return None
  return digest.quantile(0.50)

@property
def p90(self):
  # type: () -> Optional[float]

  """The 90th percentile, or None when no tdigest data exists."""
  digest = self.data.tdigest
  if digest is None or not self.data.count:
    return None
  return digest.quantile(0.90)

@property
def p95(self):
  # type: () -> Optional[float]

  """The 95th percentile, or None when no tdigest data exists."""
  digest = self.data.tdigest
  if digest is None or not self.data.count:
    return None
  return digest.quantile(0.95)

@property
def p99(self):
  # type: () -> Optional[float]

  """The 99th percentile, or None when no tdigest data exists."""
  digest = self.data.tdigest
  if digest is None or not self.data.count:
    return None
  return digest.quantile(0.99)

def quantile(self, q):
  # type: (float) -> Optional[float]

  """Estimates the value at quantile ``q``.

  Args:
    q: The quantile to retrieve, between 0.0 and 1.0.

  Returns:
    The estimated value at the given quantile, or None if no tdigest
    is available or the distribution is empty.
  """
  digest = self.data.tdigest
  if digest is None or not self.data.count:
    return None
  return digest.quantile(q)


class GaugeResult(object):
def __init__(self, data):
Expand Down Expand Up @@ -503,8 +569,8 @@ class DistributionData(object):
This object is not thread safe, so it's not supposed to be modified
by other than the DistributionCell that contains it.
"""
def __init__(self, sum, count, min, max):
# type: (int, int, int, int) -> None
def __init__(self, sum, count, min, max, tdigest=None):
# type: (int, int, int, int, ...) -> None
if count:
self.sum = sum
self.count = count
Expand All @@ -515,13 +581,21 @@ def __init__(self, sum, count, min, max):
self.min = 2**63 - 1
# Avoid Wimplicitly-unsigned-literal caused by -2**63.
self.max = -self.min - 1
self.tdigest = tdigest

def __eq__(self, other):
  # type: (object) -> bool

  """Equality on the basic stats; tdigests are compared only when both exist."""
  if isinstance(other, DistributionData):
    basic_eq = (
        self.sum == other.sum and self.count == other.count and
        self.min == other.min and self.max == other.max)
    if not basic_eq:
      return False
    # Compare tdigests only if both are present.
    # If either is None, consider them equal (backwards compatibility).
    if self.tdigest is None or other.tdigest is None:
      return True
    return self.tdigest.to_dict() == other.tdigest.to_dict()
  else:
    return False

Expand All @@ -531,12 +605,19 @@ def __hash__(self):

def __repr__(self):
# type: () -> str
return 'DistributionData(sum={}, count={}, min={}, max={})'.format(
base = 'DistributionData(sum={}, count={}, min={}, max={}'.format(
self.sum, self.count, self.min, self.max)
if self.tdigest is not None:
base += ', tdigest=<TDigest>'
return base + ')'

def get_cumulative(self):
  # type: () -> DistributionData

  """Returns a snapshot copy, deep-copying the tdigest via dict round-trip."""
  tdigest_copy = None
  if self.tdigest is not None and _TDIGEST_AVAILABLE:
    # Round-trip through a dict so the snapshot cannot be mutated by
    # further updates to this cell's digest.
    tdigest_copy = TDigest.from_dict(self.tdigest.to_dict())
  return DistributionData(
      self.sum, self.count, self.min, self.max, tdigest_copy)

def get_result(self) -> DistributionResult:
  """Wraps a snapshot of this data in an immutable DistributionResult."""
  snapshot = self.get_cumulative()
  return DistributionResult(snapshot)
def combine(self, other):
  # type: (Optional[DistributionData]) -> DistributionData

  """Merges another DistributionData into this one, returning a new value.

  Combining with None is the identity; sums/counts add, min/max take the
  extremes, and tdigests merge when present on either side.
  """
  if other is None:
    return self

  # Merge tdigests: keep whichever side has one; add (+) when both do.
  merged_tdigest = None
  if self.tdigest is not None and other.tdigest is not None:
    merged_tdigest = self.tdigest + other.tdigest
  elif self.tdigest is not None:
    merged_tdigest = self.tdigest
  elif other.tdigest is not None:
    merged_tdigest = other.tdigest

  return DistributionData(
      self.sum + other.sum,
      self.count + other.count,
      self.min if self.min < other.min else other.min,
      self.max if self.max > other.max else other.max,
      merged_tdigest)

@staticmethod
def singleton(value):
  # type: (int) -> DistributionData

  """Builds a DistributionData describing exactly one observed value."""
  tdigest = None
  if _TDIGEST_AVAILABLE:
    tdigest = TDigest.from_values([float(value)])
  return DistributionData(value, 1, value, value, tdigest)

@staticmethod
def identity_element():
  # type: () -> DistributionData

  """The combine() identity: zero sum/count and extreme min/max sentinels."""
  tdigest = None
  if _TDIGEST_AVAILABLE:
    tdigest = TDigest()
  return DistributionData(0, 0, 2**63 - 1, -2**63, tdigest)


class StringSetData(object):
Expand Down
Loading
Loading