Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ dist/
build/
.coverage
htmlcov/

.DS_Store
51 changes: 43 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ uv pip install ibis-hotdata
# or: python -m pip install ibis-hotdata
```

## Features

- **Ibis connection API** — connect with `ibis.hotdata.connect(...)` or `ibis.connect("hotdata://...")`.
- **Hotdata catalog mapping** — expose Hotdata connections, schemas, and tables through Ibis catalogs, databases, and tables.
- **SQL-backed expression execution** — compile Ibis expressions with the Postgres SQLGlot compiler and execute them through Hotdata query APIs.
- **Typed table discovery** — load schema metadata from Hotdata information schema and map SQL types into Ibis types.
- **Arrow and pandas results** — materialize expressions as pandas DataFrames, PyArrow tables, or local Arrow record batches.
- **Raw SQL escape hatch** — use `con.sql(..., dialect="postgres")` when Hotdata-specific federated SQL is clearer than modeled Ibis expressions.
- **Dataset upload helpers** — upload local pandas or PyArrow data as Parquet-backed Hotdata datasets through `create_table`.
- **Dataset cleanup** — delete Hotdata-managed datasets through the limited `drop_table` implementation.

## Connect

Programmatic API:
Expand Down Expand Up @@ -44,32 +55,56 @@ con = ibis.connect(

**Execution:** SQL is compiled with Ibis’s **Postgres** SQLGlot compiler. The client submits queries asynchronously with `POST /v1/query`, polls `GET /v1/query-runs/{id}`, then downloads ready results as Arrow IPC from `GET /v1/results/{id}`. Tuning: `poll_interval_s`, `poll_timeout_s` on `connect()`.

**Types:** Typed tables come from Hotdata’s information schema. `con.sql(...)` types are inferred from a small preview query; see [Hotdata SQL](https://www.hotdata.dev/docs/sql) for server behavior.
**Types:** Typed tables come from Hotdata’s information schema. `con.sql(...)` types are inferred from a small preview query and Arrow schema; see [Hotdata SQL](https://www.hotdata.dev/docs/sql) for server behavior.

## Ibis Support Overview

`ibis-hotdata` is a read-oriented SQL backend. It is useful for exploring Hotdata workspaces with Ibis expressions, running federated SQL, and materializing results locally, but it is not a full mutable database backend.

Supported today:

- **Connection setup:** `ibis.hotdata.connect(...)` and `ibis.connect("hotdata://...")` with token, workspace, optional sandbox session, TLS, timeout, and polling settings.
- **Catalog discovery:** `list_catalogs`, `list_databases`, `list_tables`, `current_catalog`, and `current_database` map Hotdata connections and remote schemas into Ibis' catalog/database/table hierarchy.
- **Table schemas:** `con.table(...)` uses Hotdata information schema column metadata and maps SQL types through Ibis' Postgres type parser.
- **SQL-backed expressions:** Ibis expressions compile with the Postgres SQLGlot compiler and execute through Hotdata. Common `SELECT` workloads such as projection, filtering, joins, grouping, aggregation, ordering, limits, scalar expressions, and `con.sql(...)` work when the generated SQL is accepted by Hotdata.
- **Result materialization:** `.execute()` returns pandas objects. `.to_pyarrow()` and `.to_pyarrow_batches()` use the Arrow IPC result data exposed by Hotdata without converting through JSON rows; batches are split locally after the result is downloaded.
- **Raw SQL escape hatch:** `con.sql("SELECT ...", dialect="postgres")` is the most reliable way to use Hotdata-specific federated table names or SQL that Ibis does not model directly.
- **Uploads and datasets:** `upload_file` plus `create_dataset_from_upload` can create Hotdata datasets; query them as `datasets.<schema>.<table>` after creation.
- **Limited `create_table`:** `con.create_table("name", pandas_df)` and `con.create_table("name", pyarrow_table)` serialize local data to Parquet, upload it, and create a Hotdata dataset. Schema-only calls create an empty Parquet-backed dataset.
- **Dataset-only `drop_table`:** `con.drop_table(...)` deletes matching Hotdata-managed datasets. It does not drop external source tables.

Not supported as full Ibis backend features:

**Not in v1:** Ibis `create_table`, embeddings, indexes. **Uploads:** use `upload_file` + `create_dataset_from_upload` on the connection object (or raw SQL); query datasets as `datasets.<schema>.<table>` per Hotdata.
- **General DDL and mutations:** Arbitrary remote `CREATE TABLE` / `DROP TABLE`, inserts, updates, deletes, overwrites, and schema-altering operations are not implemented. `create_table` and `drop_table` are limited to Hotdata datasets as described above.
- **Temporary tables and in-memory registration:** `supports_temporary_tables` is false, and in-memory tables are not uploaded automatically for joins.
- **Python UDFs:** `supports_python_udfs` is false.
- **Transactions and sessions as database state:** Hotdata sandbox sessions can be passed as `session_id`, but the backend does not expose transaction APIs.
- **Backend-native SQL dialect:** Compilation uses Ibis' Postgres dialect as the closest fit. Hotdata SQL and federation rules are authoritative, so not every Ibis expression that compiles is guaranteed to execute remotely.
- **Complete Ibis compliance:** The backend is experimental and has focused test coverage for connection, discovery, schema mapping, execution, uploads, and Arrow results. It has not yet been validated against the full Ibis backend test suite.
- **Hotdata platform APIs beyond SQL datasets:** embeddings, indexes, query history management, sandbox lifecycle management, and other Hotdata-specific APIs are outside the Ibis backend surface.

## Development

```bash
uv sync --group dev # pytest, ruff, httpx (for examples)
uv sync # installs dev group by default (pytest, ruff, httpx for examples)
uv run pytest
uv run ruff check src tests examples
```

Lockfile CI: `uv sync --locked --group dev && uv run pytest`.
Lockfile CI: `uv sync --locked && uv run pytest`.

## TPC-H for the examples

Examples assume something like **`tpch.tpch_sf1.customer`**. Provision TPC-H in your workspace (commonly a **DuckDB** connection, then DuckDB’s `tpch` extension and `CALL dbgen(sf = 1)` — see [DuckDB TPC-H](https://www.duckdb.org/docs/current/core_extensions/tpch.html) and [Hotdata Quick Start](https://www.hotdata.dev/docs/quick-start)). If your data lives under `main` instead, pass `--default-schema` / `--default-connection` or set `HOTDATA_DEFAULT_*` (see `examples/_helpers.py`).

## Examples

Needs `HOTDATA_TOKEN` and `HOTDATA_WORKSPACE_ID`.
Needs `HOTDATA_API_KEY` and `HOTDATA_WORKSPACE`.

```bash
uv sync --group dev
export HOTDATA_TOKEN=…
export HOTDATA_WORKSPACE_ID=…
uv sync
export HOTDATA_API_KEY=…
export HOTDATA_WORKSPACE=…
uv run python examples/01_catalog_introspection.py
uv run python examples/02_execute_sql.py 'SELECT COUNT(*) AS n FROM tpch.tpch_sf1.customer'
uv run python examples/03_connect_via_url.py
Expand Down
2 changes: 1 addition & 1 deletion examples/02_execute_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

