-
Notifications
You must be signed in to change notification settings - Fork 0
Document and extend Ibis support coverage #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9adc90f
6948932
2d2db2f
10a238d
e65ee1e
f80ce96
fd377ab
e4f83f2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,3 +9,5 @@ dist/ | |
| build/ | ||
| .coverage | ||
| htmlcov/ | ||
|
|
||
| .DS_Store | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,10 @@ | ||
| from importlib.metadata import PackageNotFoundError, version | ||
|
|
||
| try: | ||
| __version__ = version("ibis-hotdata") | ||
| except PackageNotFoundError: | ||
| __version__ = "0.0.0+unknown" | ||
|
|
||
| from ibis_hotdata.backend import Backend | ||
|
|
||
| __all__ = ["Backend"] | ||
| __all__ = ["Backend", "__version__"] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,6 +16,7 @@ | |
| from __future__ import annotations | ||
|
|
||
| import contextlib | ||
| import io | ||
| from collections.abc import Iterable, Mapping | ||
| from functools import cached_property | ||
| from importlib.metadata import PackageNotFoundError | ||
|
|
@@ -319,6 +320,45 @@ def _iterate_information_schema( | |
| if cursor is None: | ||
| break | ||
|
|
||
| def _iterate_datasets(self) -> Iterable[dict[str, Any]]: | ||
| offset = 0 | ||
| limit = 1000 | ||
| while True: | ||
| chunk = self._http.list_datasets(limit=limit, offset=offset) | ||
| yield from chunk["datasets"] | ||
| if not chunk.get("has_more"): | ||
| break | ||
| offset += limit | ||
|
|
||
| def _dataset_database(self, database: tuple[str, str] | str | None) -> str | None: | ||
| if database is None: | ||
| return None | ||
| table_loc = self._to_sqlglot_table(database) | ||
| catalog, schema_name = self._to_catalog_db_tuple(table_loc) | ||
| if catalog and catalog != "datasets": | ||
| return "__not_datasets__" | ||
| return schema_name or catalog | ||
|
|
||
| def _find_dataset( | ||
| self, table_name: str, database: tuple[str, str] | str | None | ||
| ) -> dict[str, Any]: | ||
| schema_name = self._dataset_database(database) | ||
| if schema_name == "__not_datasets__": | ||
| raise com.TableNotFound(table_name) | ||
| matches = [ | ||
| ds | ||
| for ds in self._iterate_datasets() | ||
| if ds["table_name"] == table_name | ||
| and (schema_name is None or ds["schema_name"] == schema_name) | ||
| ] | ||
| if not matches: | ||
| raise com.TableNotFound(table_name) | ||
| if len(matches) > 1: | ||
| raise com.IbisInputError( | ||
| f"Multiple Hotdata datasets named {table_name!r}; pass database=('datasets', schema)." | ||
| ) | ||
| return matches[0] | ||
|
|
||
| # --- schema / sql execution -------------------------------------------- | ||
|
|
||
| def get_schema( | ||
|
|
@@ -391,10 +431,58 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame: | |
| df = cursor.to_pandas() | ||
| return PandasData.convert_table(df, schema) | ||
|
|
||
| def upload_file(self, data: bytes) -> dict[str, Any]: | ||
| def to_pyarrow( | ||
| self, | ||
| expr: ir.Expr, | ||
| /, | ||
| *, | ||
| params: Mapping[ir.Scalar, Any] | None = None, | ||
| limit: int | str | None = None, | ||
| **kwargs: Any, | ||
| ): | ||
| self._run_pre_execute_hooks(expr) | ||
| table_expr = expr.as_table() | ||
| sql = self.compile(table_expr, params=params, limit=limit, **kwargs) | ||
| try: | ||
| payload = self._http.execute_query( | ||
| sql, | ||
| poll_interval_s=self._poll_interval_s, | ||
| poll_timeout_s=self._poll_timeout_s, | ||
| ) | ||
| except HotdataAPIError as exc: | ||
| raise _ibis_err_from_hotdata(exc) from exc | ||
| table = payload["pa_table"] | ||
| arrow_schema = table_expr.schema().to_pyarrow() | ||
| table = table.rename_columns(list(table_expr.columns)).cast(arrow_schema) | ||
| return expr.__pyarrow_result__(table) | ||
|
|
||
| def to_pyarrow_batches( | ||
| self, | ||
| expr: ir.Expr, | ||
| /, | ||
| *, | ||
| params: Mapping[ir.Scalar, Any] | None = None, | ||
| limit: int | str | None = None, | ||
| chunk_size: int = 1_000_000, | ||
| **kwargs: Any, | ||
| ): | ||
| """Execute to Arrow and expose local record batches. | ||
|
|
||
| Hotdata currently returns one Arrow IPC result for the full query. This | ||
| method downloads that result first, then splits it into local batches. | ||
| """ | ||
| import pyarrow as pa | ||
|
|
||
| table = self.to_pyarrow(expr.as_table(), params=params, limit=limit, **kwargs) | ||
| return pa.ipc.RecordBatchReader.from_batches( | ||
| table.schema, | ||
| table.to_batches(max_chunksize=chunk_size), | ||
| ) | ||
|
Comment on lines
+459
to
+480
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. super nit: (not blocking) |
||
|
|
||
| def upload_file(self, data: bytes, *, content_type: str | None = None) -> dict[str, Any]: | ||
| """POST ``/v1/files``; returns the upload record (use ``id`` with :meth:`create_dataset_from_upload`).""" | ||
| try: | ||
| return self._http.upload_file(data) | ||
| return self._http.upload_file(data, content_type=content_type) | ||
| except HotdataAPIError as exc: | ||
| raise _ibis_err_from_hotdata(exc) from exc | ||
|
|
||
|
|
@@ -421,14 +509,81 @@ def create_dataset_from_upload( | |
| except HotdataAPIError as exc: | ||
| raise _ibis_err_from_hotdata(exc) from exc | ||
|
|
||
| def create_table(self, *_args: Any, **_kwargs: Any) -> ir.Table: | ||
| raise NotImplementedError( | ||
| "Hotdata does not implement Ibis create_table in v1; use upload_file + " | ||
| "create_dataset_from_upload, then SQL or con.table with the returned names." | ||
| def _local_table_to_parquet(self, obj: Any, schema: sch.Schema | None): | ||
| import pandas as pd | ||
| import pyarrow as pa | ||
| import pyarrow.parquet as pq | ||
|
|
||
| if obj is not None and schema is not None: | ||
| raise com.IbisInputError("create_table accepts only one of obj or schema") | ||
|
|
||
| if obj is None: | ||
| if schema is None: | ||
| raise com.IbisInputError("create_table requires a pandas/pyarrow object or schema") | ||
| arrow_schema = schema.to_pyarrow() | ||
| table = pa.Table.from_arrays( | ||
| [pa.array([], type=field.type) for field in arrow_schema], | ||
| schema=arrow_schema, | ||
| ) | ||
| elif isinstance(obj, pa.Table): | ||
| table = obj | ||
| elif isinstance(obj, pd.DataFrame): | ||
| table = pa.Table.from_pandas(obj, preserve_index=False) | ||
| else: | ||
| raise com.IbisInputError( | ||
| "create_table currently accepts pandas.DataFrame or pyarrow.Table" | ||
| ) | ||
|
Comment on lines
+512
to
+535
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. super nit: (not blocking) When both |
||
|
|
||
| sink = io.BytesIO() | ||
| pq.write_table(table, sink) | ||
| return sink.getvalue() | ||
|
|
||
| def create_table( | ||
| self, | ||
| name: str, | ||
| /, | ||
| obj: Any = None, | ||
| *, | ||
| schema: sch.Schema | None = None, | ||
| database: str | None = None, | ||
| temp: bool = False, | ||
| overwrite: bool = False, | ||
| ) -> ir.Table: | ||
| if temp: | ||
| raise NotImplementedError("Hotdata does not support temporary tables.") | ||
| if overwrite: | ||
| raise NotImplementedError("Hotdata create_table does not support overwrite.") | ||
| if database is not None: | ||
| raise NotImplementedError("Hotdata datasets choose their schema at creation time.") | ||
|
|
||
| data = self._local_table_to_parquet(obj, schema) | ||
| upload = self.upload_file(data, content_type="application/parquet") | ||
| dataset = self.create_dataset_from_upload( | ||
| upload_id=upload["id"], | ||
| label=name, | ||
| table_name=name, | ||
| file_format="parquet", | ||
| ) | ||
| return self.table(dataset["table_name"], database=("datasets", dataset["schema_name"])) | ||
|
|
||
| def drop_table(self, *_args: Any, **_kwargs: Any) -> None: | ||
| raise NotImplementedError("Hotdata backend does not implement drop_table in v1.") | ||
| def drop_table( | ||
| self, | ||
| name: str, | ||
| /, | ||
| *, | ||
| database: tuple[str, str] | str | None = None, | ||
| force: bool = False, | ||
| ) -> None: | ||
| try: | ||
| dataset = self._find_dataset(name, database) | ||
| except com.TableNotFound: | ||
| if force: | ||
| return | ||
| raise | ||
| try: | ||
| self._http.delete_dataset(dataset["id"]) | ||
| except HotdataAPIError as exc: | ||
| raise _ibis_err_from_hotdata(exc) from exc | ||
|
|
||
| def _register_in_memory_table(self, _op: ops.InMemoryTable) -> None: | ||
| return | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: (not blocking) The
"__not_datasets__"magic string is fragile — if a Hotdata schema is ever literally named__not_datasets__, lookups would silently misbehave. More importantly, when the sentinel is set,_find_datasetstill iterates every page oflist_datasets()only to discard each row againstschema_name != "__not_datasets__". Consider short-circuiting before the iteration (e.g., raiseTableNotFounddirectly in_find_datasetwhen the catalog isn't"datasets") which avoids both the sentinel and the unnecessary network calls — seetest_drop_table_raises_for_non_dataset_catalogwhere the dataset list endpoint is hit needlessly.