2020from __future__ import annotations
2121
2222import warnings
23- from typing import TYPE_CHECKING , Any , Protocol , Sequence
23+ from typing import TYPE_CHECKING , Any , Protocol
2424
2525import pyarrow as pa
2626
3131
3232from datafusion .catalog import Catalog , CatalogProvider , Table
3333from datafusion .dataframe import DataFrame
34- from datafusion .expr import SortKey , sort_list_to_raw_sort_list
34+ from datafusion .expr import Expr , SortExpr , sort_list_to_raw_sort_list
3535from datafusion .record_batch import RecordBatchStream
3636from datafusion .user_defined import AggregateUDF , ScalarUDF , TableFunction , WindowUDF
3737
3838from ._internal import RuntimeEnvBuilder as RuntimeEnvBuilderInternal
3939from ._internal import SessionConfig as SessionConfigInternal
4040from ._internal import SessionContext as SessionContextInternal
4141from ._internal import SQLOptions as SQLOptionsInternal
42- from ._internal import expr as expr_internal
4342
4443if TYPE_CHECKING :
4544 import pathlib
@@ -554,7 +553,7 @@ def register_listing_table(
554553 table_partition_cols : list [tuple [str , str | pa .DataType ]] | None = None ,
555554 file_extension : str = ".parquet" ,
556555 schema : pa .Schema | None = None ,
557- file_sort_order : Sequence [ Sequence [ SortKey ]] | None = None ,
556+ file_sort_order : list [ list [ Expr | SortExpr ]] | None = None ,
558557 ) -> None :
559558 """Register multiple files as a single table.
560559
@@ -568,20 +567,23 @@ def register_listing_table(
568567 table_partition_cols: Partition columns.
569568 file_extension: File extension of the provided table.
570569 schema: The data source schema.
571- file_sort_order: Sort order for the file. Each sort key can be
572- specified as a column name (``str``), an expression
573- (``Expr``), or a ``SortExpr``.
570+ file_sort_order: Sort order for the file.
574571 """
575572 if table_partition_cols is None :
576573 table_partition_cols = []
577574 table_partition_cols = self ._convert_table_partition_cols (table_partition_cols )
575+ file_sort_order_raw = (
576+ [sort_list_to_raw_sort_list (f ) for f in file_sort_order ]
577+ if file_sort_order is not None
578+ else None
579+ )
578580 self .ctx .register_listing_table (
579581 name ,
580582 str (path ),
581583 table_partition_cols ,
582584 file_extension ,
583585 schema ,
584- self . _convert_file_sort_order ( file_sort_order ) ,
586+ file_sort_order_raw ,
585587 )
586588
587589 def sql (self , query : str , options : SQLOptions | None = None ) -> DataFrame :
@@ -806,7 +808,7 @@ def register_parquet(
806808 file_extension : str = ".parquet" ,
807809 skip_metadata : bool = True ,
808810 schema : pa .Schema | None = None ,
809- file_sort_order : Sequence [ Sequence [ SortKey ]] | None = None ,
811+ file_sort_order : list [ list [ SortExpr ]] | None = None ,
810812 ) -> None :
811813 """Register a Parquet file as a table.
812814
@@ -825,9 +827,7 @@ def register_parquet(
825827 that may be in the file schema. This can help avoid schema
826828 conflicts due to metadata.
827829 schema: The data source schema.
828- file_sort_order: Sort order for the file. Each sort key can be
829- specified as a column name (``str``), an expression
830- (``Expr``), or a ``SortExpr``.
830+ file_sort_order: Sort order for the file.
831831 """
832832 if table_partition_cols is None :
833833 table_partition_cols = []
@@ -840,7 +840,9 @@ def register_parquet(
840840 file_extension ,
841841 skip_metadata ,
842842 schema ,
843- self ._convert_file_sort_order (file_sort_order ),
843+ [sort_list_to_raw_sort_list (exprs ) for exprs in file_sort_order ]
844+ if file_sort_order is not None
845+ else None ,
844846 )
845847
846848 def register_csv (
@@ -1097,7 +1099,7 @@ def read_parquet(
10971099 file_extension : str = ".parquet" ,
10981100 skip_metadata : bool = True ,
10991101 schema : pa .Schema | None = None ,
1100- file_sort_order : Sequence [ Sequence [ SortKey ]] | None = None ,
1102+ file_sort_order : list [ list [ Expr | SortExpr ]] | None = None ,
11011103 ) -> DataFrame :
11021104 """Read a Parquet source into a :py:class:`~datafusion.dataframe.Dataframe`.
11031105
@@ -1114,17 +1116,19 @@ def read_parquet(
11141116 schema: An optional schema representing the parquet files. If None,
11151117 the parquet reader will try to infer it based on data in the
11161118 file.
1117- file_sort_order: Sort order for the file. Each sort key can be
1118- specified as a column name (``str``), an expression
1119- (``Expr``), or a ``SortExpr``.
1119+ file_sort_order: Sort order for the file.
11201120
11211121 Returns:
11221122 DataFrame representation of the read Parquet files
11231123 """
11241124 if table_partition_cols is None :
11251125 table_partition_cols = []
11261126 table_partition_cols = self ._convert_table_partition_cols (table_partition_cols )
1127- file_sort_order = self ._convert_file_sort_order (file_sort_order )
1127+ file_sort_order = (
1128+ [sort_list_to_raw_sort_list (f ) for f in file_sort_order ]
1129+ if file_sort_order is not None
1130+ else None
1131+ )
11281132 return DataFrame (
11291133 self .ctx .read_parquet (
11301134 str (path ),
@@ -1175,24 +1179,6 @@ def execute(self, plan: ExecutionPlan, partitions: int) -> RecordBatchStream:
11751179 """Execute the ``plan`` and return the results."""
11761180 return RecordBatchStream (self .ctx .execute (plan ._raw_plan , partitions ))
11771181
1178- @staticmethod
1179- def _convert_file_sort_order (
1180- file_sort_order : Sequence [Sequence [SortKey ]] | None ,
1181- ) -> list [list [expr_internal .SortExpr ]] | None :
1182- """Convert nested ``SortKey`` sequences into raw sort expressions.
1183-
1184- Each ``SortKey`` can be a column name string, an ``Expr``, or a
1185- ``SortExpr`` and will be converted using
1186- :func:`datafusion.expr.sort_list_to_raw_sort_list`.
1187- """
1188- # Convert each ``SortKey`` in the provided sort order to the low-level
1189- # representation expected by the Rust bindings.
1190- return (
1191- [sort_list_to_raw_sort_list (f ) for f in file_sort_order ]
1192- if file_sort_order is not None
1193- else None
1194- )
1195-
11961182 @staticmethod
11971183 def _convert_table_partition_cols (
11981184 table_partition_cols : list [tuple [str , str | pa .DataType ]],
0 commit comments