2020from __future__ import annotations
2121
2222import warnings
23- from typing import TYPE_CHECKING , Any , Protocol
23+ from typing import TYPE_CHECKING , Any , Protocol , Sequence
2424
2525import pyarrow as pa
2626
3131
3232from datafusion .catalog import Catalog , CatalogProvider , Table
3333from datafusion .dataframe import DataFrame
34- from datafusion .expr import Expr , SortExpr , sort_list_to_raw_sort_list
34+ from datafusion .expr import SortKey , sort_list_to_raw_sort_list
3535from datafusion .record_batch import RecordBatchStream
3636from datafusion .user_defined import AggregateUDF , ScalarUDF , TableFunction , WindowUDF
3737
@@ -553,7 +553,7 @@ def register_listing_table(
553553 table_partition_cols : list [tuple [str , str | pa .DataType ]] | None = None ,
554554 file_extension : str = ".parquet" ,
555555 schema : pa .Schema | None = None ,
556- file_sort_order : list [ list [ Expr | SortExpr ]] | None = None ,
556+ file_sort_order : Sequence [ Sequence [ SortKey ]] | None = None ,
557557 ) -> None :
558558 """Register multiple files as a single table.
559559
@@ -567,23 +567,20 @@ def register_listing_table(
567567 table_partition_cols: Partition columns.
568568 file_extension: File extension of the provided table.
569569 schema: The data source schema.
570- file_sort_order: Sort order for the file.
570+ file_sort_order: Sort order for the file. Each sort key can be
571+ specified as a column name (``str``), an expression
572+ (``Expr``), or a ``SortExpr``.
571573 """
572574 if table_partition_cols is None :
573575 table_partition_cols = []
574576 table_partition_cols = self ._convert_table_partition_cols (table_partition_cols )
575- file_sort_order_raw = (
576- [sort_list_to_raw_sort_list (f ) for f in file_sort_order ]
577- if file_sort_order is not None
578- else None
579- )
580577 self .ctx .register_listing_table (
581578 name ,
582579 str (path ),
583580 table_partition_cols ,
584581 file_extension ,
585582 schema ,
586- file_sort_order_raw ,
583+ self . _convert_file_sort_order ( file_sort_order ) ,
587584 )
588585
589586 def sql (self , query : str , options : SQLOptions | None = None ) -> DataFrame :
@@ -808,7 +805,7 @@ def register_parquet(
808805 file_extension : str = ".parquet" ,
809806 skip_metadata : bool = True ,
810807 schema : pa .Schema | None = None ,
811- file_sort_order : list [ list [ SortExpr ]] | None = None ,
808+ file_sort_order : Sequence [ Sequence [ SortKey ]] | None = None ,
812809 ) -> None :
813810 """Register a Parquet file as a table.
814811
@@ -827,7 +824,9 @@ def register_parquet(
827824 that may be in the file schema. This can help avoid schema
828825 conflicts due to metadata.
829826 schema: The data source schema.
830- file_sort_order: Sort order for the file.
827+ file_sort_order: Sort order for the file. Each sort key can be
828+ specified as a column name (``str``), an expression
829+ (``Expr``), or a ``SortExpr``.
831830 """
832831 if table_partition_cols is None :
833832 table_partition_cols = []
@@ -840,9 +839,7 @@ def register_parquet(
840839 file_extension ,
841840 skip_metadata ,
842841 schema ,
843- [sort_list_to_raw_sort_list (exprs ) for exprs in file_sort_order ]
844- if file_sort_order is not None
845- else None ,
842+ self ._convert_file_sort_order (file_sort_order ),
846843 )
847844
848845 def register_csv (
@@ -1099,7 +1096,7 @@ def read_parquet(
10991096 file_extension : str = ".parquet" ,
11001097 skip_metadata : bool = True ,
11011098 schema : pa .Schema | None = None ,
1102- file_sort_order : list [ list [ Expr | SortExpr ]] | None = None ,
1099+ file_sort_order : Sequence [ Sequence [ SortKey ]] | None = None ,
11031100 ) -> DataFrame :
11041101 """Read a Parquet source into a :py:class:`~datafusion.dataframe.Dataframe`.
11051102
@@ -1116,19 +1113,17 @@ def read_parquet(
11161113 schema: An optional schema representing the parquet files. If None,
11171114 the parquet reader will try to infer it based on data in the
11181115 file.
1119- file_sort_order: Sort order for the file.
1116+ file_sort_order: Sort order for the file. Each sort key can be
1117+ specified as a column name (``str``), an expression
1118+ (``Expr``), or a ``SortExpr``.
11201119
11211120 Returns:
11221121 DataFrame representation of the read Parquet files
11231122 """
11241123 if table_partition_cols is None :
11251124 table_partition_cols = []
11261125 table_partition_cols = self ._convert_table_partition_cols (table_partition_cols )
1127- file_sort_order = (
1128- [sort_list_to_raw_sort_list (f ) for f in file_sort_order ]
1129- if file_sort_order is not None
1130- else None
1131- )
1126+ file_sort_order = self ._convert_file_sort_order (file_sort_order )
11321127 return DataFrame (
11331128 self .ctx .read_parquet (
11341129 str (path ),
@@ -1179,6 +1174,24 @@ def execute(self, plan: ExecutionPlan, partitions: int) -> RecordBatchStream:
11791174 """Execute the ``plan`` and return the results."""
11801175 return RecordBatchStream (self .ctx .execute (plan ._raw_plan , partitions ))
11811176
@staticmethod
def _convert_file_sort_order(
    file_sort_order: Sequence[Sequence[SortKey]] | None,
) -> list[list[Any]] | None:
    """Translate a nested sort-order specification into its raw form.

    Every ``SortKey`` may be given as a column name string, an ``Expr``,
    or a ``SortExpr``; the conversion of each inner sequence is delegated
    to :func:`datafusion.expr.sort_list_to_raw_sort_list`.

    Args:
        file_sort_order: Outer sequence of sort orderings, each of which is
            itself a sequence of ``SortKey`` entries, or ``None`` when no
            ordering was supplied.

    Returns:
        ``None`` when ``file_sort_order`` is ``None``; otherwise a list with
        one converted entry per inner sequence, in the original order.
    """
    # An absent ordering is passed through untouched so the caller can hand
    # ``None`` straight to the underlying context.
    if file_sort_order is None:
        return None

    # Produce the low-level representation expected by the Rust bindings,
    # one inner ordering at a time.
    return [sort_list_to_raw_sort_list(sort_keys) for sort_keys in file_sort_order]
1194+
11821195 @staticmethod
11831196 def _convert_table_partition_cols (
11841197 table_partition_cols : list [tuple [str , str | pa .DataType ]],
0 commit comments