From faea8ae89c4183ff071648e3c78d0ea710ea48cb Mon Sep 17 00:00:00 2001 From: Eddie A Tejeda <669988+eddietejeda@users.noreply.github.com> Date: Mon, 18 May 2026 21:21:04 -0700 Subject: [PATCH 1/5] chore: require hotdata SDK 0.2.0 Bump dependency to hotdata>=0.2.0 and adapt dataset upload requests to the internally tagged DatasetSource shape introduced in the SDK. --- README.md | 2 +- pyproject.toml | 2 +- src/ibis_hotdata/http.py | 11 +++++++++-- tests/test_hotdata_backend.py | 2 +- tests/test_hotdata_http.py | 2 +- uv.lock | 8 ++++---- 6 files changed, 17 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index fc95075..3e93a20 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Experimental [Ibis](https://ibis-project.org/) backend for [Hotdata](https://www.hotdata.dev/docs/api-reference): compile expressions with Ibis, run federated SQL over the Hotdata API. REST calls use the official **[hotdata](https://github.com/hotdata-dev/sdk-python)** Python SDK. Repo examples use **httpx** (listed under the **dev** dependency group). -**Requirements:** Python 3.10+, **ibis-framework** 10.x, **hotdata** ≥0.1. +**Requirements:** Python 3.10+, **ibis-framework** 10.x, **hotdata** ≥0.2. ## Install diff --git a/pyproject.toml b/pyproject.toml index fc77aad..d7136cc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ classifiers = [ ] dependencies = [ "ibis-framework>=10.0,<11", - "hotdata>=0.1.0", + "hotdata>=0.2.0", "pyarrow>=15", "pyarrow-hotfix>=0.6", "pandas>=2", diff --git a/src/ibis_hotdata/http.py b/src/ibis_hotdata/http.py index 8953750..c8f0bae 100644 --- a/src/ibis_hotdata/http.py +++ b/src/ibis_hotdata/http.py @@ -23,8 +23,9 @@ UploadsApi, ) from hotdata.exceptions import ApiException -from hotdata.models import CreateDatasetRequest, DatasetSource, QueryRequest, UploadDatasetSource +from hotdata.models import CreateDatasetRequest, DatasetSource, QueryRequest from hotdata.models.async_query_response import AsyncQueryResponse +from hotdata.models.dataset_source_one_of import DatasetSourceOneOf T = TypeVar("T") @@ -183,7 +184,13 @@ def create_dataset_from_upload( table_name: str | None = None, file_format: str = "csv", ) -> dict[str, Any]: - src = DatasetSource(UploadDatasetSource(upload_id=upload_id, format=file_format)) + src = DatasetSource( + DatasetSourceOneOf( + type="upload", + upload_id=upload_id, + format=file_format, + ) + ) fields: dict[str, Any] = {"label": label, "source": src} if table_name is not None: fields["table_name"] = table_name diff --git a/tests/test_hotdata_backend.py b/tests/test_hotdata_backend.py index 94f80ad..ad41ce3 100644 --- a/tests/test_hotdata_backend.py +++ b/tests/test_hotdata_backend.py @@ -240,7 +240,7 @@ def on_dataset(req: Request) -> Response: body = req.get_json() assert body == { "label": "demo", - "source": {"upload_id": "upl_1", "format": "parquet"}, + "source": {"type": "upload", "upload_id": "upl_1", "format": "parquet"}, "table_name": "demo", } return Response( diff --git a/tests/test_hotdata_http.py b/tests/test_hotdata_http.py index d926523..6a7477e 100644 --- a/tests/test_hotdata_http.py +++ b/tests/test_hotdata_http.py @@ -185,7 +185,7 @@ def test_upload_file_then_create_dataset(httpserver: HTTPServer): def on_dataset(req: Request) -> Response: body = req.get_json() assert body["label"] == "demo" - assert body["source"] == {"upload_id": "upl_1", "format": "csv"} + assert body["source"] == {"type": "upload", "upload_id": "upl_1", "format": "csv"} assert body.get("table_name") == "demo_tbl" payload = { "id": "ds_1", diff --git a/uv.lock b/uv.lock index f9e5a1e..85331fe 100644 --- a/uv.lock +++ b/uv.lock @@ -84,7 +84,7 @@ wheels = [ [[package]] name = "hotdata" -version = "0.1.0" +version = "0.2.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "pydantic" }, @@ -92,9 +92,9 @@ dependencies = [ { name = "typing-extensions" }, { name = "urllib3" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/63/a2/7e997581dc23fca35330c355cd433135c4d18cc5506fb77fb35fd0180e97/hotdata-0.1.0.tar.gz", hash = "sha256:6795ff7381fb8f2f258ee3f0c31f9b1ba2f5908728c51fa399840fdf603acc46", size = 97691, upload-time = "2026-04-25T17:57:00.102Z" } +sdist = { url = "https://files.pythonhosted.org/packages/ce/0f/1e9e024aa13f8d4bf8f9fb1bce777da6ca19da05b8435f2ba5cd5f87ec80/hotdata-0.2.0.tar.gz", hash = "sha256:e1131c05ed34d2f39ddee84930eb6694ed46971d7a442df5932689b28a6c9b4f", size = 108780, upload-time = "2026-05-19T04:01:38.345Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/3f/21/e04ca377e7e3db50215bf207867ef02a56af11f61022390b7689e6ff2db3/hotdata-0.1.0-py3-none-any.whl", hash = "sha256:304f46d7c7ed5b586a9102684ef42e45972955dfb66a492c5e0b016e8bc545fa", size = 242376, upload-time = "2026-04-25T17:56:58.126Z" }, + { url = "https://files.pythonhosted.org/packages/9a/e7/63b4820963ec475fe16403d363e5ddec237cfe01a39c2d7aff6a6d48d720/hotdata-0.2.0-py3-none-any.whl", hash = "sha256:d3d644a3b607f4891a784b8d5afa30a00bd9e437db013fd0581bf8bca501ac0d", size = 256603, upload-time = "2026-05-19T04:01:36.253Z" }, ] [[package]] @@ -167,7 +167,7 @@ dev = [ [package.metadata] requires-dist = [ - { name = "hotdata", specifier = ">=0.1.0" }, + { name = "hotdata", specifier = ">=0.2.0" }, { name = "ibis-framework", specifier = ">=10.0,<11" }, { name = "pandas", specifier = ">=2" }, { name = "pyarrow", specifier = ">=15" }, From eba3e585d34d2ad9b3c9eb307928305fcd76013b Mon Sep 17 00:00:00 2001 From: Eddie A Tejeda <669988+eddietejeda@users.noreply.github.com> Date: Mon, 18 May 2026 21:39:53 -0700 Subject: [PATCH 2/5] feat: replace dataset writes with managed database API Use managed connections for create_database, create_table, drop_table, and drop_database instead of the datasets API. --- README.md | 12 +- src/ibis_hotdata/backend.py | 183 +++++++++++-------- src/ibis_hotdata/http.py | 64 ++++--- src/ibis_hotdata/managed.py | 21 +++ tests/test_hotdata_backend.py | 323 ++++++++++++++++++++-------------- tests/test_hotdata_http.py | 136 ++++++++------ 6 files changed, 450 insertions(+), 289 deletions(-) create mode 100644 src/ibis_hotdata/managed.py diff --git a/README.md b/README.md index 3e93a20..af217db 100644 --- a/README.md +++ b/README.md @@ -19,8 +19,7 @@ uv pip install ibis-hotdata - **Typed table discovery** — load schema metadata from Hotdata information schema and map SQL types into Ibis types. - **Arrow and pandas results** — materialize expressions as pandas DataFrames, PyArrow tables, or local Arrow record batches. - **Raw SQL escape hatch** — use `con.sql(..., dialect="postgres")` when Hotdata-specific federated SQL is clearer than modeled Ibis expressions. -- **Dataset upload helpers** — upload local pandas or PyArrow data as Parquet-backed Hotdata datasets through `create_table`. -- **Dataset cleanup** — delete Hotdata-managed datasets through the limited `drop_table` implementation. +- **Managed database writes** — create managed connections with `create_database`, load local pandas or PyArrow data through `create_table`, and clean up with `drop_table` / `drop_database`. ## Connect @@ -69,19 +68,18 @@ Supported today: - **SQL-backed expressions:** Ibis expressions compile with the Postgres SQLGlot compiler and execute through Hotdata. Common `SELECT` workloads such as projection, filtering, joins, grouping, aggregation, ordering, limits, scalar expressions, and `con.sql(...)` work when the generated SQL is accepted by Hotdata. - **Result materialization:** `.execute()` returns pandas objects. `.to_pyarrow()` and `.to_pyarrow_batches()` use the Arrow IPC result data exposed by Hotdata without converting through JSON rows; batches are split locally after the result is downloaded. - **Raw SQL escape hatch:** `con.sql("SELECT ...", dialect="postgres")` is the most reliable way to use Hotdata-specific federated table names or SQL that Ibis does not model directly. -- **Uploads and datasets:** `upload_file` plus `create_dataset_from_upload` can create Hotdata datasets; query them as `datasets..` after creation. -- **Limited `create_table`:** `con.create_table("name", pandas_df)` and `con.create_table("name", pyarrow_table)` serialize local data to Parquet, upload it, and create a Hotdata dataset. Schema-only calls create an empty Parquet-backed dataset. -- **Dataset-only `drop_table`:** `con.drop_table(...)` deletes matching Hotdata-managed datasets. It does not drop external source tables. +- **Managed database lifecycle:** `create_database("sales", schema="public", tables=["orders"])` registers a managed connection (Ibis catalog). `create_table("orders", pandas_df, database=("sales", "public"))` uploads Parquet and loads it with replace mode. Query as `sales.public.orders` in SQL. `drop_table` clears a managed table; `drop_database` deletes the connection. +- **Parquet uploads:** `create_table` accepts pandas DataFrames, PyArrow tables, or schema-only empty tables. Tables must live in a managed connection — declare them with `create_database(..., tables=[...])` first. Not supported as full Ibis backend features: -- **General DDL and mutations:** Arbitrary remote `CREATE TABLE` / `DROP TABLE`, inserts, updates, deletes, overwrites, and schema-altering operations are not implemented. `create_table` and `drop_table` are limited to Hotdata datasets as described above. +- **General DDL and mutations:** Arbitrary remote DDL, inserts, updates, deletes, and schema-altering operations on external connections are not implemented. Managed-database writes are limited to `create_database`, `create_table`, `drop_table`, and `drop_database` as described above. - **Temporary tables and in-memory registration:** `supports_temporary_tables` is false, and in-memory tables are not uploaded automatically for joins. - **Python UDFs:** `supports_python_udfs` is false. - **Transactions and sessions as database state:** Hotdata sandbox sessions can be passed as `session_id`, but the backend does not expose transaction APIs. - **Backend-native SQL dialect:** Compilation uses Ibis' Postgres dialect as the closest fit. Hotdata SQL and federation rules are authoritative, so not every Ibis expression that compiles is guaranteed to execute remotely. - **Complete Ibis compliance:** The backend is experimental and has focused test coverage for connection, discovery, schema mapping, execution, uploads, and Arrow results. It has not yet been validated against the full Ibis backend test suite. -- **Hotdata platform APIs beyond SQL datasets:** embeddings, indexes, query history management, sandbox lifecycle management, and other Hotdata-specific APIs are outside the Ibis backend surface. +- **Hotdata platform APIs beyond SQL and managed databases:** embeddings, indexes, query history management, sandbox lifecycle management, and other Hotdata-specific APIs are outside the Ibis backend surface. ## Development diff --git a/src/ibis_hotdata/backend.py b/src/ibis_hotdata/backend.py index f7ca286..989eb26 100644 --- a/src/ibis_hotdata/backend.py +++ b/src/ibis_hotdata/backend.py @@ -17,7 +17,7 @@ import contextlib import io -from collections.abc import Iterable, Mapping +from collections.abc import Iterable, Mapping, Sequence from functools import cached_property from importlib.metadata import PackageNotFoundError from importlib.metadata import version as pkg_version @@ -33,6 +33,7 @@ import sqlglot as sg import sqlglot.expressions as sge from ibis.backends import ( + CanCreateDatabase, CanListCatalog, CanListDatabase, HasCurrentCatalog, @@ -42,6 +43,7 @@ from ibis.backends.sql import SQLBackend from ibis_hotdata.http import HotdataAPIError, HotdataClient +from ibis_hotdata.managed import DEFAULT_SCHEMA, MANAGED_SOURCE_TYPE from ibis_hotdata.types import dtype_from_hotdata_sql_type _INFORMATION_SCHEMA_PAGE_SIZE = 500 @@ -59,6 +61,7 @@ def _ibis_err_from_hotdata(exc: HotdataAPIError) -> com.IbisError: class Backend( SQLBackend, + CanCreateDatabase, CanListCatalog, CanListDatabase, HasCurrentCatalog, @@ -320,44 +323,48 @@ def _iterate_information_schema( if cursor is None: break - def _iterate_datasets(self) -> Iterable[dict[str, Any]]: - offset = 0 - limit = 1000 - while True: - chunk = self._http.list_datasets(limit=limit, offset=offset) - yield from chunk["datasets"] - if not chunk.get("has_more"): - break - offset += limit - - def _dataset_database(self, database: tuple[str, str] | str | None) -> str | None: - if database is None: - return None - table_loc = self._to_sqlglot_table(database) - catalog, schema_name = self._to_catalog_db_tuple(table_loc) - if catalog and catalog != "datasets": - return "__not_datasets__" - return schema_name or catalog - - def _find_dataset( - self, table_name: str, database: tuple[str, str] | str | None - ) -> dict[str, Any]: - schema_name = self._dataset_database(database) - if schema_name == "__not_datasets__": - raise com.TableNotFound(table_name) - matches = [ - ds - for ds in self._iterate_datasets() - if ds["table_name"] == table_name - and (schema_name is None or ds["schema_name"] == schema_name) - ] - if not matches: - raise com.TableNotFound(table_name) - if len(matches) > 1: + def _resolve_connection(self, name_or_id: str) -> dict[str, Any]: + data = self._http.list_connections() + for conn in data["connections"]: + if conn["id"] == name_or_id or conn.get("name") == name_or_id: + return conn + raise com.IbisError(f"Unknown Hotdata connection {name_or_id!r}") + + def _resolve_managed_connection(self, name_or_id: str) -> dict[str, Any]: + conn = self._resolve_connection(name_or_id) + if conn.get("source_type") != MANAGED_SOURCE_TYPE: raise com.IbisInputError( - f"Multiple Hotdata datasets named {table_name!r}; pass database=('datasets', schema)." + f"{name_or_id!r} is not a managed database " + f"(source_type={conn.get('source_type')!r})" ) - return matches[0] + return conn + + def _connection_id(self, name_or_id: str) -> str: + return self._resolve_connection(name_or_id)["id"] + + def _table_location( + self, + database: tuple[str, str] | str | None, + ) -> tuple[str, str]: + if database is None: + if self._default_connection is None or self._default_schema is None: + raise com.IbisInputError( + "Requires database=(catalog, schema) or default_connection and default_schema" + ) + conn = self._default_connection + schema = self._default_schema + elif isinstance(database, tuple): + conn, schema = database + else: + conn = self._default_connection or self.current_catalog + schema = database + if conn is None: + raise com.IbisInputError( + "create_table with database=schema requires default_connection or current catalog" + ) + conn_id = self._connection_id(conn) + self._resolve_managed_connection(conn) + return conn_id, schema # --- schema / sql execution -------------------------------------------- @@ -480,33 +487,65 @@ def to_pyarrow_batches( ) def upload_file(self, data: bytes, *, content_type: str | None = None) -> dict[str, Any]: - """POST ``/v1/files``; returns the upload record (use ``id`` with :meth:`create_dataset_from_upload`).""" + """POST ``/v1/files``; returns the upload record (use ``id`` with managed table loads).""" try: return self._http.upload_file(data, content_type=content_type) except HotdataAPIError as exc: raise _ibis_err_from_hotdata(exc) from exc - def create_dataset_from_upload( + def create_database( self, - upload_id: str, - label: str, + name: str, + /, *, - table_name: str | None = None, - file_format: str = "csv", - ) -> dict[str, Any]: - """POST ``/v1/datasets`` with an upload source—materializes a queryable dataset table. - - The response includes ``schema_name`` and ``table_name``. Reference the table in SQL as - ``datasets..`` (see Hotdata ``datasets`` documentation). - """ + catalog: str | None = None, + schema: str = DEFAULT_SCHEMA, + tables: Sequence[str] | None = None, + force: bool = False, + ) -> None: + """Create a managed Hotdata connection (Ibis catalog) with optional declared tables.""" + if catalog is not None: + raise com.UnsupportedOperationError( + "Hotdata create_database creates a managed connection (catalog); catalog= is not supported" + ) try: - return self._http.create_dataset_from_upload( - upload_id=upload_id, - label=label, - table_name=table_name, - file_format=file_format, + self._resolve_connection(name) + except com.IbisError: + pass + else: + if not force: + raise com.IbisInputError(f"Managed database {name!r} already exists") + self._resolve_managed_connection(name) + return + try: + self._http.create_managed_database(name, schema=schema, tables=list(tables or ())) + except HotdataAPIError as exc: + raise _ibis_err_from_hotdata(exc) from exc + + def drop_database( + self, + name: str, + /, + *, + catalog: str | None = None, + force: bool = False, + ) -> None: + """Delete a managed Hotdata connection (Ibis catalog).""" + if catalog is not None: + raise com.UnsupportedOperationError( + "Hotdata drop_database deletes a managed connection (catalog); catalog= is not supported" ) + try: + conn = self._resolve_managed_connection(name) + except com.IbisError: + if force: + return + raise + try: + self._http.delete_connection(conn["id"]) except HotdataAPIError as exc: + if force and exc.status_code == 404: + return raise _ibis_err_from_hotdata(exc) from exc def _local_table_to_parquet(self, obj: Any, schema: sch.Schema | None): @@ -545,26 +584,30 @@ def create_table( obj: Any = None, *, schema: sch.Schema | None = None, - database: str | None = None, + database: tuple[str, str] | str | None = None, temp: bool = False, overwrite: bool = False, ) -> ir.Table: if temp: raise NotImplementedError("Hotdata does not support temporary tables.") - if overwrite: - raise NotImplementedError("Hotdata create_table does not support overwrite.") - if database is not None: - raise NotImplementedError("Hotdata datasets choose their schema at creation time.") + del overwrite # loads always use replace mode (only API option) + + if obj is not None and schema is not None: + raise com.IbisInputError("create_table accepts only one of obj or schema") data = self._local_table_to_parquet(obj, schema) + connection_id, schema_name = self._table_location(database) upload = self.upload_file(data, content_type="application/parquet") - dataset = self.create_dataset_from_upload( - upload_id=upload["id"], - label=name, - table_name=name, - file_format="parquet", - ) - return self.table(dataset["table_name"], database=("datasets", dataset["schema_name"])) + try: + self._http.load_managed_table( + connection_id, + schema_name, + name, + upload_id=upload["id"], + ) + except HotdataAPIError as exc: + raise _ibis_err_from_hotdata(exc) from exc + return self.table(name, database=(connection_id, schema_name)) def drop_table( self, @@ -575,14 +618,16 @@ def drop_table( force: bool = False, ) -> None: try: - dataset = self._find_dataset(name, database) - except com.TableNotFound: + connection_id, schema_name = self._table_location(database) + except com.IbisError: if force: return raise try: - self._http.delete_dataset(dataset["id"]) + self._http.delete_managed_table(connection_id, schema_name, name) except HotdataAPIError as exc: + if force and exc.status_code == 404: + return raise _ibis_err_from_hotdata(exc) from exc def _register_in_memory_table(self, _op: ops.InMemoryTable) -> None: diff --git a/src/ibis_hotdata/http.py b/src/ibis_hotdata/http.py index c8f0bae..f5b6185 100644 --- a/src/ibis_hotdata/http.py +++ b/src/ibis_hotdata/http.py @@ -5,7 +5,7 @@ import io import json import time -from collections.abc import Callable, Mapping +from collections.abc import Callable, Mapping, Sequence from typing import Any, TypeVar import pyarrow as pa @@ -15,7 +15,6 @@ from hotdata import ApiClient, Configuration from hotdata.api import ( ConnectionsApi, - DatasetsApi, InformationSchemaApi, QueryApi, QueryRunsApi, @@ -23,9 +22,11 @@ UploadsApi, ) from hotdata.exceptions import ApiException -from hotdata.models import CreateDatasetRequest, DatasetSource, QueryRequest +from hotdata.models import CreateConnectionRequest, QueryRequest from hotdata.models.async_query_response import AsyncQueryResponse -from hotdata.models.dataset_source_one_of import DatasetSourceOneOf +from hotdata.models.load_managed_table_request import LoadManagedTableRequest + +from ibis_hotdata.managed import DEFAULT_SCHEMA, MANAGED_SOURCE_TYPE, build_managed_config T = TypeVar("T") @@ -95,7 +96,6 @@ def __init__( self._connections = ConnectionsApi(self._client) self._information_schema = InformationSchemaApi(self._client) self._uploads = UploadsApi(self._client) - self._datasets = DatasetsApi(self._client) def close(self) -> None: client = self._client @@ -169,35 +169,51 @@ def upload_file(self, data: bytes, *, content_type: str | None = None) -> dict[s resp = self._safe_call(self._uploads.upload_file, data, **kwargs) return resp.model_dump(by_alias=True, mode="json") - def list_datasets(self, *, limit: int = 1000, offset: int = 0) -> dict[str, Any]: - resp = self._safe_call(self._datasets.list_datasets, limit=limit, offset=offset) + def create_managed_database( + self, + name: str, + *, + schema: str = DEFAULT_SCHEMA, + tables: Sequence[str] = (), + ) -> dict[str, Any]: + req = CreateConnectionRequest( + name=name, + source_type=MANAGED_SOURCE_TYPE, + config=build_managed_config(schema, list(tables)), + skip_discovery=True, + ) + resp = self._safe_call(self._connections.create_connection, req) return resp.model_dump(by_alias=True, mode="json") - def delete_dataset(self, dataset_id: str) -> None: - self._safe_call(self._datasets.delete_dataset, dataset_id) + def delete_connection(self, connection_id: str) -> None: + self._safe_call(self._connections.delete_connection, connection_id) - def create_dataset_from_upload( + def load_managed_table( self, + connection_id: str, + schema: str, + table: str, *, upload_id: str, - label: str, - table_name: str | None = None, - file_format: str = "csv", ) -> dict[str, Any]: - src = DatasetSource( - DatasetSourceOneOf( - type="upload", - upload_id=upload_id, - format=file_format, - ) + req = LoadManagedTableRequest(mode="replace", upload_id=upload_id) + resp = self._safe_call( + self._connections.load_managed_table, + connection_id, + schema, + table, + req, ) - fields: dict[str, Any] = {"label": label, "source": src} - if table_name is not None: - fields["table_name"] = table_name - req = CreateDatasetRequest(**fields) - resp = self._safe_call(self._datasets.create_dataset, req) return resp.model_dump(by_alias=True, mode="json") + def delete_managed_table(self, connection_id: str, schema: str, table: str) -> None: + self._safe_call( + self._connections.delete_managed_table, + connection_id, + schema, + table, + ) + def _poll_result_arrow( self, result_id: str, diff --git a/src/ibis_hotdata/managed.py b/src/ibis_hotdata/managed.py new file mode 100644 index 0000000..4f47d24 --- /dev/null +++ b/src/ibis_hotdata/managed.py @@ -0,0 +1,21 @@ +"""Helpers for Hotdata managed database connections.""" + +from __future__ import annotations + +from typing import Any + +MANAGED_SOURCE_TYPE = "managed" +DEFAULT_SCHEMA = "public" + + +def build_managed_config(schema: str, tables: list[str]) -> dict[str, Any]: + if not tables: + return {} + return { + "schemas": [ + { + "name": schema, + "tables": [{"name": table} for table in tables], + } + ] + } diff --git a/tests/test_hotdata_backend.py b/tests/test_hotdata_backend.py index ad41ce3..9bd69d1 100644 --- a/tests/test_hotdata_backend.py +++ b/tests/test_hotdata_backend.py @@ -1,5 +1,7 @@ from __future__ import annotations +from collections.abc import Callable + import io import json @@ -15,7 +17,10 @@ pytest.importorskip("pytest_httpserver") from pytest_httpserver import HTTPServer -# Federated identifiers for mocked Hotdata (matches SQL shape ``tpch.tpch_sf1.customer``). +# Managed database identifiers for mocked Hotdata (SQL shape ``sales.public.orders``). +MANAGED_CONN = "conn_sales" +MANAGED_NAME = "sales" +PUBLIC = "public" TPCH_CONN = "tpch" TPCH_SF1 = "tpch_sf1" TPCH_CUSTOMER_COLS = [ @@ -37,30 +42,25 @@ def arrow_stream(table: pa.Table) -> bytes: return sink.getvalue() -def dataset_list_response(*datasets: dict, has_more: bool = False, offset: int = 0) -> dict: - return { - "count": len(datasets), - "datasets": list(datasets), - "has_more": has_more, - "limit": 1000, - "offset": offset, - } - - -def dataset_summary(dataset_id: str, table_name: str, schema_name: str = "sch_1") -> dict: +def managed_connections_response() -> dict: return { - "created_at": "2026-01-01T00:00:00Z", - "id": dataset_id, - "label": table_name, - "latest_version": 1, - "pinned_version": None, - "schema_name": schema_name, - "table_name": table_name, - "updated_at": "2026-01-01T00:00:00Z", + "connections": [ + { + "id": MANAGED_CONN, + "name": MANAGED_NAME, + "source_type": "managed", + } + ] } -def information_schema_response(table_name: str, schema_name: str, columns: list[dict]) -> dict: +def information_schema_response( + table_name: str, + schema_name: str, + columns: list[dict], + *, + connection: str = MANAGED_CONN, +) -> dict: return { "count": 1, "has_more": False, @@ -68,7 +68,7 @@ def information_schema_response(table_name: str, schema_name: str, columns: list "next_cursor": None, "tables": [ { - "connection": "datasets", + "connection": connection, "schema": schema_name, "table": table_name, "synced": True, @@ -79,6 +79,62 @@ def information_schema_response(table_name: str, schema_name: str, columns: list } +def mock_managed_create_table_flow( + httpserver: HTTPServer, + *, + table_name: str, + schema_name: str = PUBLIC, + columns: list[dict], + on_upload: Callable[[Request], Response] | None = None, +) -> None: + httpserver.expect_request("/v1/connections").respond_with_json(managed_connections_response()) + + def default_upload(req: Request) -> Response: + assert req.headers["Content-Type"] == "application/parquet" + return Response( + json.dumps( + { + "id": "upl_1", + "status": "ready", + "size_bytes": len(req.get_data()), + "created_at": "2026-01-01T00:00:00Z", + "content_type": "application/parquet", + } + ), + status=201, + content_type="application/json", + ) + + httpserver.expect_request("/v1/files", method="POST").respond_with_handler( + on_upload or default_upload + ) + + def on_load(req: Request) -> Response: + body = req.get_json() + assert body == {"mode": "replace", "upload_id": "upl_1"} + return Response( + json.dumps( + { + "connection_id": MANAGED_CONN, + "schema_name": schema_name, + "table_name": table_name, + "row_count": 1, + "arrow_schema_json": "{}", + } + ), + status=200, + content_type="application/json", + ) + + httpserver.expect_request( + f"/v1/connections/{MANAGED_CONN}/schemas/{schema_name}/tables/{table_name}/loads", + method="POST", + ).respond_with_handler(on_load) + httpserver.expect_request("/v1/information_schema").respond_with_json( + information_schema_response(table_name, schema_name, columns) + ) + + def test_connect_via_url(httpserver: HTTPServer, srv: str): url = f"hotdata://127.0.0.1:{httpserver.port}?token=tok&workspace_id=ws_demo&verify_ssl=false" con = ibis.connect(url) @@ -216,11 +272,10 @@ def test_to_pyarrow_batches_uses_arrow_result(httpserver: HTTPServer, srv: str): assert out.to_pydict() == {"x": [1, 2, 3]} -def test_create_table_from_pandas_uploads_parquet_dataset(httpserver: HTTPServer, srv: str): +def test_create_table_from_pandas_uploads_managed_table(httpserver: HTTPServer, srv: str): uploaded: dict[str, pa.Table] = {} def on_upload(req: Request) -> Response: - assert req.headers["Content-Type"] == "application/parquet" uploaded["table"] = pq.read_table(io.BytesIO(req.get_data())) return Response( json.dumps( @@ -236,36 +291,11 @@ def on_upload(req: Request) -> Response: content_type="application/json", ) - def on_dataset(req: Request) -> Response: - body = req.get_json() - assert body == { - "label": "demo", - "source": {"type": "upload", "upload_id": "upl_1", "format": "parquet"}, - "table_name": "demo", - } - return Response( - json.dumps( - { - "id": "ds_1", - "label": "demo", - "schema_name": "sch_1", - "table_name": "demo", - "status": "ready", - "created_at": "2026-01-01T00:00:00Z", - } - ), - status=201, - content_type="application/json", - ) - - httpserver.expect_request("/v1/files", method="POST").respond_with_handler(on_upload) - httpserver.expect_request("/v1/datasets", method="POST").respond_with_handler(on_dataset) - httpserver.expect_request("/v1/information_schema").respond_with_json( - information_schema_response( - "demo", - "sch_1", - [{"name": "x", "data_type": "BIGINT", "nullable": True}], - ) + mock_managed_create_table_flow( + httpserver, + table_name="demo", + columns=[{"name": "x", "data_type": "BIGINT", "nullable": True}], + on_upload=on_upload, ) con = ibis.hotdata.connect( @@ -275,13 +305,17 @@ def on_dataset(req: Request) -> Response: verify_ssl=False, ) - table = con.create_table("demo", pd.DataFrame({"x": [1, 2]})) + table = con.create_table( + "demo", + pd.DataFrame({"x": [1, 2]}), + database=(MANAGED_CONN, PUBLIC), + ) assert uploaded["table"].to_pydict() == {"x": [1, 2]} assert table.schema().names == ("x",) -def test_create_table_from_pyarrow_uploads_parquet_dataset(httpserver: HTTPServer, srv: str): +def test_create_table_from_pyarrow_uploads_managed_table(httpserver: HTTPServer, srv: str): uploaded: dict[str, pa.Table] = {} def on_upload(req: Request) -> Response: @@ -300,31 +334,22 @@ def on_upload(req: Request) -> Response: content_type="application/json", ) - httpserver.expect_request("/v1/files", method="POST").respond_with_handler(on_upload) - httpserver.expect_request("/v1/datasets", method="POST").respond_with_json( - { - "id": "ds_1", - "label": "arrow_demo", - "schema_name": "sch_1", - "table_name": "arrow_demo", - "status": "ready", - "created_at": "2026-01-01T00:00:00Z", - }, - status=201, - ) - httpserver.expect_request("/v1/information_schema").respond_with_json( - information_schema_response( - "arrow_demo", - "sch_1", - [ - {"name": "x", "data_type": "BIGINT", "nullable": True}, - {"name": "y", "data_type": "VARCHAR", "nullable": True}, - ], - ) + mock_managed_create_table_flow( + httpserver, + table_name="arrow_demo", + columns=[ + {"name": "x", "data_type": "BIGINT", "nullable": True}, + {"name": "y", "data_type": "VARCHAR", "nullable": True}, + ], + on_upload=on_upload, ) con = ibis.hotdata.connect(api_url=srv, token="tok", workspace_id="ws", verify_ssl=False) - expr = con.create_table("arrow_demo", pa.table({"x": [1], "y": ["a"]})) + expr = con.create_table( + "arrow_demo", + pa.table({"x": [1], "y": ["a"]}), + database=(MANAGED_NAME, PUBLIC), + ) assert uploaded["table"].to_pydict() == {"x": [1], "y": ["a"]} assert expr.schema().names == ("x", "y") @@ -349,30 +374,24 @@ def on_upload(req: Request) -> Response: content_type="application/json", ) - httpserver.expect_request("/v1/files", method="POST").respond_with_handler(on_upload) - httpserver.expect_request("/v1/datasets", method="POST").respond_with_json( - { - "id": "ds_1", - "label": "empty_demo", - "schema_name": "sch_1", - "table_name": "empty_demo", - "status": "ready", - "created_at": "2026-01-01T00:00:00Z", - }, - status=201, - ) - httpserver.expect_request("/v1/information_schema").respond_with_json( - information_schema_response( - "empty_demo", - "sch_1", - [ - {"name": "x", "data_type": "BIGINT", "nullable": True}, - {"name": "y", "data_type": "VARCHAR", "nullable": True}, - ], - ) + mock_managed_create_table_flow( + httpserver, + table_name="empty_demo", + columns=[ + {"name": "x", "data_type": "BIGINT", "nullable": True}, + {"name": "y", "data_type": "VARCHAR", "nullable": True}, + ], + on_upload=on_upload, ) - con = ibis.hotdata.connect(api_url=srv, token="tok", workspace_id="ws", verify_ssl=False) + con = ibis.hotdata.connect( + api_url=srv, + token="tok", + workspace_id="ws", + verify_ssl=False, + default_connection=MANAGED_CONN, + default_schema=PUBLIC, + ) expr = con.create_table("empty_demo", schema=ibis.schema({"x": "int64", "y": "string"})) assert uploaded["table"].num_rows == 0 @@ -385,10 +404,8 @@ def test_create_table_rejects_unsupported_options(httpserver: HTTPServer, srv: s with pytest.raises(NotImplementedError, match="temporary"): con.create_table("tmp", pd.DataFrame({"x": [1]}), temp=True) - with pytest.raises(NotImplementedError, match="overwrite"): - con.create_table("tmp", pd.DataFrame({"x": [1]}), overwrite=True) - with pytest.raises(NotImplementedError, match="schema"): - con.create_table("tmp", pd.DataFrame({"x": [1]}), database="main") + with pytest.raises(com.IbisInputError, match="Requires database"): + con.create_table("tmp", pd.DataFrame({"x": [1]})) with pytest.raises(com.IbisInputError, match="only one of obj or schema"): con.create_table( "tmp", @@ -399,51 +416,95 @@ def test_create_table_rejects_unsupported_options(httpserver: HTTPServer, srv: s con.create_table("tmp", obj=[{"x": 1}]) -def test_drop_table_deletes_matching_dataset(httpserver: HTTPServer, srv: str): - httpserver.expect_request("/v1/datasets").respond_with_json( - dataset_list_response(dataset_summary("ds_1", "demo")) - ) - httpserver.expect_request("/v1/datasets/ds_1", method="DELETE").respond_with_data( - b"", status=204 +def test_create_table_overwrite_loads_with_replace(httpserver: HTTPServer, srv: str): + mock_managed_create_table_flow( + httpserver, + table_name="demo", + columns=[{"name": "x", "data_type": "BIGINT", "nullable": True}], ) - con = ibis.hotdata.connect( - api_url=srv, - token="tok", - workspace_id="ws", - verify_ssl=False, + con = ibis.hotdata.connect(api_url=srv, token="tok", workspace_id="ws", verify_ssl=False) + con.create_table( + "demo", + pd.DataFrame({"x": [1]}), + database=(MANAGED_CONN, PUBLIC), + overwrite=True, ) - con.drop_table("demo", database=("datasets", "sch_1")) + +def test_create_database_posts_managed_connection(httpserver: HTTPServer, srv: str): + def on_create(req: Request) -> Response: + body = req.get_json() + assert body == { + "name": "sales", + "source_type": "managed", + "config": { + "schemas": [{"name": "public", "tables": [{"name": "orders"}]}], + }, + "skip_discovery": True, + } + return Response( + json.dumps( + { + "id": MANAGED_CONN, + "name": "sales", + "source_type": "managed", + "discovery_status": "skipped", + "tables_discovered": 0, + } + ), + status=201, + content_type="application/json", + ) + + httpserver.expect_request("/v1/connections", method="GET").respond_with_json({"connections": []}) + httpserver.expect_request("/v1/connections", method="POST").respond_with_handler(on_create) + + con = ibis.hotdata.connect(api_url=srv, token="tok", workspace_id="ws", verify_ssl=False) + con.create_database("sales", schema="public", tables=["orders"]) -def test_drop_table_force_ignores_missing_dataset(httpserver: HTTPServer, srv: str): - httpserver.expect_request("/v1/datasets").respond_with_json(dataset_list_response()) +def test_drop_table_deletes_managed_table(httpserver: HTTPServer, srv: str): + httpserver.expect_request("/v1/connections").respond_with_json(managed_connections_response()) + httpserver.expect_request( + f"/v1/connections/{MANAGED_CONN}/schemas/{PUBLIC}/tables/demo", + method="DELETE", + ).respond_with_data(b"", status=204) con = ibis.hotdata.connect(api_url=srv, token="tok", workspace_id="ws", verify_ssl=False) + con.drop_table("demo", database=(MANAGED_CONN, PUBLIC)) - con.drop_table("missing", force=True) +def test_drop_table_force_ignores_missing_table(httpserver: HTTPServer, srv: str): + httpserver.expect_request("/v1/connections").respond_with_json(managed_connections_response()) + httpserver.expect_request( + f"/v1/connections/{MANAGED_CONN}/schemas/{PUBLIC}/tables/missing", + method="DELETE", + ).respond_with_data(b"not found", status=404) -def test_drop_table_raises_for_ambiguous_dataset_name(httpserver: HTTPServer, srv: str): - httpserver.expect_request("/v1/datasets").respond_with_json( - dataset_list_response( - dataset_summary("ds_1", "demo", schema_name="a"), - dataset_summary("ds_2", "demo", schema_name="b"), - ) + con = ibis.hotdata.connect(api_url=srv, token="tok", workspace_id="ws", verify_ssl=False) + con.drop_table("missing", database=(MANAGED_CONN, PUBLIC), force=True) + + +def test_drop_table_raises_for_non_managed_connection(httpserver: HTTPServer, srv: str): + httpserver.expect_request("/v1/connections").respond_with_json( + {"connections": [{"id": TPCH_CONN, "name": "TPC-H", "source_type": "duckdb"}]} ) con = ibis.hotdata.connect(api_url=srv, token="tok", workspace_id="ws", verify_ssl=False) - with pytest.raises(com.IbisInputError, match="Multiple Hotdata datasets"): - con.drop_table("demo") + with pytest.raises(com.IbisInputError, match="not a managed database"): + con.drop_table("demo", database=(TPCH_CONN, PUBLIC)) -def test_drop_table_raises_for_non_dataset_catalog(httpserver: HTTPServer, srv: str): - con = ibis.hotdata.connect(api_url=srv, token="tok", workspace_id="ws", verify_ssl=False) +def test_drop_database_deletes_managed_connection(httpserver: HTTPServer, srv: str): + httpserver.expect_request("/v1/connections").respond_with_json(managed_connections_response()) + httpserver.expect_request(f"/v1/connections/{MANAGED_CONN}", method="DELETE").respond_with_data( + b"", status=204 + ) - with pytest.raises(com.TableNotFound): - con.drop_table("demo", database=("tpch", "sch_1")) + con = ibis.hotdata.connect(api_url=srv, token="tok", workspace_id="ws", verify_ssl=False) + con.drop_database(MANAGED_NAME) def test_compile_scalar_no_roundtrip(httpserver: HTTPServer, srv: str): diff --git a/tests/test_hotdata_http.py b/tests/test_hotdata_http.py index 6a7477e..8402e44 100644 --- a/tests/test_hotdata_http.py +++ b/tests/test_hotdata_http.py @@ -167,7 +167,7 @@ def test_list_connections_raises_on_http_error(httpserver: HTTPServer): client.close() -def test_upload_file_then_create_dataset(httpserver: HTTPServer): +def test_upload_file_then_load_managed_table(httpserver: HTTPServer): httpserver.expect_oneshot_request( "/v1/files", method="POST", @@ -177,29 +177,27 @@ def test_upload_file_then_create_dataset(httpserver: HTTPServer): "status": "ready", "size_bytes": 3, "created_at": "2026-01-01T00:00:00Z", - "content_type": None, + "content_type": "application/parquet", }, status=201, ) - def on_dataset(req: Request) -> Response: + def on_load(req: Request) -> Response: body = req.get_json() - assert body["label"] == "demo" - assert body["source"] == {"type": "upload", "upload_id": "upl_1", "format": "csv"} - assert body.get("table_name") == "demo_tbl" + assert body == {"mode": "replace", "upload_id": "upl_1"} payload = { - "id": "ds_1", - "label": "demo", - "schema_name": "main", + "connection_id": "conn_sales", + "schema_name": "public", "table_name": "demo_tbl", - "status": "ready", - "created_at": "2026-01-01T00:00:00Z", + "row_count": 1, + "arrow_schema_json": "{}", } - return Response(json.dumps(payload), status=201, content_type="application/json") + return Response(json.dumps(payload), status=200, content_type="application/json") - httpserver.expect_oneshot_request("/v1/datasets", method="POST").respond_with_handler( - on_dataset - ) + httpserver.expect_oneshot_request( + "/v1/connections/conn_sales/schemas/public/tables/demo_tbl/loads", + method="POST", + ).respond_with_handler(on_load) client = HotdataClient( api_url=httpserver.url_for("/").rstrip("/"), @@ -207,37 +205,46 @@ def on_dataset(req: Request) -> Response: workspace_id="w", verify_ssl=False, ) - up = client.upload_file(b"a,b\n1,2") + up = client.upload_file(b"parquet", content_type="application/parquet") assert up["id"] == "upl_1" - ds = client.create_dataset_from_upload( + loaded = client.load_managed_table( + "conn_sales", + "public", + "demo_tbl", upload_id=up["id"], - label="demo", - table_name="demo_tbl", - file_format="csv", ) - assert ds["schema_name"] == "main" - assert ds["table_name"] == "demo_tbl" + assert loaded["table_name"] == "demo_tbl" client.close() -def test_upload_file_accepts_content_type(httpserver: HTTPServer): - def on_upload(req: Request) -> Response: - assert req.headers["Content-Type"] == "application/parquet" +def test_create_managed_database(httpserver: HTTPServer): + def on_create(req: Request) -> Response: + body = req.get_json() + assert body == { + "name": "sales", + "source_type": "managed", + "config": { + "schemas": [{"name": "public", "tables": [{"name": "orders"}]}], + }, + "skip_discovery": True, + } return Response( json.dumps( { - "id": "upl_1", - "status": "ready", - "size_bytes": len(req.get_data()), - "created_at": "2026-01-01T00:00:00Z", - "content_type": "application/parquet", + "id": "conn_sales", + "name": "sales", + "source_type": "managed", + "discovery_status": "skipped", + "tables_discovered": 0, } ), status=201, content_type="application/json", ) - httpserver.expect_oneshot_request("/v1/files", method="POST").respond_with_handler(on_upload) + httpserver.expect_oneshot_request("/v1/connections", method="POST").respond_with_handler( + on_create + ) client = HotdataClient( api_url=httpserver.url_for("/").rstrip("/"), @@ -245,35 +252,50 @@ def on_upload(req: Request) -> Response: workspace_id="w", verify_ssl=False, ) - out = client.upload_file(b"parquet", content_type="application/parquet") - assert out["id"] == "upl_1" + out = client.create_managed_database("sales", schema="public", tables=["orders"]) + assert out["id"] == "conn_sales" client.close() -def test_list_and_delete_datasets(httpserver: HTTPServer): - httpserver.expect_oneshot_request("/v1/datasets").respond_with_json( - { - "count": 1, - "datasets": [ +def test_delete_managed_table_and_connection(httpserver: HTTPServer): + httpserver.expect_oneshot_request( + "/v1/connections/conn_sales/schemas/public/tables/demo", + method="DELETE", + ).respond_with_data(b"", status=204) + httpserver.expect_oneshot_request( + "/v1/connections/conn_sales", + method="DELETE", + ).respond_with_data(b"", status=204) + + client = HotdataClient( + api_url=httpserver.url_for("/").rstrip("/"), + token="t", + workspace_id="w", + verify_ssl=False, + ) + client.delete_managed_table("conn_sales", "public", "demo") + client.delete_connection("conn_sales") + client.close() + + +def test_upload_file_accepts_content_type(httpserver: HTTPServer): + def on_upload(req: Request) -> Response: + assert req.headers["Content-Type"] == "application/parquet" + return Response( + json.dumps( { + "id": "upl_1", + "status": "ready", + "size_bytes": len(req.get_data()), "created_at": "2026-01-01T00:00:00Z", - "id": "ds_1", - "label": "demo", - "latest_version": 1, - "pinned_version": None, - "schema_name": "sch_1", - "table_name": "demo", - "updated_at": "2026-01-01T00:00:00Z", + "content_type": "application/parquet", } - ], - "has_more": False, - "limit": 1000, - "offset": 0, - } - ) - httpserver.expect_oneshot_request("/v1/datasets/ds_1", method="DELETE").respond_with_data( - b"", status=204 - ) + ), + status=201, + content_type="application/json", + ) + + httpserver.expect_oneshot_request("/v1/files", method="POST").respond_with_handler(on_upload) client = HotdataClient( api_url=httpserver.url_for("/").rstrip("/"), @@ -281,8 +303,6 @@ def test_list_and_delete_datasets(httpserver: HTTPServer): workspace_id="w", verify_ssl=False, ) - - datasets = client.list_datasets() - assert datasets["datasets"][0]["id"] == "ds_1" - client.delete_dataset("ds_1") + out = client.upload_file(b"parquet", content_type="application/parquet") + assert out["id"] == "upl_1" client.close() From 5def1dce5a3ec381fb3219cbddabc7cd763e06fc Mon Sep 17 00:00:00 2001 From: Eddie A Tejeda <669988+eddietejeda@users.noreply.github.com> Date: Mon, 18 May 2026 21:43:57 -0700 Subject: [PATCH 3/5] fix: address PR review nits for managed table writes Resolve managed connections once in _table_location and honor overwrite=False when a synced table already exists. --- README.md | 2 +- src/ibis_hotdata/backend.py | 31 +++++++++++++++++++++----- tests/test_hotdata_backend.py | 41 ++++++++++++++++++++++++++++++++--- 3 files changed, 64 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index af217db..e6d0931 100644 --- a/README.md +++ b/README.md @@ -69,7 +69,7 @@ Supported today: - **Result materialization:** `.execute()` returns pandas objects. `.to_pyarrow()` and `.to_pyarrow_batches()` use the Arrow IPC result data exposed by Hotdata without converting through JSON rows; batches are split locally after the result is downloaded. - **Raw SQL escape hatch:** `con.sql("SELECT ...", dialect="postgres")` is the most reliable way to use Hotdata-specific federated table names or SQL that Ibis does not model directly. - **Managed database lifecycle:** `create_database("sales", schema="public", tables=["orders"])` registers a managed connection (Ibis catalog). `create_table("orders", pandas_df, database=("sales", "public"))` uploads Parquet and loads it with replace mode. Query as `sales.public.orders` in SQL. `drop_table` clears a managed table; `drop_database` deletes the connection. -- **Parquet uploads:** `create_table` accepts pandas DataFrames, PyArrow tables, or schema-only empty tables. Tables must live in a managed connection — declare them with `create_database(..., tables=[...])` first. +- **Parquet uploads:** `create_table` accepts pandas DataFrames, PyArrow tables, or schema-only empty tables. Tables must live in a managed connection — declare them with `create_database(..., tables=[...])` first. Loads always use replace mode; pass `overwrite=True` to replace an existing synced table (the default `overwrite=False` raises if the table already exists). Not supported as full Ibis backend features: diff --git a/src/ibis_hotdata/backend.py b/src/ibis_hotdata/backend.py index 989eb26..84e0500 100644 --- a/src/ibis_hotdata/backend.py +++ b/src/ibis_hotdata/backend.py @@ -339,8 +339,19 @@ def _resolve_managed_connection(self, name_or_id: str) -> dict[str, Any]: ) return conn - def _connection_id(self, name_or_id: str) -> str: - return self._resolve_connection(name_or_id)["id"] + def _managed_table_synced( + self, + connection_id: str, + schema_name: str, + table_name: str, + ) -> bool: + for row in self._iterate_information_schema( + {"connection_id": connection_id, "schema": schema_name, "table": table_name}, + include_columns=False, + ): + if row["table"] == table_name and row["schema"] == schema_name: + return bool(row.get("synced", True)) + return False def _table_location( self, @@ -362,9 +373,8 @@ def _table_location( raise com.IbisInputError( "create_table with database=schema requires default_connection or current catalog" ) - conn_id = self._connection_id(conn) - self._resolve_managed_connection(conn) - return conn_id, schema + conn_record = self._resolve_managed_connection(conn) + return conn_record["id"], schema # --- schema / sql execution -------------------------------------------- @@ -588,15 +598,24 @@ def create_table( temp: bool = False, overwrite: bool = False, ) -> ir.Table: + """Upload local data into a declared managed table. + + Hotdata loads always use ``replace`` mode (the only API option). When + ``overwrite=False`` (the Ibis default), an existing synced table raises + :class:`~ibis.common.exceptions.IbisInputError` instead of replacing it. + """ if temp: raise NotImplementedError("Hotdata does not support temporary tables.") - del overwrite # loads always use replace mode (only API option) if obj is not None and schema is not None: raise com.IbisInputError("create_table accepts only one of obj or schema") data = self._local_table_to_parquet(obj, schema) connection_id, schema_name = self._table_location(database) + if not overwrite and self._managed_table_synced(connection_id, schema_name, name): + raise com.IbisInputError( + f"Table {name!r} already exists; pass overwrite=True to replace" + ) upload = self.upload_file(data, content_type="application/parquet") try: self._http.load_managed_table( diff --git a/tests/test_hotdata_backend.py b/tests/test_hotdata_backend.py index 9bd69d1..42e1520 100644 --- a/tests/test_hotdata_backend.py +++ b/tests/test_hotdata_backend.py @@ -86,6 +86,7 @@ def mock_managed_create_table_flow( schema_name: str = PUBLIC, columns: list[dict], on_upload: Callable[[Request], Response] | None = None, + table_exists: bool = False, ) -> None: httpserver.expect_request("/v1/connections").respond_with_json(managed_connections_response()) @@ -130,9 +131,24 @@ def on_load(req: Request) -> Response: f"/v1/connections/{MANAGED_CONN}/schemas/{schema_name}/tables/{table_name}/loads", method="POST", ).respond_with_handler(on_load) - httpserver.expect_request("/v1/information_schema").respond_with_json( - information_schema_response(table_name, schema_name, columns) - ) + + info_calls = {"n": 0} + + def on_information_schema(req: Request) -> Response: + info_calls["n"] += 1 + if table_exists or info_calls["n"] > 1: + payload = information_schema_response(table_name, schema_name, columns) + else: + payload = { + "count": 0, + "tables": [], + "has_more": False, + "next_cursor": None, + "limit": 500, + } + return Response(json.dumps(payload), status=200, content_type="application/json") + + httpserver.expect_request("/v1/information_schema").respond_with_handler(on_information_schema) def test_connect_via_url(httpserver: HTTPServer, srv: str): @@ -421,6 +437,7 @@ def test_create_table_overwrite_loads_with_replace(httpserver: HTTPServer, srv: httpserver, table_name="demo", columns=[{"name": "x", "data_type": "BIGINT", "nullable": True}], + table_exists=True, ) con = ibis.hotdata.connect(api_url=srv, token="tok", workspace_id="ws", verify_ssl=False) @@ -432,6 +449,24 @@ def test_create_table_overwrite_loads_with_replace(httpserver: HTTPServer, srv: ) +def test_create_table_without_overwrite_rejects_existing_table(httpserver: HTTPServer, srv: str): + mock_managed_create_table_flow( + httpserver, + table_name="demo", + columns=[{"name": "x", "data_type": "BIGINT", "nullable": True}], + table_exists=True, + ) + + con = ibis.hotdata.connect(api_url=srv, token="tok", workspace_id="ws", verify_ssl=False) + + with pytest.raises(com.IbisInputError, match="already exists"): + con.create_table( + "demo", + pd.DataFrame({"x": [1]}), + database=(MANAGED_CONN, PUBLIC), + ) + + def test_create_database_posts_managed_connection(httpserver: HTTPServer, srv: str): def on_create(req: Request) -> Response: body = req.get_json() From a9d350c274eac4700896cac82903963700fe87bf Mon Sep 17 00:00:00 2001 From: Eddie A Tejeda <669988+eddietejeda@users.noreply.github.com> Date: Mon, 18 May 2026 21:45:56 -0700 Subject: [PATCH 4/5] ci: add PyPI publish workflow on version tags Publish ibis-hotdata to PyPI via trusted publishing when v* tags are pushed. --- .github/workflows/publish.yml | 70 +++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 .github/workflows/publish.yml diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml new file mode 100644 index 0000000..2ee653f --- /dev/null +++ b/.github/workflows/publish.yml @@ -0,0 +1,70 @@ +name: Publish to PyPI + +on: + push: + tags: + - 'v[0-9]*' + +concurrency: + group: pypi-publish-${{ github.ref_name }} + cancel-in-progress: false + +permissions: + contents: read + +jobs: + build: + name: Build distribution + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6 + + - uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6 + with: + python-version: '3.12' + + - name: Install build tooling + run: python -m pip install --upgrade build twine + + - name: Verify tag matches pyproject version + run: | + # Release tags must start with `v` followed by a PEP 440 version (e.g. v1.2.3, v1.2.3a1). + if [[ ! "$GITHUB_REF_NAME" =~ ^v[0-9] ]]; then + echo "Release tag '$GITHUB_REF_NAME' must start with 'v' followed by a digit (e.g. v1.0.0)" >&2 + exit 1 + fi + tag="${GITHUB_REF_NAME#v}" + pkg_version=$(python -c "import tomllib,pathlib; print(tomllib.loads(pathlib.Path('pyproject.toml').read_text())['project']['version'])") + if [ "$tag" != "$pkg_version" ]; then + echo "Release tag ($tag) does not match pyproject.toml version ($pkg_version)" >&2 + exit 1 + fi + + - name: Build sdist and wheel + run: python -m build + + - name: Check distribution metadata + run: python -m twine check --strict dist/* + + - uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5 + with: + name: dist + path: dist/ + + publish: + name: Publish to PyPI + needs: build + runs-on: ubuntu-latest + environment: + name: pypi + url: https://pypi.org/p/ibis-hotdata + permissions: + id-token: write + steps: + - uses: actions/download-artifact@634f93cb2916e3fdff6788551b99b062d0335ce0 # v5 + with: + name: dist + path: dist/ + + - name: Publish via Trusted Publishing + uses: pypa/gh-action-pypi-publish@ed0c53931b1dc9bd32cbe73a98c7f6766f8a527e # v1.13.0 From 3356374caa5c65641a6d100d45e8186b8a8ed7d2 Mon Sep 17 00:00:00 2001 From: Eddie A Tejeda <669988+eddietejeda@users.noreply.github.com> Date: Mon, 18 May 2026 21:48:24 -0700 Subject: [PATCH 5/5] fix: tighten create_database and force handling on drops Resolve existing connections once in create_database and only let force ignore missing resources, not misconfigured non-managed targets. --- src/ibis_hotdata/backend.py | 16 ++++++++++++---- tests/test_hotdata_backend.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/src/ibis_hotdata/backend.py b/src/ibis_hotdata/backend.py index 84e0500..c5ba6d5 100644 --- a/src/ibis_hotdata/backend.py +++ b/src/ibis_hotdata/backend.py @@ -519,13 +519,17 @@ def create_database( "Hotdata create_database creates a managed connection (catalog); catalog= is not supported" ) try: - self._resolve_connection(name) + existing = self._resolve_connection(name) except com.IbisError: - pass - else: + existing = None + if existing is not None: if not force: raise com.IbisInputError(f"Managed database {name!r} already exists") - self._resolve_managed_connection(name) + if existing.get("source_type") != MANAGED_SOURCE_TYPE: + raise com.IbisInputError( + f"{name!r} is not a managed database " + f"(source_type={existing.get('source_type')!r})" + ) return try: self._http.create_managed_database(name, schema=schema, tables=list(tables or ())) @@ -547,6 +551,8 @@ def drop_database( ) try: conn = self._resolve_managed_connection(name) + except com.IbisInputError: + raise except com.IbisError: if force: return @@ -638,6 +644,8 @@ def drop_table( ) -> None: try: connection_id, schema_name = self._table_location(database) + except com.IbisInputError: + raise except com.IbisError: if force: return diff --git a/tests/test_hotdata_backend.py b/tests/test_hotdata_backend.py index 42e1520..6f4456e 100644 --- a/tests/test_hotdata_backend.py +++ b/tests/test_hotdata_backend.py @@ -532,6 +532,17 @@ def test_drop_table_raises_for_non_managed_connection(httpserver: HTTPServer, sr con.drop_table("demo", database=(TPCH_CONN, PUBLIC)) +def test_drop_table_force_still_raises_for_non_managed_connection(httpserver: HTTPServer, srv: str): + httpserver.expect_request("/v1/connections").respond_with_json( + {"connections": [{"id": TPCH_CONN, "name": "TPC-H", "source_type": "duckdb"}]} + ) + + con = ibis.hotdata.connect(api_url=srv, token="tok", workspace_id="ws", verify_ssl=False) + + with pytest.raises(com.IbisInputError, match="not a managed database"): + con.drop_table("demo", database=(TPCH_CONN, PUBLIC), force=True) + + def test_drop_database_deletes_managed_connection(httpserver: HTTPServer, srv: str): httpserver.expect_request("/v1/connections").respond_with_json(managed_connections_response()) httpserver.expect_request(f"/v1/connections/{MANAGED_CONN}", method="DELETE").respond_with_data( @@ -542,6 +553,24 @@ def test_drop_database_deletes_managed_connection(httpserver: HTTPServer, srv: s con.drop_database(MANAGED_NAME) +def test_drop_database_force_ignores_unknown_connection(httpserver: HTTPServer, srv: str): + httpserver.expect_request("/v1/connections").respond_with_json({"connections": []}) + + con = ibis.hotdata.connect(api_url=srv, token="tok", workspace_id="ws", verify_ssl=False) + con.drop_database("missing", force=True) + + +def test_drop_database_force_raises_for_non_managed_connection(httpserver: HTTPServer, srv: str): + httpserver.expect_request("/v1/connections").respond_with_json( + {"connections": [{"id": TPCH_CONN, "name": "TPC-H", "source_type": "duckdb"}]} + ) + + con = ibis.hotdata.connect(api_url=srv, token="tok", workspace_id="ws", verify_ssl=False) + + with pytest.raises(com.IbisInputError, match="not a managed database"): + con.drop_database("TPC-H", force=True) + + def test_compile_scalar_no_roundtrip(httpserver: HTTPServer, srv: str): con = ibis.hotdata.connect( api_url=srv,