Skip to content

Commit e3fe94b

Browse files
committed
feat(arrow): add ResultsApi subclass for Arrow IPC results
1 parent 11e4c61 commit e3fe94b

6 files changed

Lines changed: 614 additions & 0 deletions

File tree

.github/workflows/regenerate.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,14 @@ jobs:
8484
'keywords = ["hotdata", "api-client", "data-platform"]',
8585
re.MULTILINE,
8686
),
87+
# Insert [project.optional-dependencies] (for hotdata.arrow) just
88+
# before [project.urls]. Run before the urls patch so the urls
89+
# anchor is unchanged when this fires.
90+
(
91+
r'(\ndependencies = \[\n(?:[^\]]|\][^\n])*\]\n)\n(\[project\.urls\])',
92+
r'\1\n[project.optional-dependencies]\narrow = ["pyarrow >= 14"]\n\n\2',
93+
0,
94+
),
8795
(
8896
r'\[project\.urls\]\nRepository = "[^"]*"\n',
8997
'[project.urls]\nHomepage = "https://www.hotdata.dev"\nRepository = "https://github.com/hotdata-dev/sdk-python"\n',

README.md

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,35 @@ with hotdata.ApiClient(configuration) as api_client:
6060

6161
Each `Api` class groups endpoints by resource. Construct the client, then call the typed methods you need.
6262

63+
## Arrow results
64+
65+
Query results can be fetched as an [Apache Arrow](https://arrow.apache.org/) IPC stream instead of JSON, which is faster and far more memory-efficient for large result sets. Install the optional extra:
66+
67+
```sh
68+
pip install 'hotdata[arrow]'
69+
```
70+
71+
Use `hotdata.arrow.ResultsApi` (a drop-in subclass of `ResultsApi` that adds Arrow methods):
72+
73+
```python
74+
from hotdata import ApiClient, Configuration
75+
from hotdata.arrow import ResultsApi
76+
77+
with ApiClient(Configuration(api_key="...", workspace_id="...")) as client:
78+
results = ResultsApi(client)
79+
80+
# Buffered: returns a pyarrow.Table.
81+
table = results.get_result_arrow(result_id)
82+
83+
# Streaming: yields a pyarrow.RecordBatchStreamReader without
84+
# materializing the full table in memory.
85+
with results.stream_result_arrow(result_id) as reader:
86+
for batch in reader:
87+
...
88+
```
89+
90+
Both methods accept `offset` and `limit` for pagination. They raise `hotdata.arrow.ResultNotReadyError` if the result is still pending or processing — poll `results.get_result(result_id)` until `status == "ready"` first.
91+
6392
## API reference
6493

6594
Generated Markdown for every operation and model is in [`docs/`](https://github.com/hotdata-dev/sdk-python/tree/main/docs):

hotdata/arrow.py

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
"""Arrow IPC helpers for ``GET /v1/results/{id}``.
2+
3+
The auto-generated :class:`hotdata.api.results_api.ResultsApi` understands the
4+
``format=arrow`` query parameter but cannot decode the
5+
``application/vnd.apache.arrow.stream`` response body — openapi-generator picks
6+
the JSON content variant for status 200 and routes Arrow bytes through the
7+
JSON deserializer, which raises ``Unsupported content type``.
8+
9+
This module wraps the generated client with a thin subclass that:
10+
11+
* sets ``Accept: application/vnd.apache.arrow.stream`` and ``?format=arrow``,
12+
* uses the generator's ``*_without_preload_content`` plumbing to hold the
13+
underlying ``urllib3.HTTPResponse`` open as a byte stream,
14+
* hands that stream to ``pyarrow.ipc.open_stream`` so callers get a
15+
:class:`pyarrow.Table` (or a :class:`pyarrow.RecordBatchStreamReader` for
16+
the streaming variant).
17+
18+
Install with ``pip install 'hotdata[arrow]'`` to pull in pyarrow.
19+
"""
20+
21+
from __future__ import annotations
22+
23+
from contextlib import contextmanager
24+
from typing import TYPE_CHECKING, Any, Dict, Iterator, Optional
25+
26+
from hotdata.api.results_api import ResultsApi as _GeneratedResultsApi
27+
from hotdata.models.results_format_query import ResultsFormatQuery
28+
29+
if TYPE_CHECKING: # pragma: no cover - import-time only for type checkers
30+
import pyarrow as pa # type: ignore[import-untyped]
31+
32+
33+
# Media type sent as the ``Accept`` header when requesting a result as an
# Arrow IPC stream.
ARROW_STREAM_MEDIA_TYPE = "application/vnd.apache.arrow.stream"
34+
35+
36+
class ResultNotReadyError(Exception):
    """Raised when the result exists but is not yet ``ready``.

    The server replies with HTTP 202 while a result is ``pending`` or
    ``processing``. Poll :meth:`ResultsApi.get_result` until ``status='ready'``
    before fetching as Arrow.

    :param status: Result status reported when the fetch was attempted.
    :param result_id: ID of the result that was requested.
    """

    def __init__(self, status: str, result_id: str) -> None:
        self.status = status
        self.result_id = result_id
        super().__init__(
            f"Result {result_id} is not ready (status={status!r}); "
            "poll get_result until status='ready' before fetching as Arrow."
        )

    def __reduce__(self) -> tuple:
        # ``BaseException`` pickles as ``cls(*self.args)`` and ``args`` holds
        # only the formatted message, so the default round-trip would call
        # ``__init__`` with a single argument and fail with ``TypeError``.
        # Rebuild from the original constructor arguments instead so the
        # exception survives ``pickle`` and ``copy``.
        return (type(self), (self.status, self.result_id))
51+
52+
53+
def _import_pyarrow() -> Any:
54+
try:
55+
import pyarrow.ipc as ipc # type: ignore[import-untyped]
56+
except ImportError as exc: # pragma: no cover - exercised via tests
57+
raise ImportError(
58+
"pyarrow is required to fetch results as Arrow. "
59+
"Install with: pip install 'hotdata[arrow]'"
60+
) from exc
61+
return ipc
62+
63+
64+
class ResultsApi(_GeneratedResultsApi):
    """Generated ``ResultsApi`` extended with Arrow IPC fetch helpers.

    Only new methods are defined here, so everything inherited from
    :class:`hotdata.api.results_api.ResultsApi` keeps working as before.
    """

    def get_result_arrow(
        self,
        id: str,
        *,
        offset: Optional[int] = None,
        limit: Optional[int] = None,
        _request_timeout: Any = None,
    ) -> "pa.Table":
        """Return a ready result as a fully materialized :class:`pyarrow.Table`.

        The whole Arrow IPC stream is read into memory before returning. For
        large results prefer :meth:`stream_result_arrow`, which lets the
        caller iterate record batches instead.

        :param id: Result ID.
        :param offset: Rows to skip (default: 0).
        :param limit: Maximum rows to return (default: unbounded).
        :param _request_timeout: Forwarded unchanged to the API client call.
        :raises ResultNotReadyError: result is still pending or processing.
        :raises hotdata.exceptions.ApiException: for other HTTP errors
            (400 invalid params, 404 not found, 409 failed result).
        :raises ImportError: pyarrow is not installed.
        """
        arrow_ipc = _import_pyarrow()
        raw = self._call_arrow(
            id=id,
            offset=offset,
            limit=limit,
            _request_timeout=_request_timeout,
        )
        try:
            reader = arrow_ipc.open_stream(raw)
            return reader.read_all()
        finally:
            # Hand the connection back whether decoding succeeded or raised.
            raw.release_conn()

    @contextmanager
    def stream_result_arrow(
        self,
        id: str,
        *,
        offset: Optional[int] = None,
        limit: Optional[int] = None,
        _request_timeout: Any = None,
    ) -> Iterator["pa.RecordBatchStreamReader"]:
        """Context manager yielding a :class:`pyarrow.RecordBatchStreamReader`.

        Iterate the reader to receive :class:`pyarrow.RecordBatch` objects one
        at a time, or call ``reader.read_all()`` to build a full
        :class:`pyarrow.Table`. The HTTP connection is released when the
        ``with`` block exits, so the reader must not be used afterwards.

        Example::

            with results.stream_result_arrow(result_id) as reader:
                for batch in reader:
                    process(batch)

        :param id: Result ID.
        :param offset: Rows to skip.
        :param limit: Maximum rows to return.
        :param _request_timeout: Forwarded unchanged to the API client call.
        :raises ResultNotReadyError: result is still pending or processing.
        :raises hotdata.exceptions.ApiException: for other HTTP errors.
        :raises ImportError: pyarrow is not installed.
        """
        arrow_ipc = _import_pyarrow()
        raw = self._call_arrow(
            id=id,
            offset=offset,
            limit=limit,
            _request_timeout=_request_timeout,
        )
        try:
            yield arrow_ipc.open_stream(raw)
        finally:
            # NOTE(review): the connection is released even when the caller
            # leaves the block before exhausting the reader. Confirm the
            # pool copes with a partially read body; urllib3 documents
            # drain_conn()/close() for that situation.
            raw.release_conn()

    def _call_arrow(
        self,
        *,
        id: str,
        offset: Optional[int],
        limit: Optional[int],
        _request_timeout: Any,
    ) -> Any:
        """Send the Arrow-format request and return the raw response on 200.

        The request is assembled by the generated ``_get_result_serialize``
        helper so path, query and auth handling match the generated client;
        only the ``Accept`` header and the ``format`` query value are
        overridden.

        :returns: ``response_data.response`` for a 200 reply, left unread so
            the caller can stream the body and release the connection.
        :raises ResultNotReadyError: a non-200 reply was deserialized without
            ``response_deserialize`` raising.
        """
        serialized = self._get_result_serialize(
            id=id,
            offset=offset,
            limit=limit,
            format=ResultsFormatQuery.ARROW,
            _request_auth=None,
            _content_type=None,
            _headers={"Accept": ARROW_STREAM_MEDIA_TYPE},
            _host_index=0,
        )
        reply = self.api_client.call_api(
            *serialized,
            _request_timeout=_request_timeout,
        )

        if reply.status != 200:
            # Read the body so it can be parsed, and always give the
            # connection back afterwards.
            # NOTE(review): this relies on the generated
            # ``response_deserialize`` raising ApiException for error
            # statuses, so that in practice only a 202 reaches the raise
            # below. Confirm against the generated client.
            try:
                reply.read()
                parsed = self.api_client.response_deserialize(
                    response_data=reply,
                    response_types_map={
                        "202": "GetResultResponse",
                        "400": "ApiErrorResponse",
                        "404": "ApiErrorResponse",
                        "409": "GetResultResponse",
                    },
                ).data
            finally:
                reply.response.release_conn()

            # Fall back to "pending" and the requested ID if the parsed body
            # lacks these fields.
            reported_status = getattr(parsed, "status", "pending")
            reported_id = getattr(parsed, "result_id", id)
            raise ResultNotReadyError(
                status=reported_status,
                result_id=reported_id,
            )

        # 200: return the underlying response object without reading it.
        # NOTE(review): assumes the API client did not preload the body.
        return reply.response
184+
185+
186+
# Public names of this module.
__all__ = ["ARROW_STREAM_MEDIA_TYPE", "ResultNotReadyError", "ResultsApi"]

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ dependencies = [
1717
"typing-extensions (>=4.7.1)",
1818
]
1919

20+
[project.optional-dependencies]
21+
arrow = ["pyarrow >= 14"]
22+
2023
[project.urls]
2124
Homepage = "https://www.hotdata.dev"
2225
Repository = "https://github.com/hotdata-dev/sdk-python"
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
"""Scenario: results_arrow.
2+
3+
Submit a small query, poll until the result is ready, then fetch the result
4+
as a pyarrow.Table via hotdata.arrow.ResultsApi.get_result_arrow. Verifies
5+
that Arrow IPC content negotiation works end-to-end and that the streaming
6+
variant yields the same data.
7+
8+
Skipped if pyarrow is not installed (the helper requires the ``arrow`` extra).
9+
"""
10+
11+
from __future__ import annotations
12+
13+
import time
14+
15+
import pytest
16+
17+
pa = pytest.importorskip("pyarrow")
18+
19+
from hotdata.api.query_api import QueryApi
20+
from hotdata.api.query_runs_api import QueryRunsApi
21+
from hotdata.arrow import ResultsApi
22+
from hotdata.models.query_request import QueryRequest
23+
24+
25+
# Query-run statuses at which the polling loop stops waiting.
TERMINAL_STATUSES = {"succeeded", "failed", "cancelled"}
# Upper bound, in seconds, on each polling loop.
POLL_TIMEOUT_S = 60.0
# Pause, in seconds, between consecutive polls.
POLL_INTERVAL_S = 1.0
28+
29+
30+
@pytest.fixture
def query_api(api_client) -> QueryApi:
    """Provide a ``QueryApi`` built on the ``api_client`` fixture."""
    return QueryApi(api_client)
33+
34+
35+
@pytest.fixture
def query_runs_api(api_client) -> QueryRunsApi:
    """Provide a ``QueryRunsApi`` built on the ``api_client`` fixture."""
    return QueryRunsApi(api_client)
38+
39+
40+
@pytest.fixture
def results_api(api_client) -> ResultsApi:
    """Provide the Arrow-capable ``ResultsApi`` built on ``api_client``."""
    return ResultsApi(api_client)
43+
44+
45+
def test_results_arrow(
    query_api: QueryApi,
    query_runs_api: QueryRunsApi,
    results_api: ResultsApi,
) -> None:
    """Fetch one small result as Arrow: buffered, streamed and paginated.

    Submits a two-row query, waits for the run to succeed and the result to
    become ready, then checks that the buffered table, the streamed batches
    and an ``offset``/``limit`` slice all agree.
    """
    # ORDER BY pins the row order: the assertions below compare rows
    # positionally, and SQL gives no ordering guarantee for UNION ALL
    # without it.
    submitted = query_api.query(
        QueryRequest(
            var_async=True,
            async_after_ms=1000,
            sql=(
                "SELECT 1 AS x, 'hello' AS msg "
                "UNION ALL SELECT 2, 'world' "
                "ORDER BY x"
            ),
        )
    )
    query_run_id = submitted.query_run_id
    assert query_run_id

    # Poll the run until it reaches a terminal status; a timeout is reported
    # explicitly rather than surfacing as a status mismatch.
    deadline = time.monotonic() + POLL_TIMEOUT_S
    while time.monotonic() < deadline:
        run = query_runs_api.get_query_run(query_run_id)
        if run.status in TERMINAL_STATUSES:
            break
        time.sleep(POLL_INTERVAL_S)
    else:
        pytest.fail(f"query run {query_run_id} never reached a terminal status")

    assert run.status == "succeeded", (
        f"expected succeeded, got {run.status}: {run.error_message}"
    )
    assert run.result_id, "succeeded run must expose a result_id"
    result_id = run.result_id

    # Wait for ready before fetching as Arrow — get_result_arrow raises
    # ResultNotReadyError on 202.
    deadline = time.monotonic() + POLL_TIMEOUT_S
    while time.monotonic() < deadline:
        result = results_api.get_result(result_id)
        if result.status == "ready":
            break
        time.sleep(POLL_INTERVAL_S)
    else:
        pytest.fail(f"result {result_id} never became ready")

    # Buffered: returns a full pyarrow.Table.
    table = results_api.get_result_arrow(result_id)
    assert isinstance(table, pa.Table)
    assert table.num_rows == 2
    assert set(table.column_names) == {"x", "msg"}
    assert table.column("x").to_pylist() == [1, 2]
    assert table.column("msg").to_pylist() == ["hello", "world"]

    # Streaming: same data via RecordBatchStreamReader. The batches are
    # collected inside the ``with`` block, while the connection is open.
    with results_api.stream_result_arrow(result_id) as reader:
        streamed = pa.Table.from_batches(list(reader), schema=reader.schema)
    assert streamed.equals(table)

    # Pagination forwards correctly.
    sliced = results_api.get_result_arrow(result_id, offset=1, limit=1)
    assert sliced.num_rows == 1
    assert sliced.column("x").to_pylist() == [2]

0 commit comments

Comments
 (0)