Skip to content

Commit 1e67e2a

Browse files
committed
Revert "UNPICK changes to review"
This reverts commit 22c0f35.
1 parent 22c0f35 commit 1e67e2a

File tree

5 files changed

+592
-14
lines changed

5 files changed

+592
-14
lines changed

docs/source/user-guide/dataframe/index.rst

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,33 @@ Core Classes
228228
* :py:meth:`~datafusion.SessionContext.from_pandas` - Create from Pandas DataFrame
229229
* :py:meth:`~datafusion.SessionContext.from_arrow` - Create from Arrow data
230230

231+
``SessionContext`` can automatically resolve SQL table names that match
232+
in-scope Python data objects. When automatic lookup is enabled, a query
233+
such as ``ctx.sql("SELECT * FROM pdf")`` will register a pandas or
234+
PyArrow object named ``pdf`` without calling
235+
:py:meth:`~datafusion.SessionContext.from_pandas` or
236+
:py:meth:`~datafusion.SessionContext.from_arrow` explicitly. This requires
237+
the corresponding library (``pandas`` for pandas objects, ``pyarrow`` for
238+
Arrow objects) to be installed.
239+
240+
.. code-block:: python
241+
242+
import pandas as pd
243+
from datafusion import SessionContext
244+
245+
ctx = SessionContext(auto_register_python_objects=True)
246+
pdf = pd.DataFrame({"value": [1, 2, 3]})
247+
248+
df = ctx.sql("SELECT SUM(value) AS total FROM pdf")
249+
print(df.to_pandas()) # automatically registers `pdf`
250+
251+
Automatic lookup is disabled by default. Enable it by passing
252+
``auto_register_python_objects=True`` when constructing the session or by
253+
configuring :py:class:`~datafusion.SessionConfig` with
254+
:py:meth:`~datafusion.SessionConfig.with_python_table_lookup`. Use
255+
:py:meth:`~datafusion.SessionContext.set_python_table_lookup` to toggle the
256+
behaviour at runtime.
257+
231258
See: :py:class:`datafusion.SessionContext`
232259

233260
Expression Classes

