diff --git a/.gitignore b/.gitignore index 4750470..5e83f2e 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,5 @@ dist/ build/ .coverage htmlcov/ + +.DS_Store diff --git a/README.md b/README.md index 7c85c93..fc95075 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,17 @@ uv pip install ibis-hotdata # or: python -m pip install ibis-hotdata ``` +## Features + +- **Ibis connection API** — connect with `ibis.hotdata.connect(...)` or `ibis.connect("hotdata://...")`. +- **Hotdata catalog mapping** — expose Hotdata connections, schemas, and tables through Ibis catalogs, databases, and tables. +- **SQL-backed expression execution** — compile Ibis expressions with the Postgres SQLGlot compiler and execute them through Hotdata query APIs. +- **Typed table discovery** — load schema metadata from Hotdata information schema and map SQL types into Ibis types. +- **Arrow and pandas results** — materialize expressions as pandas DataFrames, PyArrow tables, or local Arrow record batches. +- **Raw SQL escape hatch** — use `con.sql(..., dialect="postgres")` when Hotdata-specific federated SQL is clearer than modeled Ibis expressions. +- **Dataset upload helpers** — upload local pandas or PyArrow data as Parquet-backed Hotdata datasets through `create_table`. +- **Dataset cleanup** — delete Hotdata-managed datasets through the limited `drop_table` implementation. + ## Connect Programmatic API: @@ -44,19 +55,43 @@ con = ibis.connect( **Execution:** SQL is compiled with Ibis’s **Postgres** SQLGlot compiler. The client submits queries asynchronously with `POST /v1/query`, polls `GET /v1/query-runs/{id}`, then downloads ready results as Arrow IPC from `GET /v1/results/{id}`. Tuning: `poll_interval_s`, `poll_timeout_s` on `connect()`. -**Types:** Typed tables come from Hotdata’s information schema. `con.sql(...)` types are inferred from a small preview query; see [Hotdata SQL](https://www.hotdata.dev/docs/sql) for server behavior. +**Types:** Typed tables come from Hotdata’s information schema. `con.sql(...)` types are inferred from a small preview query and Arrow schema; see [Hotdata SQL](https://www.hotdata.dev/docs/sql) for server behavior. + +## Ibis Support Overview + +`ibis-hotdata` is a read-oriented SQL backend. It is useful for exploring Hotdata workspaces with Ibis expressions, running federated SQL, and materializing results locally, but it is not a full mutable database backend. + +Supported today: + +- **Connection setup:** `ibis.hotdata.connect(...)` and `ibis.connect("hotdata://...")` with token, workspace, optional sandbox session, TLS, timeout, and polling settings. +- **Catalog discovery:** `list_catalogs`, `list_databases`, `list_tables`, `current_catalog`, and `current_database` map Hotdata connections and remote schemas into Ibis' catalog/database/table hierarchy. +- **Table schemas:** `con.table(...)` uses Hotdata information schema column metadata and maps SQL types through Ibis' Postgres type parser. +- **SQL-backed expressions:** Ibis expressions compile with the Postgres SQLGlot compiler and execute through Hotdata. Common `SELECT` workloads such as projection, filtering, joins, grouping, aggregation, ordering, limits, scalar expressions, and `con.sql(...)` work when the generated SQL is accepted by Hotdata. +- **Result materialization:** `.execute()` returns pandas objects. `.to_pyarrow()` and `.to_pyarrow_batches()` use the Arrow IPC result data exposed by Hotdata without converting through JSON rows; batches are split locally after the result is downloaded. +- **Raw SQL escape hatch:** `con.sql("SELECT ...", dialect="postgres")` is the most reliable way to use Hotdata-specific federated table names or SQL that Ibis does not model directly. +- **Uploads and datasets:** `upload_file` plus `create_dataset_from_upload` can create Hotdata datasets; query them as `datasets..` after creation. +- **Limited `create_table`:** `con.create_table("name", pandas_df)` and `con.create_table("name", pyarrow_table)` serialize local data to Parquet, upload it, and create a Hotdata dataset. Schema-only calls create an empty Parquet-backed dataset. +- **Dataset-only `drop_table`:** `con.drop_table(...)` deletes matching Hotdata-managed datasets. It does not drop external source tables. + +Not supported as full Ibis backend features: -**Not in v1:** Ibis `create_table`, embeddings, indexes. **Uploads:** use `upload_file` + `create_dataset_from_upload` on the connection object (or raw SQL); query datasets as `datasets..
` per Hotdata. +- **General DDL and mutations:** Arbitrary remote `CREATE TABLE` / `DROP TABLE`, inserts, updates, deletes, overwrites, and schema-altering operations are not implemented. `create_table` and `drop_table` are limited to Hotdata datasets as described above. +- **Temporary tables and in-memory registration:** `supports_temporary_tables` is false, and in-memory tables are not uploaded automatically for joins. +- **Python UDFs:** `supports_python_udfs` is false. +- **Transactions and sessions as database state:** Hotdata sandbox sessions can be passed as `session_id`, but the backend does not expose transaction APIs. +- **Backend-native SQL dialect:** Compilation uses Ibis' Postgres dialect as the closest fit. Hotdata SQL and federation rules are authoritative, so not every Ibis expression that compiles is guaranteed to execute remotely. +- **Complete Ibis compliance:** The backend is experimental and has focused test coverage for connection, discovery, schema mapping, execution, uploads, and Arrow results. It has not yet been validated against the full Ibis backend test suite. +- **Hotdata platform APIs beyond SQL datasets:** embeddings, indexes, query history management, sandbox lifecycle management, and other Hotdata-specific APIs are outside the Ibis backend surface. ## Development ```bash -uv sync --group dev # pytest, ruff, httpx (for examples) +uv sync # installs dev group by default (pytest, ruff, httpx for examples) uv run pytest uv run ruff check src tests examples ``` -Lockfile CI: `uv sync --locked --group dev && uv run pytest`. +Lockfile CI: `uv sync --locked && uv run pytest`. ## TPC-H for the examples @@ -64,12 +99,12 @@ Examples assume something like **`tpch.tpch_sf1.customer`**. Provision TPC-H in ## Examples -Needs `HOTDATA_TOKEN` and `HOTDATA_WORKSPACE_ID`. +Needs `HOTDATA_API_KEY` and `HOTDATA_WORKSPACE`. ```bash -uv sync --group dev -export HOTDATA_TOKEN=… -export HOTDATA_WORKSPACE_ID=… +uv sync +export HOTDATA_API_KEY=… +export HOTDATA_WORKSPACE=… uv run python examples/01_catalog_introspection.py uv run python examples/02_execute_sql.py 'SELECT COUNT(*) AS n FROM tpch.tpch_sf1.customer' uv run python examples/03_connect_via_url.py diff --git a/examples/02_execute_sql.py b/examples/02_execute_sql.py index 308217d..70ecad9 100644 --- a/examples/02_execute_sql.py +++ b/examples/02_execute_sql.py @@ -4,7 +4,7 @@ From the repo root:: - HOTDATA_TOKEN=... HOTDATA_WORKSPACE_ID=... \\ + HOTDATA_API_KEY=... HOTDATA_WORKSPACE=... \\ uv run python examples/02_execute_sql.py \\ 'SELECT COUNT(*) AS n FROM tpch.tpch_sf1.customer' diff --git a/examples/04_ibis_table_workflows.py b/examples/04_ibis_table_workflows.py index 7dfd1b3..e83bc14 100644 --- a/examples/04_ibis_table_workflows.py +++ b/examples/04_ibis_table_workflows.py @@ -8,7 +8,7 @@ From the repo root:: - HOTDATA_TOKEN=... HOTDATA_WORKSPACE_ID=... \\ + HOTDATA_API_KEY=... HOTDATA_WORKSPACE=... \\ uv run python examples/04_ibis_table_workflows.py """ diff --git a/examples/_helpers.py b/examples/_helpers.py index 8b9e887..7073559 100644 --- a/examples/_helpers.py +++ b/examples/_helpers.py @@ -155,14 +155,14 @@ def parser(description: str) -> argparse.ArgumentParser: ) p.add_argument( "--token", - default=os.environ.get("HOTDATA_TOKEN", ""), - help="API bearer token (env HOTDATA_TOKEN)", + default=os.environ.get("HOTDATA_API_KEY", ""), + help="API bearer token (env HOTDATA_API_KEY)", ) p.add_argument( "--workspace", dest="workspace_id", - default=os.environ.get("HOTDATA_WORKSPACE_ID", ""), - help="Workspace public id (env HOTDATA_WORKSPACE_ID)", + default=os.environ.get("HOTDATA_WORKSPACE", ""), + help="Workspace public id (env HOTDATA_WORKSPACE)", ) p.add_argument( "--session", @@ -194,7 +194,7 @@ def parser(description: str) -> argparse.ArgumentParser: def parsed_args(parser: argparse.ArgumentParser) -> argparse.Namespace: ns = parser.parse_args() if not ns.token.strip() or not ns.workspace_id.strip(): - parser.error("Set HOTDATA_TOKEN and HOTDATA_WORKSPACE_ID, or pass --token and --workspace.") + parser.error("Set HOTDATA_API_KEY and HOTDATA_WORKSPACE, or pass --token and --workspace.") normalize_tpch_defaults(ns) return ns diff --git a/pyproject.toml b/pyproject.toml index a7bff98..fc77aad 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta" [project] name = "ibis-hotdata" version = "0.1.0" -description = "Ibis backend for Hotdata federated SQL API" +description = "Ibis backend for Hotdata federated SQL API (depends on the hotdata SDK only; not hotdata-runtime)" readme = "README.md" requires-python = ">=3.10" license = { text = "Apache-2.0" } @@ -39,6 +39,9 @@ dev = [ "ruff>=0.5", ] +[tool.uv] +default-groups = ["dev"] + [project.urls] Documentation = "https://www.hotdata.dev/docs/api-reference" "Ibis" = "https://ibis-project.org/" diff --git a/src/ibis_hotdata/__init__.py b/src/ibis_hotdata/__init__.py index 5098e2b..21570c2 100644 --- a/src/ibis_hotdata/__init__.py +++ b/src/ibis_hotdata/__init__.py @@ -1,3 +1,10 @@ +from importlib.metadata import PackageNotFoundError, version + +try: + __version__ = version("ibis-hotdata") +except PackageNotFoundError: + __version__ = "0.0.0+unknown" + from ibis_hotdata.backend import Backend -__all__ = ["Backend"] +__all__ = ["Backend", "__version__"] diff --git a/src/ibis_hotdata/backend.py b/src/ibis_hotdata/backend.py index 7737f2d..f7ca286 100644 --- a/src/ibis_hotdata/backend.py +++ b/src/ibis_hotdata/backend.py @@ -16,6 +16,7 @@ from __future__ import annotations import contextlib +import io from collections.abc import Iterable, Mapping from functools import cached_property from importlib.metadata import PackageNotFoundError @@ -319,6 +320,45 @@ def _iterate_information_schema( if cursor is None: break + def _iterate_datasets(self) -> Iterable[dict[str, Any]]: + offset = 0 + limit = 1000 + while True: + chunk = self._http.list_datasets(limit=limit, offset=offset) + yield from chunk["datasets"] + if not chunk.get("has_more"): + break + offset += limit + + def _dataset_database(self, database: tuple[str, str] | str | None) -> str | None: + if database is None: + return None + table_loc = self._to_sqlglot_table(database) + catalog, schema_name = self._to_catalog_db_tuple(table_loc) + if catalog and catalog != "datasets": + return "__not_datasets__" + return schema_name or catalog + + def _find_dataset( + self, table_name: str, database: tuple[str, str] | str | None + ) -> dict[str, Any]: + schema_name = self._dataset_database(database) + if schema_name == "__not_datasets__": + raise com.TableNotFound(table_name) + matches = [ + ds + for ds in self._iterate_datasets() + if ds["table_name"] == table_name + and (schema_name is None or ds["schema_name"] == schema_name) + ] + if not matches: + raise com.TableNotFound(table_name) + if len(matches) > 1: + raise com.IbisInputError( + f"Multiple Hotdata datasets named {table_name!r}; pass database=('datasets', schema)." + ) + return matches[0] + # --- schema / sql execution -------------------------------------------- def get_schema( @@ -391,10 +431,58 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame: df = cursor.to_pandas() return PandasData.convert_table(df, schema) - def upload_file(self, data: bytes) -> dict[str, Any]: + def to_pyarrow( + self, + expr: ir.Expr, + /, + *, + params: Mapping[ir.Scalar, Any] | None = None, + limit: int | str | None = None, + **kwargs: Any, + ): + self._run_pre_execute_hooks(expr) + table_expr = expr.as_table() + sql = self.compile(table_expr, params=params, limit=limit, **kwargs) + try: + payload = self._http.execute_query( + sql, + poll_interval_s=self._poll_interval_s, + poll_timeout_s=self._poll_timeout_s, + ) + except HotdataAPIError as exc: + raise _ibis_err_from_hotdata(exc) from exc + table = payload["pa_table"] + arrow_schema = table_expr.schema().to_pyarrow() + table = table.rename_columns(list(table_expr.columns)).cast(arrow_schema) + return expr.__pyarrow_result__(table) + + def to_pyarrow_batches( + self, + expr: ir.Expr, + /, + *, + params: Mapping[ir.Scalar, Any] | None = None, + limit: int | str | None = None, + chunk_size: int = 1_000_000, + **kwargs: Any, + ): + """Execute to Arrow and expose local record batches. + + Hotdata currently returns one Arrow IPC result for the full query. This + method downloads that result first, then splits it into local batches. + """ + import pyarrow as pa + + table = self.to_pyarrow(expr.as_table(), params=params, limit=limit, **kwargs) + return pa.ipc.RecordBatchReader.from_batches( + table.schema, + table.to_batches(max_chunksize=chunk_size), + ) + + def upload_file(self, data: bytes, *, content_type: str | None = None) -> dict[str, Any]: """POST ``/v1/files``; returns the upload record (use ``id`` with :meth:`create_dataset_from_upload`).""" try: - return self._http.upload_file(data) + return self._http.upload_file(data, content_type=content_type) except HotdataAPIError as exc: raise _ibis_err_from_hotdata(exc) from exc @@ -421,14 +509,81 @@ def create_dataset_from_upload( except HotdataAPIError as exc: raise _ibis_err_from_hotdata(exc) from exc - def create_table(self, *_args: Any, **_kwargs: Any) -> ir.Table: - raise NotImplementedError( - "Hotdata does not implement Ibis create_table in v1; use upload_file + " - "create_dataset_from_upload, then SQL or con.table with the returned names." + def _local_table_to_parquet(self, obj: Any, schema: sch.Schema | None): + import pandas as pd + import pyarrow as pa + import pyarrow.parquet as pq + + if obj is not None and schema is not None: + raise com.IbisInputError("create_table accepts only one of obj or schema") + + if obj is None: + if schema is None: + raise com.IbisInputError("create_table requires a pandas/pyarrow object or schema") + arrow_schema = schema.to_pyarrow() + table = pa.Table.from_arrays( + [pa.array([], type=field.type) for field in arrow_schema], + schema=arrow_schema, + ) + elif isinstance(obj, pa.Table): + table = obj + elif isinstance(obj, pd.DataFrame): + table = pa.Table.from_pandas(obj, preserve_index=False) + else: + raise com.IbisInputError( + "create_table currently accepts pandas.DataFrame or pyarrow.Table" + ) + + sink = io.BytesIO() + pq.write_table(table, sink) + return sink.getvalue() + + def create_table( + self, + name: str, + /, + obj: Any = None, + *, + schema: sch.Schema | None = None, + database: str | None = None, + temp: bool = False, + overwrite: bool = False, + ) -> ir.Table: + if temp: + raise NotImplementedError("Hotdata does not support temporary tables.") + if overwrite: + raise NotImplementedError("Hotdata create_table does not support overwrite.") + if database is not None: + raise NotImplementedError("Hotdata datasets choose their schema at creation time.") + + data = self._local_table_to_parquet(obj, schema) + upload = self.upload_file(data, content_type="application/parquet") + dataset = self.create_dataset_from_upload( + upload_id=upload["id"], + label=name, + table_name=name, + file_format="parquet", ) + return self.table(dataset["table_name"], database=("datasets", dataset["schema_name"])) - def drop_table(self, *_args: Any, **_kwargs: Any) -> None: - raise NotImplementedError("Hotdata backend does not implement drop_table in v1.") + def drop_table( + self, + name: str, + /, + *, + database: tuple[str, str] | str | None = None, + force: bool = False, + ) -> None: + try: + dataset = self._find_dataset(name, database) + except com.TableNotFound: + if force: + return + raise + try: + self._http.delete_dataset(dataset["id"]) + except HotdataAPIError as exc: + raise _ibis_err_from_hotdata(exc) from exc def _register_in_memory_table(self, _op: ops.InMemoryTable) -> None: return diff --git a/src/ibis_hotdata/http.py b/src/ibis_hotdata/http.py index b76ac71..8953750 100644 --- a/src/ibis_hotdata/http.py +++ b/src/ibis_hotdata/http.py @@ -161,10 +161,20 @@ def execute_query( raise HotdataAPIError("Timeout waiting for asynchronous query") raise HotdataAPIError("Unexpected query response type") - def upload_file(self, data: bytes) -> dict[str, Any]: - resp = self._safe_call(self._uploads.upload_file, data) + def upload_file(self, data: bytes, *, content_type: str | None = None) -> dict[str, Any]: + kwargs: dict[str, Any] = {} + if content_type is not None: + kwargs["_content_type"] = content_type + resp = self._safe_call(self._uploads.upload_file, data, **kwargs) return resp.model_dump(by_alias=True, mode="json") + def list_datasets(self, *, limit: int = 1000, offset: int = 0) -> dict[str, Any]: + resp = self._safe_call(self._datasets.list_datasets, limit=limit, offset=offset) + return resp.model_dump(by_alias=True, mode="json") + + def delete_dataset(self, dataset_id: str) -> None: + self._safe_call(self._datasets.delete_dataset, dataset_id) + def create_dataset_from_upload( self, *, diff --git a/tests/test_architecture_guardrails.py b/tests/test_architecture_guardrails.py new file mode 100644 index 0000000..6456d7b --- /dev/null +++ b/tests/test_architecture_guardrails.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +import re +from pathlib import Path + + +REPO_ROOT = Path(__file__).resolve().parents[1] + + +def test_pyproject_dependencies_do_not_include_hotdata_runtime() -> None: + pyproject_text = (REPO_ROOT / "pyproject.toml").read_text(encoding="utf-8") + match = re.search(r"(?ms)^dependencies\s*=\s*\[(.*?)\]", pyproject_text) + assert match is not None + dependencies_block = match.group(1) + assert "hotdata-runtime" not in dependencies_block + assert "hotdata_runtime" not in dependencies_block + + +def test_source_tree_does_not_import_hotdata_runtime() -> None: + violations: list[str] = [] + import_patterns = ( + re.compile(r"(?m)^\s*from\s+hotdata_runtime(?:\.|\s+import)"), + re.compile(r"(?m)^\s*import\s+hotdata_runtime(?:\s|$|,)"), + ) + + for folder in ("src", "tests", "examples"): + for path in (REPO_ROOT / folder).rglob("*.py"): + text = path.read_text(encoding="utf-8") + if any(pattern.search(text) for pattern in import_patterns): + violations.append(str(path.relative_to(REPO_ROOT))) + + assert not violations, ( + "hotdata-ibis must remain independent from hotdata-runtime; " + f"found forbidden imports in: {', '.join(violations)}" + ) diff --git a/tests/test_hotdata_backend.py b/tests/test_hotdata_backend.py index 771d23b..94f80ad 100644 --- a/tests/test_hotdata_backend.py +++ b/tests/test_hotdata_backend.py @@ -5,8 +5,10 @@ import ibis import ibis.common.exceptions as com +import pandas as pd import pyarrow as pa import pyarrow.ipc as ipc +import pyarrow.parquet as pq import pytest from werkzeug.wrappers import Request, Response @@ -35,6 +37,48 @@ def arrow_stream(table: pa.Table) -> bytes: return sink.getvalue() +def dataset_list_response(*datasets: dict, has_more: bool = False, offset: int = 0) -> dict: + return { + "count": len(datasets), + "datasets": list(datasets), + "has_more": has_more, + "limit": 1000, + "offset": offset, + } + + +def dataset_summary(dataset_id: str, table_name: str, schema_name: str = "sch_1") -> dict: + return { + "created_at": "2026-01-01T00:00:00Z", + "id": dataset_id, + "label": table_name, + "latest_version": 1, + "pinned_version": None, + "schema_name": schema_name, + "table_name": table_name, + "updated_at": "2026-01-01T00:00:00Z", + } + + +def information_schema_response(table_name: str, schema_name: str, columns: list[dict]) -> dict: + return { + "count": 1, + "has_more": False, + "limit": 500, + "next_cursor": None, + "tables": [ + { + "connection": "datasets", + "schema": schema_name, + "table": table_name, + "synced": True, + "last_sync": None, + "columns": columns, + } + ], + } + + def test_connect_via_url(httpserver: HTTPServer, srv: str): url = f"hotdata://127.0.0.1:{httpserver.port}?token=tok&workspace_id=ws_demo&verify_ssl=false" con = ibis.connect(url) @@ -89,6 +133,319 @@ def test_sql_execution(httpserver: HTTPServer, srv: str): assert list(pdf["x"]) == [1] +def test_to_pyarrow_uses_arrow_result(httpserver: HTTPServer, srv: str): + httpserver.expect_request("/v1/query", method="POST").respond_with_json( + { + "query_run_id": "run1", + "status": "queued", + "status_url": "http://poll", + "reason": None, + }, + status=202, + ) + httpserver.expect_request("/v1/query-runs/run1").respond_with_json( + { + "created_at": "2026-01-01T00:00:00Z", + "snapshot_id": "snap", + "sql_hash": "h", + "sql_text": "select 1", + "status": "succeeded", + "result_id": "res1", + "id": "run1", + } + ) + httpserver.expect_request("/v1/results/res1").respond_with_data( + arrow_stream(pa.table({"x": [1, 2]})), + status=200, + content_type="application/vnd.apache.arrow.stream", + ) + + con = ibis.hotdata.connect( + api_url=srv, + token="tok", + workspace_id="ws", + verify_ssl=False, + default_connection=TPCH_CONN, + default_schema=TPCH_SF1, + ) + + tbl = con.sql("SELECT 1 AS x", dialect="postgres") + out = con.to_pyarrow(tbl) + assert out.to_pydict() == {"x": [1, 2]} + + +def test_to_pyarrow_batches_uses_arrow_result(httpserver: HTTPServer, srv: str): + httpserver.expect_request("/v1/query", method="POST").respond_with_json( + { + "query_run_id": "run1", + "status": "queued", + "status_url": "http://poll", + "reason": None, + }, + status=202, + ) + httpserver.expect_request("/v1/query-runs/run1").respond_with_json( + { + "created_at": "2026-01-01T00:00:00Z", + "snapshot_id": "snap", + "sql_hash": "h", + "sql_text": "select 1", + "status": "succeeded", + "result_id": "res1", + "id": "run1", + } + ) + httpserver.expect_request("/v1/results/res1").respond_with_data( + arrow_stream(pa.table({"x": [1, 2, 3]})), + status=200, + content_type="application/vnd.apache.arrow.stream", + ) + + con = ibis.hotdata.connect( + api_url=srv, + token="tok", + workspace_id="ws", + verify_ssl=False, + default_connection=TPCH_CONN, + default_schema=TPCH_SF1, + ) + + tbl = con.sql("SELECT 1 AS x", dialect="postgres") + with con.to_pyarrow_batches(tbl, chunk_size=2) as reader: + out = reader.read_all() + assert out.to_pydict() == {"x": [1, 2, 3]} + + +def test_create_table_from_pandas_uploads_parquet_dataset(httpserver: HTTPServer, srv: str): + uploaded: dict[str, pa.Table] = {} + + def on_upload(req: Request) -> Response: + assert req.headers["Content-Type"] == "application/parquet" + uploaded["table"] = pq.read_table(io.BytesIO(req.get_data())) + return Response( + json.dumps( + { + "id": "upl_1", + "status": "ready", + "size_bytes": len(req.get_data()), + "created_at": "2026-01-01T00:00:00Z", + "content_type": "application/parquet", + } + ), + status=201, + content_type="application/json", + ) + + def on_dataset(req: Request) -> Response: + body = req.get_json() + assert body == { + "label": "demo", + "source": {"upload_id": "upl_1", "format": "parquet"}, + "table_name": "demo", + } + return Response( + json.dumps( + { + "id": "ds_1", + "label": "demo", + "schema_name": "sch_1", + "table_name": "demo", + "status": "ready", + "created_at": "2026-01-01T00:00:00Z", + } + ), + status=201, + content_type="application/json", + ) + + httpserver.expect_request("/v1/files", method="POST").respond_with_handler(on_upload) + httpserver.expect_request("/v1/datasets", method="POST").respond_with_handler(on_dataset) + httpserver.expect_request("/v1/information_schema").respond_with_json( + information_schema_response( + "demo", + "sch_1", + [{"name": "x", "data_type": "BIGINT", "nullable": True}], + ) + ) + + con = ibis.hotdata.connect( + api_url=srv, + token="tok", + workspace_id="ws", + verify_ssl=False, + ) + + table = con.create_table("demo", pd.DataFrame({"x": [1, 2]})) + + assert uploaded["table"].to_pydict() == {"x": [1, 2]} + assert table.schema().names == ("x",) + + +def test_create_table_from_pyarrow_uploads_parquet_dataset(httpserver: HTTPServer, srv: str): + uploaded: dict[str, pa.Table] = {} + + def on_upload(req: Request) -> Response: + uploaded["table"] = pq.read_table(io.BytesIO(req.get_data())) + return Response( + json.dumps( + { + "id": "upl_1", + "status": "ready", + "size_bytes": len(req.get_data()), + "created_at": "2026-01-01T00:00:00Z", + "content_type": "application/parquet", + } + ), + status=201, + content_type="application/json", + ) + + httpserver.expect_request("/v1/files", method="POST").respond_with_handler(on_upload) + httpserver.expect_request("/v1/datasets", method="POST").respond_with_json( + { + "id": "ds_1", + "label": "arrow_demo", + "schema_name": "sch_1", + "table_name": "arrow_demo", + "status": "ready", + "created_at": "2026-01-01T00:00:00Z", + }, + status=201, + ) + httpserver.expect_request("/v1/information_schema").respond_with_json( + information_schema_response( + "arrow_demo", + "sch_1", + [ + {"name": "x", "data_type": "BIGINT", "nullable": True}, + {"name": "y", "data_type": "VARCHAR", "nullable": True}, + ], + ) + ) + + con = ibis.hotdata.connect(api_url=srv, token="tok", workspace_id="ws", verify_ssl=False) + expr = con.create_table("arrow_demo", pa.table({"x": [1], "y": ["a"]})) + + assert uploaded["table"].to_pydict() == {"x": [1], "y": ["a"]} + assert expr.schema().names == ("x", "y") + + +def test_create_table_schema_only_uploads_empty_parquet(httpserver: HTTPServer, srv: str): + uploaded: dict[str, pa.Table] = {} + + def on_upload(req: Request) -> Response: + uploaded["table"] = pq.read_table(io.BytesIO(req.get_data())) + return Response( + json.dumps( + { + "id": "upl_1", + "status": "ready", + "size_bytes": len(req.get_data()), + "created_at": "2026-01-01T00:00:00Z", + "content_type": "application/parquet", + } + ), + status=201, + content_type="application/json", + ) + + httpserver.expect_request("/v1/files", method="POST").respond_with_handler(on_upload) + httpserver.expect_request("/v1/datasets", method="POST").respond_with_json( + { + "id": "ds_1", + "label": "empty_demo", + "schema_name": "sch_1", + "table_name": "empty_demo", + "status": "ready", + "created_at": "2026-01-01T00:00:00Z", + }, + status=201, + ) + httpserver.expect_request("/v1/information_schema").respond_with_json( + information_schema_response( + "empty_demo", + "sch_1", + [ + {"name": "x", "data_type": "BIGINT", "nullable": True}, + {"name": "y", "data_type": "VARCHAR", "nullable": True}, + ], + ) + ) + + con = ibis.hotdata.connect(api_url=srv, token="tok", workspace_id="ws", verify_ssl=False) + expr = con.create_table("empty_demo", schema=ibis.schema({"x": "int64", "y": "string"})) + + assert uploaded["table"].num_rows == 0 + assert uploaded["table"].schema.names == ["x", "y"] + assert expr.schema().names == ("x", "y") + + +def test_create_table_rejects_unsupported_options(httpserver: HTTPServer, srv: str): + con = ibis.hotdata.connect(api_url=srv, token="tok", workspace_id="ws", verify_ssl=False) + + with pytest.raises(NotImplementedError, match="temporary"): + con.create_table("tmp", pd.DataFrame({"x": [1]}), temp=True) + with pytest.raises(NotImplementedError, match="overwrite"): + con.create_table("tmp", pd.DataFrame({"x": [1]}), overwrite=True) + with pytest.raises(NotImplementedError, match="schema"): + con.create_table("tmp", pd.DataFrame({"x": [1]}), database="main") + with pytest.raises(com.IbisInputError, match="only one of obj or schema"): + con.create_table( + "tmp", + pd.DataFrame({"x": [1]}), + schema=ibis.schema({"x": "int64"}), + ) + with pytest.raises(com.IbisInputError, match="pandas.DataFrame or pyarrow.Table"): + con.create_table("tmp", obj=[{"x": 1}]) + + +def test_drop_table_deletes_matching_dataset(httpserver: HTTPServer, srv: str): + httpserver.expect_request("/v1/datasets").respond_with_json( + dataset_list_response(dataset_summary("ds_1", "demo")) + ) + httpserver.expect_request("/v1/datasets/ds_1", method="DELETE").respond_with_data( + b"", status=204 + ) + + con = ibis.hotdata.connect( + api_url=srv, + token="tok", + workspace_id="ws", + verify_ssl=False, + ) + + con.drop_table("demo", database=("datasets", "sch_1")) + + +def test_drop_table_force_ignores_missing_dataset(httpserver: HTTPServer, srv: str): + httpserver.expect_request("/v1/datasets").respond_with_json(dataset_list_response()) + + con = ibis.hotdata.connect(api_url=srv, token="tok", workspace_id="ws", verify_ssl=False) + + con.drop_table("missing", force=True) + + +def test_drop_table_raises_for_ambiguous_dataset_name(httpserver: HTTPServer, srv: str): + httpserver.expect_request("/v1/datasets").respond_with_json( + dataset_list_response( + dataset_summary("ds_1", "demo", schema_name="a"), + dataset_summary("ds_2", "demo", schema_name="b"), + ) + ) + + con = ibis.hotdata.connect(api_url=srv, token="tok", workspace_id="ws", verify_ssl=False) + + with pytest.raises(com.IbisInputError, match="Multiple Hotdata datasets"): + con.drop_table("demo") + + +def test_drop_table_raises_for_non_dataset_catalog(httpserver: HTTPServer, srv: str): + con = ibis.hotdata.connect(api_url=srv, token="tok", workspace_id="ws", verify_ssl=False) + + with pytest.raises(com.TableNotFound): + con.drop_table("demo", database=("tpch", "sch_1")) + + def test_compile_scalar_no_roundtrip(httpserver: HTTPServer, srv: str): con = ibis.hotdata.connect( api_url=srv, diff --git a/tests/test_hotdata_http.py b/tests/test_hotdata_http.py index 044d4f2..d926523 100644 --- a/tests/test_hotdata_http.py +++ b/tests/test_hotdata_http.py @@ -218,3 +218,71 @@ def on_dataset(req: Request) -> Response: assert ds["schema_name"] == "main" assert ds["table_name"] == "demo_tbl" client.close() + + +def test_upload_file_accepts_content_type(httpserver: HTTPServer): + def on_upload(req: Request) -> Response: + assert req.headers["Content-Type"] == "application/parquet" + return Response( + json.dumps( + { + "id": "upl_1", + "status": "ready", + "size_bytes": len(req.get_data()), + "created_at": "2026-01-01T00:00:00Z", + "content_type": "application/parquet", + } + ), + status=201, + content_type="application/json", + ) + + httpserver.expect_oneshot_request("/v1/files", method="POST").respond_with_handler(on_upload) + + client = HotdataClient( + api_url=httpserver.url_for("/").rstrip("/"), + token="t", + workspace_id="w", + verify_ssl=False, + ) + out = client.upload_file(b"parquet", content_type="application/parquet") + assert out["id"] == "upl_1" + client.close() + + +def test_list_and_delete_datasets(httpserver: HTTPServer): + httpserver.expect_oneshot_request("/v1/datasets").respond_with_json( + { + "count": 1, + "datasets": [ + { + "created_at": "2026-01-01T00:00:00Z", + "id": "ds_1", + "label": "demo", + "latest_version": 1, + "pinned_version": None, + "schema_name": "sch_1", + "table_name": "demo", + "updated_at": "2026-01-01T00:00:00Z", + } + ], + "has_more": False, + "limit": 1000, + "offset": 0, + } + ) + httpserver.expect_oneshot_request("/v1/datasets/ds_1", method="DELETE").respond_with_data( + b"", status=204 + ) + + client = HotdataClient( + api_url=httpserver.url_for("/").rstrip("/"), + token="t", + workspace_id="w", + verify_ssl=False, + ) + + datasets = client.list_datasets() + assert datasets["datasets"][0]["id"] == "ds_1" + client.delete_dataset("ds_1") + client.close() diff --git a/tests/test_version.py b/tests/test_version.py new file mode 100644 index 0000000..d52a9df --- /dev/null +++ b/tests/test_version.py @@ -0,0 +1,13 @@ +import re + +from importlib.metadata import version as dist_version + +import ibis_hotdata + + +def test_version_is_pep440_core(): + assert re.fullmatch(r"\d+\.\d+\.\d+(\+.*)?", ibis_hotdata.__version__) + + +def test_version_matches_distribution_metadata(): + assert dist_version("ibis-hotdata") == ibis_hotdata.__version__