Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ con = ibis.hotdata.connect(
timeout=120.0,
default_connection=None, # Hotdata connection id → Ibis catalog
default_schema=None, # remote schema → Ibis database
prefer_async=False,
poll_interval_s=0.25,
poll_timeout_s=600.0,
)
```

Expand All @@ -41,7 +42,7 @@ con = ibis.connect(

**Mapping:** Ibis **catalog** = Hotdata connection id; **database** = remote schema; **table** = table name. SQL references look like `connection.schema.table`. With a single connection and schema, defaults are inferred; otherwise set `default_connection` / `default_schema` or qualify `con.table(..., database=(conn_id, schema))`.

**Execution:** SQL is compiled with Ibis’s **Postgres** SQLGlot compiler. The client uses `POST /v1/query`; with `prefer_async=True` it follows `202` and polls query-run and result endpoints until rows are ready. Tuning: `poll_interval_s`, `poll_timeout_s` on `connect()`.
**Execution:** SQL is compiled with Ibis’s **Postgres** SQLGlot compiler. The client submits queries asynchronously with `POST /v1/query`, polls `GET /v1/query-runs/{id}`, then downloads ready results as Arrow IPC from `GET /v1/results/{id}`. Tuning: `poll_interval_s`, `poll_timeout_s` on `connect()`.

**Types:** Typed tables come from Hotdata’s information schema. `con.sql(...)` types are inferred from a small preview query; see [Hotdata SQL](https://www.hotdata.dev/docs/sql) for server behavior.

Expand Down
17 changes: 2 additions & 15 deletions examples/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,11 @@ def parser(description: str) -> argparse.ArgumentParser:
action="store_true",
help="Disable TLS verification (dev only)",
)
p.add_argument(
"--prefer-async",
action="store_true",
help="Prefer async POST /v1/query",
)
p.add_argument("--timeout", type=float, default=120.0)
p.add_argument(
"--default-connection",
dest="default_connection",
default=os.environ.get("HOTDATA_DEFAULT_CONNECTION")
or DEFAULT_TPCH_CONNECTION,
default=os.environ.get("HOTDATA_DEFAULT_CONNECTION") or DEFAULT_TPCH_CONNECTION,
help=f"Connection id (= Ibis catalog). Env HOTDATA_DEFAULT_CONNECTION. Default {DEFAULT_TPCH_CONNECTION!r}.",
)
p.add_argument(
Expand All @@ -200,11 +194,7 @@ def parser(description: str) -> argparse.ArgumentParser:
def parsed_args(parser: argparse.ArgumentParser) -> argparse.Namespace:
ns = parser.parse_args()
if not ns.token.strip() or not ns.workspace_id.strip():
parser.error(
"Set HOTDATA_TOKEN and HOTDATA_WORKSPACE_ID, or pass --token and --workspace."
)
if os.environ.get("HOTDATA_PREFER_ASYNC", "").lower() in ("1", "true", "yes"):
ns.prefer_async = True
parser.error("Set HOTDATA_TOKEN and HOTDATA_WORKSPACE_ID, or pass --token and --workspace.")
normalize_tpch_defaults(ns)
return ns

Expand All @@ -217,7 +207,6 @@ def connect_kwargs(ns: argparse.Namespace, **extras) -> dict:
"token": ns.token.strip(),
"workspace_id": ns.workspace_id.strip(),
"timeout": ns.timeout,
"prefer_async": ns.prefer_async,
"verify_ssl": False if getattr(ns, "insecure", False) else True,
}
if ns.session_id:
Expand Down Expand Up @@ -256,7 +245,5 @@ def hotdata_connect_uri(ns: argparse.Namespace) -> str:
qs["default_connection"] = dc
if ds:
qs["default_schema"] = ds
if ns.prefer_async:
qs["prefer_async"] = "true"
q = urllib.parse.urlencode(qs)
return f"hotdata://{api_host(ns.api_url)}/?{q}"
140 changes: 55 additions & 85 deletions src/ibis_hotdata/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,58 +16,44 @@
from __future__ import annotations

import contextlib
import urllib.parse
from collections.abc import Iterable, Mapping
from functools import cached_property
from importlib.metadata import PackageNotFoundError
from importlib.metadata import version as pkg_version
from typing import TYPE_CHECKING, Any
from urllib.parse import ParseResult, unquote_plus

import sqlglot as sg
import sqlglot.expressions as sge
from urllib.parse import ParseResult, parse_qsl, unquote_plus

import ibis.backends.sql.compilers as sc
import ibis.common.exceptions as com
import ibis.expr.datatypes as dt
import ibis.expr.operations as ops
import ibis.expr.schema as sch
import ibis.expr.types as ir

from ibis.backends import CanListCatalog, CanListDatabase, HasCurrentCatalog, HasCurrentDatabase, NoExampleLoader
import sqlglot as sg
import sqlglot.expressions as sge
from ibis.backends import (
CanListCatalog,
CanListDatabase,
HasCurrentCatalog,
HasCurrentDatabase,
NoExampleLoader,
)
from ibis.backends.sql import SQLBackend

from ibis_hotdata.http import HotdataAPIError, HotdataClient
from ibis_hotdata.types import dtype_from_hotdata_sql_type, dtype_from_json_value
from ibis_hotdata.types import dtype_from_hotdata_sql_type

if TYPE_CHECKING:
from collections.abc import Iterator
_INFORMATION_SCHEMA_PAGE_SIZE = 500

import pandas as pd

def _ibis_err_from_hotdata(exc: HotdataAPIError) -> com.IbisError:
    """Build an Ibis error that carries the message of a Hotdata API failure.

    The error is returned, not raised, so call sites can chain the original
    exception themselves with ``raise ... from exc``.
    """
    message = str(exc)
    return com.IbisError(message)

class HotdataRowsCursor:
"""DB-API–like cursor backed by prefetched rows (used by `_fetch_from_cursor`)."""

def __init__(self, rows: list) -> None:
self._rows = rows
self._idx = 0

def fetchmany(self, size: int = 1024) -> list:
    """Return up to ``size`` of the next unread rows, each as a tuple.

    The read position is moved past the rows that are returned, so
    successive calls walk through the prefetched rows in order.
    """
    begin = self._idx
    stop = min(begin + size, len(self._rows))
    self._idx = stop
    batch = self._rows[begin:stop]
    return [tuple(row) for row in batch]

def fetchall(self) -> list:
    """Return every row not yet consumed and mark the cursor as exhausted.

    Rows are converted to tuples. The read position is moved to the end of
    the prefetched rows so that a later ``fetchall``/``fetchmany`` call or
    iteration does not hand out the same rows a second time, in line with
    how ``fetchmany`` and ``__iter__`` advance ``_idx``.
    """
    start = self._idx
    # Advance before returning: leaving the position untouched made repeated
    # calls return duplicate rows.
    self._idx = len(self._rows)
    return [tuple(r) for r in self._rows[start:]]

def close(self) -> None:
self._idx = len(self._rows)
if TYPE_CHECKING:
from collections.abc import Iterator

def __iter__(self) -> Iterator[tuple]:
    """Yield the remaining rows one at a time as tuples.

    The read position is advanced before each row is yielded, so rows
    produced here are not returned again by later reads.
    """
    while True:
        position = self._idx
        if position >= len(self._rows):
            return
        self._idx = position + 1
        yield tuple(self._rows[position])
import pandas as pd


class Backend(
Expand Down Expand Up @@ -95,16 +81,18 @@ def _from_url(self, url: ParseResult, **kwarg_overrides: Any):
* Base URL defaults to ``https://{host}`` plus optional leading ``path``.
* Query string may include ``token``, ``workspace_id``, ``session_id``,
``timeout``, ``verify_ssl`` (``true`` / ``false``), ``default_connection``,
``default_schema``, ``prefer_async``.
``default_schema``, ``poll_interval_s``, ``poll_timeout_s``.
* If ``token`` is omitted, ``urlparse`` password (`user:TOKEN@`) is accepted.
"""
q = dict(urllib.parse.parse_qsl(url.query, keep_blank_values=True))
q = dict(parse_qsl(url.query, keep_blank_values=True))
q.update(kwarg_overrides)

netloc = url.netloc
path_prefix = url.path.rstrip("/")
if not netloc:
raise com.IbisError("hotdata:// URL requires a network location, e.g. hotdata://api.hotdata.dev/")
raise com.IbisError(
"hotdata:// URL requires a network location, e.g. hotdata://api.hotdata.dev/"
)

verify = q.pop("verify_ssl", None)
if verify is None:
Expand All @@ -117,13 +105,9 @@ def _from_url(self, url: ParseResult, **kwarg_overrides: Any):
timeout = float(q.pop("timeout", "120"))
api_url = q.pop("api_url", None) or ("https://" + netloc + path_prefix)

token = q.pop("token", None) or (
unquote_plus(url.password) if url.password else None
)
token = q.pop("token", None) or (unquote_plus(url.password) if url.password else None)
workspace_id = q.pop("workspace_id", None)

prefer_async_s = q.pop("prefer_async", "false")

kwargs = dict(
api_url=api_url,
token=token,
Expand All @@ -133,7 +117,6 @@ def _from_url(self, url: ParseResult, **kwarg_overrides: Any):
verify_ssl=verify_ssl,
default_connection=q.pop("default_connection", None),
default_schema=q.pop("default_schema", None),
prefer_async=str(prefer_async_s).lower() in ("true", "1", "yes"),
poll_interval_s=float(q.pop("poll_interval_s", "0.25")),
poll_timeout_s=float(q.pop("poll_timeout_s", "600")),
)
Expand All @@ -157,12 +140,14 @@ def do_connect(
verify_ssl: bool | str = True,
default_connection: str | None = None,
default_schema: str | None = None,
prefer_async: bool = False,
poll_interval_s: float = 0.25,
poll_timeout_s: float = 600.0,
) -> None:
"""Create an Ibis client for a Hotdata workspace.

Query execution always uses Hotdata's async path and downloads ready
results as Arrow IPC from ``GET /v1/results/{id}``.

Parameters
----------
api_url
Expand All @@ -184,8 +169,6 @@ def do_connect(
default_schema
Optional default **database** (remote schema name). If omitted and only
one schema exists for the default connection, it is chosen automatically.
prefer_async
When True, requests ``async: true`` on ``POST /v1/query`` (with polling).
poll_interval_s
Sleep between ``GET /v1/query-runs/{id}`` polls.
poll_timeout_s
Expand All @@ -194,7 +177,6 @@ def do_connect(
self.disconnect()
self._default_connection = default_connection
self._default_schema = default_schema
self._prefer_async = prefer_async
self._poll_interval_s = poll_interval_s
self._poll_timeout_s = poll_timeout_s

Expand Down Expand Up @@ -236,7 +218,9 @@ def _infer_default_schema(self, connection_id: str) -> str:
)
if self._default_schema is not None:
if self._default_schema not in schemas:
raise com.IbisInputError(f"Unknown schema {self._default_schema!r} for connection {connection_id!r}")
raise com.IbisInputError(
f"Unknown schema {self._default_schema!r} for connection {connection_id!r}"
)
return self._default_schema
if len(schemas) == 1:
self._default_schema = schemas[0]
Expand Down Expand Up @@ -287,7 +271,9 @@ def list_databases(
schemas = sorted(
{
row["schema"]
for row in self._iterate_information_schema({"connection_id": conn}, include_columns=False)
for row in self._iterate_information_schema(
{"connection_id": conn}, include_columns=False
)
}
)
return self._filter_with_like(list(schemas), like)
Expand All @@ -308,7 +294,10 @@ def list_tables(
params["schema"] = schema_part

tables = sorted(
{row["table"] for row in self._iterate_information_schema(params, include_columns=False)}
{
row["table"]
for row in self._iterate_information_schema(params, include_columns=False)
}
)
return self._filter_with_like(tables, like)

Expand All @@ -318,7 +307,7 @@ def _iterate_information_schema(
cursor: str | None = None
while True:
params: dict[str, Any] = dict(filters)
params["limit"] = 500
params["limit"] = _INFORMATION_SCHEMA_PAGE_SIZE
params["include_columns"] = include_columns
if cursor:
params["cursor"] = cursor
Expand Down Expand Up @@ -367,70 +356,47 @@ def _get_schema_using_query(self, query: str) -> sch.Schema:
try:
data = self._http.execute_query(
preview_sql,
prefer_async=self._prefer_async,
poll_interval_s=self._poll_interval_s,
poll_timeout_s=self._poll_timeout_s,
)
except HotdataAPIError as exc:
raise com.IbisError(str(exc)) from exc
raise _ibis_err_from_hotdata(exc) from exc

cols = data["columns"]
nulls = data["nullable"]
row0 = data["rows"][0] if data.get("rows") else None
mapping: dict[str, dt.DataType] = {}
for i, name in enumerate(cols):
null = bool(nulls[i]) if i < len(nulls) else True
if row0 is not None and i < len(row0):
inferred = dtype_from_json_value(row0[i])
if inferred is not None:
mapping[name] = inferred.copy(nullable=null)
continue
mapping[name] = dt.String(nullable=null)
return sch.Schema(mapping)
from ibis.formats.pyarrow import PyArrowSchema

return PyArrowSchema.to_ibis(data["pa_table"].schema)

@contextlib.contextmanager
def _safe_raw_sql(
self,
query: str | sge.Expression,
) -> Iterator[HotdataRowsCursor]:
) -> Iterator[Any]:
if not isinstance(query, str):
query = query.sql(dialect=self.compiler.dialect, pretty=True)

try:
payload = self._http.execute_query(
query,
prefer_async=self._prefer_async,
poll_interval_s=self._poll_interval_s,
poll_timeout_s=self._poll_timeout_s,
)
except HotdataAPIError as exc:
raise com.IbisError(str(exc)) from exc
raise _ibis_err_from_hotdata(exc) from exc

cur = HotdataRowsCursor(payload["rows"])
try:
yield cur
finally:
cur.close()
yield payload["pa_table"]

def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
import pandas as pd

from ibis.formats.pandas import PandasData

try:
df = pd.DataFrame.from_records(iter(cursor), columns=schema.names, coerce_float=True)
except Exception:
cursor.close()
raise
df = PandasData.convert_table(df, schema)
return df
df = cursor.to_pandas()
return PandasData.convert_table(df, schema)

def upload_file(self, data: bytes) -> dict[str, Any]:
"""POST ``/v1/files``; returns the upload record (use ``id`` with :meth:`create_dataset_from_upload`)."""
try:
return self._http.upload_file(data)
except HotdataAPIError as exc:
raise com.IbisError(str(exc)) from exc
raise _ibis_err_from_hotdata(exc) from exc

def create_dataset_from_upload(
self,
Expand All @@ -453,7 +419,7 @@ def create_dataset_from_upload(
file_format=file_format,
)
except HotdataAPIError as exc:
raise com.IbisError(str(exc)) from exc
raise _ibis_err_from_hotdata(exc) from exc

def create_table(self, *_args: Any, **_kwargs: Any) -> ir.Table:
raise NotImplementedError(
Expand All @@ -469,4 +435,8 @@ def _register_in_memory_table(self, _op: ops.InMemoryTable) -> None:

@cached_property
def version(self) -> str:
return "Hotdata REST API (/v1/query)"
try:
v = pkg_version("ibis-hotdata")
except PackageNotFoundError:
v = "0.0.0"
return f"ibis-hotdata {v} (Hotdata /v1/query)"
Loading
Loading