docs/source/user-guide/sql.rst

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,29 @@ DataFusion also offers a SQL API, read the full reference `here <https://arrow.a
3636
df = ctx.sql('SELECT "Attack"+"Defense", "Attack"-"Defense" FROM pokemon')
3737
3838
# collect and convert to pandas DataFrame
39-
df.to_pandas()
39+
df.to_pandas()
40+
41+
Automatic variable registration
42+
-------------------------------
43+
44+
You can opt-in to DataFusion automatically registering Arrow-compatible Python
45+
objects that appear in SQL queries. This removes the need to call
46+
``register_*`` helpers explicitly when working with in-memory data structures.
47+
48+
.. code-block:: python
49+
50+
import pyarrow as pa
51+
from datafusion import SessionContext
52+
53+
ctx = SessionContext(auto_register_python_objects=True)
54+
55+
orders = pa.Table.from_pydict({"item": ["apple", "pear"], "qty": [5, 2]})
56+
57+
result = ctx.sql("SELECT item, qty FROM orders WHERE qty > 2")
58+
print(result.to_pandas())
59+
60+
The feature inspects the call stack for variables whose names match missing
61+
tables and registers them if they expose Arrow data (including pandas and
62+
Polars DataFrames). Existing contexts can enable or disable the behavior at
63+
runtime through :py:meth:`SessionContext.set_python_table_lookup` or by passing
64+
``auto_register_python_objects`` when constructing the session.

python/datafusion/context.py

Lines changed: 212 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919

2020
from __future__ import annotations
2121

22+
import inspect
23+
import re
2224
import warnings
25+
import weakref
2326
from typing import TYPE_CHECKING, Any, Protocol
2427

2528
try:
@@ -101,6 +104,7 @@ def __init__(self, config_options: dict[str, str] | None = None) -> None:
101104
config_options: Configuration options.
102105
"""
103106
self.config_internal = SessionConfigInternal(config_options)
107+
self._python_table_lookup = False
104108

105109
def with_create_default_catalog_and_schema(
106110
self, enabled: bool = True
@@ -270,6 +274,11 @@ def with_parquet_pruning(self, enabled: bool = True) -> SessionConfig:
270274
self.config_internal = self.config_internal.with_parquet_pruning(enabled)
271275
return self
272276

277+
def with_python_table_lookup(self, enabled: bool = True) -> SessionConfig:
278+
"""Enable implicit table lookup for Python objects when running SQL."""
279+
self._python_table_lookup = enabled
280+
return self
281+
273282
def set(self, key: str, value: str) -> SessionConfig:
274283
"""Set a configuration option.
275284
@@ -483,6 +492,8 @@ def __init__(
483492
self,
484493
config: SessionConfig | None = None,
485494
runtime: RuntimeEnvBuilder | None = None,
495+
*,
496+
auto_register_python_objects: bool | None = None,
486497
) -> None:
487498
"""Main interface for executing queries with DataFusion.
488499
@@ -493,6 +504,12 @@ def __init__(
493504
Args:
494505
config: Session configuration options.
495506
runtime: Runtime configuration options.
507+
auto_register_python_objects: Automatically register referenced
508+
Python objects (such as pandas or PyArrow data) when ``sql``
509+
queries reference them by name. When omitted, this defaults to
510+
the value configured via
511+
:py:meth:`~datafusion.SessionConfig.with_python_table_lookup`
512+
(``False`` unless explicitly enabled).
496513
497514
Example usage:
498515
@@ -504,10 +521,22 @@ def __init__(
504521
ctx = SessionContext()
505522
df = ctx.read_csv("data.csv")
506523
"""
507-
config = config.config_internal if config is not None else None
508-
runtime = runtime.config_internal if runtime is not None else None
524+
self.ctx = SessionContextInternal(
525+
config.config_internal if config is not None else None,
526+
runtime.config_internal if runtime is not None else None,
527+
)
528+
529+
# Determine the final value for python table lookup
530+
if auto_register_python_objects is not None:
531+
auto_python_table_lookup = auto_register_python_objects
532+
else:
533+
# Default to session config value or False if not configured
534+
auto_python_table_lookup = getattr(config, "_python_table_lookup", False)
509535

510-
self.ctx = SessionContextInternal(config, runtime)
536+
self._auto_python_table_lookup = bool(auto_python_table_lookup)
537+
self._python_table_bindings: dict[
538+
str, tuple[weakref.ReferenceType[Any] | None, int]
539+
] = {}
511540

512541
def __repr__(self) -> str:
513542
"""Print a string representation of the Session Context."""
@@ -534,8 +563,27 @@ def enable_url_table(self) -> SessionContext:
534563
klass = self.__class__
535564
obj = klass.__new__(klass)
536565
obj.ctx = self.ctx.enable_url_table()
566+
obj._auto_python_table_lookup = getattr(
567+
self, "_auto_python_table_lookup", False
568+
)
569+
obj._python_table_bindings = getattr(self, "_python_table_bindings", {}).copy()
537570
return obj
538571

572+
def set_python_table_lookup(self, enabled: bool = True) -> SessionContext:
573+
"""Enable or disable automatic registration of Python objects in SQL.
574+
575+
Args:
576+
enabled: When ``True``, SQL queries automatically attempt to
577+
resolve missing table names by looking up Python objects in the
578+
caller's scope. Use ``False`` to require explicit registration
579+
of any referenced tables.
580+
581+
Returns:
582+
The current :py:class:`SessionContext` instance for chaining.
583+
"""
584+
self._auto_python_table_lookup = enabled
585+
return self
586+
539587
def register_object_store(
540588
self, schema: str, store: Any, host: str | None = None
541589
) -> None:
@@ -600,9 +648,34 @@ def sql(self, query: str, options: SQLOptions | None = None) -> DataFrame:
600648
Returns:
601649
DataFrame representation of the SQL query.
602650
"""
603-
if options is None:
604-
return DataFrame(self.ctx.sql(query))
605-
return DataFrame(self.ctx.sql_with_options(query, options.options_internal))
651+
652+
def _execute_sql() -> DataFrame:
653+
if options is None:
654+
return DataFrame(self.ctx.sql(query))
655+
return DataFrame(self.ctx.sql_with_options(query, options.options_internal))
656+
657+
auto_lookup_enabled = getattr(self, "_auto_python_table_lookup", False)
658+
659+
if auto_lookup_enabled:
660+
self._refresh_python_table_bindings()
661+
662+
while True:
663+
try:
664+
return _execute_sql()
665+
except Exception as err: # noqa: PERF203
666+
if not auto_lookup_enabled:
667+
raise
668+
669+
missing_tables = self._extract_missing_table_names(err)
670+
if not missing_tables:
671+
raise
672+
673+
registered = self._register_python_tables(missing_tables)
674+
if not registered:
675+
raise
676+
677+
# Retry to allow registering additional tables referenced in the query.
678+
continue
606679

607680
def sql_with_options(self, query: str, options: SQLOptions) -> DataFrame:
608681
"""Create a :py:class:`~datafusion.dataframe.DataFrame` from SQL query text.
@@ -619,6 +692,138 @@ def sql_with_options(self, query: str, options: SQLOptions) -> DataFrame:
619692
"""
620693
return self.sql(query, options)
621694

695+
@staticmethod
696+
def _extract_missing_table_names(err: Exception) -> list[str]:
697+
def _normalize(names: list[Any]) -> list[str]:
698+
tables: list[str] = []
699+
for raw_name in names:
700+
if not raw_name:
701+
continue
702+
raw_str = str(raw_name)
703+
tables.append(raw_str.rsplit(".", 1)[-1])
704+
return tables
705+
706+
missing_tables = getattr(err, "missing_table_names", None)
707+
if missing_tables is not None:
708+
if isinstance(missing_tables, str):
709+
candidates: list[Any] = [missing_tables]
710+
else:
711+
try:
712+
candidates = list(missing_tables)
713+
except TypeError:
714+
candidates = [missing_tables]
715+
716+
return _normalize(candidates)
717+
718+
message = str(err)
719+
matches = set()
720+
for pattern in (r"table '([^']+)' not found", r"No table named '([^']+)'"):
721+
matches.update(re.findall(pattern, message))
722+
723+
return _normalize(list(matches))
724+
725+
def _register_python_tables(self, tables: list[str]) -> bool:
726+
registered_any = False
727+
for table_name in tables:
728+
if not table_name or self.table_exist(table_name):
729+
continue
730+
731+
python_obj = self._lookup_python_object(table_name)
732+
if python_obj is None:
733+
continue
734+
735+
if self._register_python_object(table_name, python_obj):
736+
registered_any = True
737+
738+
return registered_any
739+
740+
@staticmethod
741+
def _lookup_python_object(name: str) -> Any | None:
742+
frame = inspect.currentframe()
743+
try:
744+
frame = frame.f_back if frame is not None else None
745+
lower_name = name.lower()
746+
747+
def _match(mapping: dict[str, Any]) -> Any | None:
748+
value = mapping.get(name)
749+
if value is not None:
750+
return value
751+
752+
for key, candidate in mapping.items():
753+
if (
754+
isinstance(key, str)
755+
and key.lower() == lower_name
756+
and candidate is not None
757+
):
758+
return candidate
759+
760+
return None
761+
762+
while frame is not None:
763+
for scope in (frame.f_locals, frame.f_globals):
764+
match = _match(scope)
765+
if match is not None:
766+
return match
767+
frame = frame.f_back
768+
finally:
769+
del frame
770+
return None
771+
772+
def _refresh_python_table_bindings(self) -> None:
773+
bindings = getattr(self, "_python_table_bindings", {})
774+
for table_name, (obj_ref, cached_id) in list(bindings.items()):
775+
cached_obj = obj_ref() if obj_ref is not None else None
776+
current_obj = self._lookup_python_object(table_name)
777+
weakref_dead = obj_ref is not None and cached_obj is None
778+
id_mismatch = current_obj is not None and id(current_obj) != cached_id
779+
780+
if not (weakref_dead or id_mismatch):
781+
continue
782+
783+
self.deregister_table(table_name)
784+
785+
if current_obj is None:
786+
bindings.pop(table_name, None)
787+
continue
788+
789+
if self._register_python_object(table_name, current_obj):
790+
continue
791+
792+
bindings.pop(table_name, None)
793+
794+
def _register_python_object(self, name: str, obj: Any) -> bool:
795+
registered = False
796+
797+
if isinstance(obj, DataFrame):
798+
self.register_view(name, obj)
799+
registered = True
800+
elif (
801+
obj.__class__.__module__.startswith("polars.")
802+
and obj.__class__.__name__ == "DataFrame"
803+
):
804+
self.from_polars(obj, name=name)
805+
registered = True
806+
elif (
807+
obj.__class__.__module__.startswith("pandas.")
808+
and obj.__class__.__name__ == "DataFrame"
809+
):
810+
self.from_pandas(obj, name=name)
811+
registered = True
812+
elif isinstance(obj, (pa.Table, pa.RecordBatch, pa.RecordBatchReader)) or (
813+
hasattr(obj, "__arrow_c_stream__") or hasattr(obj, "__arrow_c_array__")
814+
):
815+
self.from_arrow(obj, name=name)
816+
registered = True
817+
818+
if registered:
819+
try:
820+
reference: weakref.ReferenceType[Any] | None = weakref.ref(obj)
821+
except TypeError:
822+
reference = None
823+
self._python_table_bindings[name] = (reference, id(obj))
824+
825+
return registered
826+
622827
def create_dataframe(
623828
self,
624829
partitions: list[list[pa.RecordBatch]],
@@ -756,6 +961,7 @@ def register_table(self, name: str, table: Table) -> None:
756961
def deregister_table(self, name: str) -> None:
757962
"""Remove a table from the session."""
758963
self.ctx.deregister_table(name)
964+
self._python_table_bindings.pop(name, None)
759965

760966
def catalog_names(self) -> set[str]:
761967
"""Returns the list of catalogs in this context."""

0 commit comments

Comments
 (0)