 
 import warnings
 from typing import TYPE_CHECKING, Any, Protocol
-from collections.abc import Sequence
 
 import pyarrow as pa
 
 
 from datafusion.catalog import Catalog, CatalogProvider, Table
 from datafusion.dataframe import DataFrame
-from datafusion.expr import SortKey, sort_list_to_raw_sort_list
+from datafusion.expr import Expr, SortExpr, sort_list_to_raw_sort_list
 from datafusion.record_batch import RecordBatchStream
 from datafusion.user_defined import AggregateUDF, ScalarUDF, TableFunction, WindowUDF
 
 from ._internal import RuntimeEnvBuilder as RuntimeEnvBuilderInternal
 from ._internal import SessionConfig as SessionConfigInternal
 from ._internal import SessionContext as SessionContextInternal
 from ._internal import SQLOptions as SQLOptionsInternal
-from ._internal import expr as expr_internal
 
 if TYPE_CHECKING:
     import pathlib
@@ -555,7 +553,7 @@ def register_listing_table(
         table_partition_cols: list[tuple[str, str | pa.DataType]] | None = None,
         file_extension: str = ".parquet",
         schema: pa.Schema | None = None,
-        file_sort_order: Sequence[Sequence[SortKey]] | None = None,
+        file_sort_order: list[list[Expr | SortExpr]] | None = None,
     ) -> None:
         """Register multiple files as a single table.
 
@@ -569,20 +567,23 @@ def register_listing_table(
             table_partition_cols: Partition columns.
             file_extension: File extension of the provided table.
             schema: The data source schema.
-            file_sort_order: Sort order for the file. Each sort key can be
-                specified as a column name (``str``), an expression
-                (``Expr``), or a ``SortExpr``.
+            file_sort_order: Sort order for the file.
         """
         if table_partition_cols is None:
             table_partition_cols = []
         table_partition_cols = self._convert_table_partition_cols(table_partition_cols)
+        file_sort_order_raw = (
+            [sort_list_to_raw_sort_list(f) for f in file_sort_order]
+            if file_sort_order is not None
+            else None
+        )
         self.ctx.register_listing_table(
             name,
             str(path),
             table_partition_cols,
             file_extension,
             schema,
-            self._convert_file_sort_order(file_sort_order),
+            file_sort_order_raw,
         )
 
     def sql(self, query: str, options: SQLOptions | None = None) -> DataFrame:
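With the helper gone, `register_listing_table` converts `file_sort_order` inline through `sort_list_to_raw_sort_list`. A minimal usage sketch against the new annotation; the table name, directory path, and column are hypothetical:

```python
from datafusion import SessionContext, col

ctx = SessionContext()

# Sort keys are Expr or SortExpr objects; plain column-name strings are no
# longer part of the annotation after this change.
ctx.register_listing_table(
    "events",          # hypothetical table name
    "data/events/",    # hypothetical directory of .parquet files
    file_extension=".parquet",
    file_sort_order=[[col("ts").sort(ascending=True)]],
)
```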
@@ -807,7 +808,7 @@ def register_parquet(
         file_extension: str = ".parquet",
         skip_metadata: bool = True,
         schema: pa.Schema | None = None,
-        file_sort_order: Sequence[Sequence[SortKey]] | None = None,
+        file_sort_order: list[list[SortExpr]] | None = None,
     ) -> None:
         """Register a Parquet file as a table.
 
@@ -826,9 +827,7 @@ def register_parquet(
                 that may be in the file schema. This can help avoid schema
                 conflicts due to metadata.
             schema: The data source schema.
-            file_sort_order: Sort order for the file. Each sort key can be
-                specified as a column name (``str``), an expression
-                (``Expr``), or a ``SortExpr``.
+            file_sort_order: Sort order for the file.
         """
         if table_partition_cols is None:
             table_partition_cols = []
@@ -841,7 +840,9 @@ def register_parquet(
             file_extension,
             skip_metadata,
             schema,
-            self._convert_file_sort_order(file_sort_order),
+            [sort_list_to_raw_sort_list(exprs) for exprs in file_sort_order]
+            if file_sort_order is not None
+            else None,
         )
 
     def register_csv(
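`register_parquet` gets the same inline conversion, but its annotation is the narrower `list[list[SortExpr]]`, so a sketch would build each key explicitly with `Expr.sort`; the table name, file path, and query here are hypothetical:

```python
from datafusion import SessionContext, col

ctx = SessionContext()
ctx.register_parquet(
    "trades",                 # hypothetical table name
    "data/trades.parquet",    # hypothetical file path
    file_sort_order=[[col("symbol").sort(ascending=True), col("ts").sort(ascending=True)]],
)
df = ctx.sql("SELECT symbol, ts FROM trades ORDER BY symbol, ts")
```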
@@ -1098,7 +1099,7 @@ def read_parquet(
         file_extension: str = ".parquet",
         skip_metadata: bool = True,
         schema: pa.Schema | None = None,
-        file_sort_order: Sequence[Sequence[SortKey]] | None = None,
+        file_sort_order: list[list[Expr | SortExpr]] | None = None,
    ) -> DataFrame:
         """Read a Parquet source into a :py:class:`~datafusion.dataframe.Dataframe`.
 
@@ -1115,17 +1116,19 @@ def read_parquet(
             schema: An optional schema representing the parquet files. If None,
                 the parquet reader will try to infer it based on data in the
                 file.
-            file_sort_order: Sort order for the file. Each sort key can be
-                specified as a column name (``str``), an expression
-                (``Expr``), or a ``SortExpr``.
+            file_sort_order: Sort order for the file.
 
         Returns:
             DataFrame representation of the read Parquet files
         """
         if table_partition_cols is None:
             table_partition_cols = []
         table_partition_cols = self._convert_table_partition_cols(table_partition_cols)
-        file_sort_order = self._convert_file_sort_order(file_sort_order)
+        file_sort_order = (
+            [sort_list_to_raw_sort_list(f) for f in file_sort_order]
+            if file_sort_order is not None
+            else None
+        )
         return DataFrame(
             self.ctx.read_parquet(
                 str(path),
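`read_parquet` follows the same pattern but hands back the DataFrame directly; a small sketch with a hypothetical path:

```python
from datafusion import SessionContext, col

ctx = SessionContext()
df = ctx.read_parquet(
    "data/metrics.parquet",    # hypothetical path
    file_sort_order=[[col("ts").sort(ascending=True, nulls_first=False)]],
)
df.show()
```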
@@ -1176,24 +1179,6 @@ def execute(self, plan: ExecutionPlan, partitions: int) -> RecordBatchStream:
         """Execute the ``plan`` and return the results."""
         return RecordBatchStream(self.ctx.execute(plan._raw_plan, partitions))
 
-    @staticmethod
-    def _convert_file_sort_order(
-        file_sort_order: Sequence[Sequence[SortKey]] | None,
-    ) -> list[list[expr_internal.SortExpr]] | None:
-        """Convert nested ``SortKey`` sequences into raw sort expressions.
-
-        Each ``SortKey`` can be a column name string, an ``Expr``, or a
-        ``SortExpr`` and will be converted using
-        :func:`datafusion.expr.sort_list_to_raw_sort_list`.
-        """
-        # Convert each ``SortKey`` in the provided sort order to the low-level
-        # representation expected by the Rust bindings.
-        return (
-            [sort_list_to_raw_sort_list(f) for f in file_sort_order]
-            if file_sort_order is not None
-            else None
-        )
-
     @staticmethod
     def _convert_table_partition_cols(
         table_partition_cols: list[tuple[str, str | pa.DataType]],