From the repo root::

HOTDATA_TOKEN=... HOTDATA_WORKSPACE_ID=... \\
HOTDATA_API_KEY=... HOTDATA_WORKSPACE=... \\
uv run python examples/02_execute_sql.py \\
'SELECT COUNT(*) AS n FROM tpch.tpch_sf1.customer'

Expand Down
2 changes: 1 addition & 1 deletion examples/04_ibis_table_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

From the repo root::

HOTDATA_TOKEN=... HOTDATA_WORKSPACE_ID=... \\
HOTDATA_API_KEY=... HOTDATA_WORKSPACE=... \\
uv run python examples/04_ibis_table_workflows.py
"""

Expand Down
10 changes: 5 additions & 5 deletions examples/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,14 @@ def parser(description: str) -> argparse.ArgumentParser:
)
p.add_argument(
"--token",
default=os.environ.get("HOTDATA_TOKEN", ""),
help="API bearer token (env HOTDATA_TOKEN)",
default=os.environ.get("HOTDATA_API_KEY", ""),
help="API bearer token (env HOTDATA_API_KEY)",
)
p.add_argument(
"--workspace",
dest="workspace_id",
default=os.environ.get("HOTDATA_WORKSPACE_ID", ""),
help="Workspace public id (env HOTDATA_WORKSPACE_ID)",
default=os.environ.get("HOTDATA_WORKSPACE", ""),
help="Workspace public id (env HOTDATA_WORKSPACE)",
)
p.add_argument(
"--session",
Expand Down Expand Up @@ -194,7 +194,7 @@ def parser(description: str) -> argparse.ArgumentParser:
def parsed_args(parser: argparse.ArgumentParser) -> argparse.Namespace:
ns = parser.parse_args()
if not ns.token.strip() or not ns.workspace_id.strip():
parser.error("Set HOTDATA_TOKEN and HOTDATA_WORKSPACE_ID, or pass --token and --workspace.")
parser.error("Set HOTDATA_API_KEY and HOTDATA_WORKSPACE, or pass --token and --workspace.")
normalize_tpch_defaults(ns)
return ns

Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "ibis-hotdata"
version = "0.1.0"
description = "Ibis backend for Hotdata federated SQL API"
description = "Ibis backend for Hotdata federated SQL API (depends on the hotdata SDK only; not hotdata-runtime)"
readme = "README.md"
requires-python = ">=3.10"
license = { text = "Apache-2.0" }
Expand Down Expand Up @@ -39,6 +39,9 @@ dev = [
"ruff>=0.5",
]

[tool.uv]
default-groups = ["dev"]

[project.urls]
Documentation = "https://www.hotdata.dev/docs/api-reference"
"Ibis" = "https://ibis-project.org/"
Expand Down
9 changes: 8 additions & 1 deletion src/ibis_hotdata/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
from importlib.metadata import PackageNotFoundError, version

try:
__version__ = version("ibis-hotdata")
except PackageNotFoundError:
__version__ = "0.0.0+unknown"

from ibis_hotdata.backend import Backend

__all__ = ["Backend"]
__all__ = ["Backend", "__version__"]
171 changes: 163 additions & 8 deletions src/ibis_hotdata/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from __future__ import annotations

import contextlib
import io
from collections.abc import Iterable, Mapping
from functools import cached_property
from importlib.metadata import PackageNotFoundError
Expand Down Expand Up @@ -319,6 +320,45 @@ def _iterate_information_schema(
if cursor is None:
break

def _iterate_datasets(self) -> Iterable[dict[str, Any]]:
offset = 0
limit = 1000
while True:
chunk = self._http.list_datasets(limit=limit, offset=offset)
yield from chunk["datasets"]
if not chunk.get("has_more"):
break
offset += limit

def _dataset_database(self, database: tuple[str, str] | str | None) -> str | None:
if database is None:
return None
table_loc = self._to_sqlglot_table(database)
catalog, schema_name = self._to_catalog_db_tuple(table_loc)
if catalog and catalog != "datasets":
return "__not_datasets__"
return schema_name or catalog

def _find_dataset(
self, table_name: str, database: tuple[str, str] | str | None
) -> dict[str, Any]:
schema_name = self._dataset_database(database)
if schema_name == "__not_datasets__":
raise com.TableNotFound(table_name)
matches = [
ds
for ds in self._iterate_datasets()
if ds["table_name"] == table_name
and (schema_name is None or ds["schema_name"] == schema_name)
]
if not matches:
raise com.TableNotFound(table_name)
if len(matches) > 1:
raise com.IbisInputError(
f"Multiple Hotdata datasets named {table_name!r}; pass database=('datasets', schema)."
)
return matches[0]
Comment on lines +333 to +360
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: (not blocking) The "__not_datasets__" magic string is fragile — if a Hotdata schema is ever literally named __not_datasets__, lookups would silently misbehave. More importantly, when the sentinel is set, _find_dataset still iterates every page of list_datasets() only to discard each row against schema_name != "__not_datasets__". Consider short-circuiting before the iteration (e.g., raise TableNotFound directly in _find_dataset when the catalog isn't "datasets") which avoids both the sentinel and the unnecessary network calls — see test_drop_table_raises_for_non_dataset_catalog where the dataset list endpoint is hit needlessly.


# --- schema / sql execution --------------------------------------------

def get_schema(
Expand Down Expand Up @@ -391,10 +431,58 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
df = cursor.to_pandas()
return PandasData.convert_table(df, schema)

def upload_file(self, data: bytes) -> dict[str, Any]:
def to_pyarrow(
self,
expr: ir.Expr,
/,
*,
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
**kwargs: Any,
):
self._run_pre_execute_hooks(expr)
table_expr = expr.as_table()
sql = self.compile(table_expr, params=params, limit=limit, **kwargs)
try:
payload = self._http.execute_query(
sql,
poll_interval_s=self._poll_interval_s,
poll_timeout_s=self._poll_timeout_s,
)
except HotdataAPIError as exc:
raise _ibis_err_from_hotdata(exc) from exc
table = payload["pa_table"]
arrow_schema = table_expr.schema().to_pyarrow()
table = table.rename_columns(list(table_expr.columns)).cast(arrow_schema)
return expr.__pyarrow_result__(table)

def to_pyarrow_batches(
self,
expr: ir.Expr,
/,
*,
params: Mapping[ir.Scalar, Any] | None = None,
limit: int | str | None = None,
chunk_size: int = 1_000_000,
**kwargs: Any,
):
"""Execute to Arrow and expose local record batches.

Hotdata currently returns one Arrow IPC result for the full query. This
method downloads that result first, then splits it into local batches.
"""
import pyarrow as pa

table = self.to_pyarrow(expr.as_table(), params=params, limit=limit, **kwargs)
return pa.ipc.RecordBatchReader.from_batches(
table.schema,
table.to_batches(max_chunksize=chunk_size),
)
Comment on lines +459 to +480
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: (not blocking) to_pyarrow_batches fully materializes the result via to_pyarrow before slicing into batches, so it offers no memory advantage over to_pyarrow for large results — chunk_size only affects batch granularity, not peak memory. The README phrasing ("use the Arrow IPC result data exposed by Hotdata without converting through JSON rows") is accurate, but users familiar with other backends may assume this method streams. Worth a brief docstring noting the in-memory materialization.


def upload_file(self, data: bytes, *, content_type: str | None = None) -> dict[str, Any]:
"""POST ``/v1/files``; returns the upload record (use ``id`` with :meth:`create_dataset_from_upload`)."""
try:
return self._http.upload_file(data)
return self._http.upload_file(data, content_type=content_type)
except HotdataAPIError as exc:
raise _ibis_err_from_hotdata(exc) from exc

Expand All @@ -421,14 +509,81 @@ def create_dataset_from_upload(
except HotdataAPIError as exc:
raise _ibis_err_from_hotdata(exc) from exc

def create_table(self, *_args: Any, **_kwargs: Any) -> ir.Table:
raise NotImplementedError(
"Hotdata does not implement Ibis create_table in v1; use upload_file + "
"create_dataset_from_upload, then SQL or con.table with the returned names."
def _local_table_to_parquet(self, obj: Any, schema: sch.Schema | None):
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

if obj is not None and schema is not None:
raise com.IbisInputError("create_table accepts only one of obj or schema")

if obj is None:
if schema is None:
raise com.IbisInputError("create_table requires a pandas/pyarrow object or schema")
arrow_schema = schema.to_pyarrow()
table = pa.Table.from_arrays(
[pa.array([], type=field.type) for field in arrow_schema],
schema=arrow_schema,
)
elif isinstance(obj, pa.Table):
table = obj
elif isinstance(obj, pd.DataFrame):
table = pa.Table.from_pandas(obj, preserve_index=False)
else:
raise com.IbisInputError(
"create_table currently accepts pandas.DataFrame or pyarrow.Table"
)
Comment on lines +512 to +535
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: (not blocking) When both obj and schema are provided, schema is silently ignored. A user passing create_table("x", df, schema=...) would reasonably expect the schema to be applied (cast/validate). Either raise on the combo or cast obj to the requested schema before serializing.


sink = io.BytesIO()
pq.write_table(table, sink)
return sink.getvalue()

def create_table(
self,
name: str,
/,
obj: Any = None,
*,
schema: sch.Schema | None = None,
database: str | None = None,
temp: bool = False,
overwrite: bool = False,
) -> ir.Table:
if temp:
raise NotImplementedError("Hotdata does not support temporary tables.")
if overwrite:
raise NotImplementedError("Hotdata create_table does not support overwrite.")
if database is not None:
raise NotImplementedError("Hotdata datasets choose their schema at creation time.")

data = self._local_table_to_parquet(obj, schema)
upload = self.upload_file(data, content_type="application/parquet")
dataset = self.create_dataset_from_upload(
upload_id=upload["id"],
label=name,
table_name=name,
file_format="parquet",
)
return self.table(dataset["table_name"], database=("datasets", dataset["schema_name"]))

def drop_table(self, *_args: Any, **_kwargs: Any) -> None:
raise NotImplementedError("Hotdata backend does not implement drop_table in v1.")
def drop_table(
self,
name: str,
/,
*,
database: tuple[str, str] | str | None = None,
force: bool = False,
) -> None:
try:
dataset = self._find_dataset(name, database)
except com.TableNotFound:
if force:
return
raise
try:
self._http.delete_dataset(dataset["id"])
except HotdataAPIError as exc:
raise _ibis_err_from_hotdata(exc) from exc

def _register_in_memory_table(self, _op: ops.InMemoryTable) -> None:
return
Expand Down
14 changes: 12 additions & 2 deletions src/ibis_hotdata/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,20 @@ def execute_query(
raise HotdataAPIError("Timeout waiting for asynchronous query")
raise HotdataAPIError("Unexpected query response type")

def upload_file(self, data: bytes) -> dict[str, Any]:
resp = self._safe_call(self._uploads.upload_file, data)
def upload_file(self, data: bytes, *, content_type: str | None = None) -> dict[str, Any]:
kwargs: dict[str, Any] = {}
if content_type is not None:
kwargs["_content_type"] = content_type
resp = self._safe_call(self._uploads.upload_file, data, **kwargs)
return resp.model_dump(by_alias=True, mode="json")

def list_datasets(self, *, limit: int = 1000, offset: int = 0) -> dict[str, Any]:
resp = self._safe_call(self._datasets.list_datasets, limit=limit, offset=offset)
return resp.model_dump(by_alias=True, mode="json")

def delete_dataset(self, dataset_id: str) -> None:
self._safe_call(self._datasets.delete_dataset, dataset_id)

def create_dataset_from_upload(
self,
*,
Expand Down
Loading
Loading