 
 import warnings
 from typing import TYPE_CHECKING, Any, Protocol
+from collections.abc import Sequence
 
 import pyarrow as pa
 
 
 from datafusion.catalog import Catalog, CatalogProvider, Table
 from datafusion.dataframe import DataFrame
-from datafusion.expr import Expr, SortExpr, sort_list_to_raw_sort_list
+from datafusion.expr import SortKey, sort_list_to_raw_sort_list
 from datafusion.record_batch import RecordBatchStream
 from datafusion.user_defined import AggregateUDF, ScalarUDF, TableFunction, WindowUDF
 
 from ._internal import RuntimeEnvBuilder as RuntimeEnvBuilderInternal
 from ._internal import SessionConfig as SessionConfigInternal
 from ._internal import SessionContext as SessionContextInternal
 from ._internal import SQLOptions as SQLOptionsInternal
+from ._internal import expr as expr_internal
 
 if TYPE_CHECKING:
     import pathlib
@@ -553,7 +555,7 @@ def register_listing_table(
         table_partition_cols: list[tuple[str, str | pa.DataType]] | None = None,
         file_extension: str = ".parquet",
         schema: pa.Schema | None = None,
-        file_sort_order: list[list[Expr | SortExpr]] | None = None,
+        file_sort_order: Sequence[Sequence[SortKey]] | None = None,
     ) -> None:
         """Register multiple files as a single table.
 
@@ -567,23 +569,20 @@ def register_listing_table(
             table_partition_cols: Partition columns.
             file_extension: File extension of the provided table.
             schema: The data source schema.
-            file_sort_order: Sort order for the file.
+            file_sort_order: Sort order for the file. Each sort key can be
+                specified as a column name (``str``), an expression
+                (``Expr``), or a ``SortExpr``.
         """
         if table_partition_cols is None:
             table_partition_cols = []
         table_partition_cols = self._convert_table_partition_cols(table_partition_cols)
-        file_sort_order_raw = (
-            [sort_list_to_raw_sort_list(f) for f in file_sort_order]
-            if file_sort_order is not None
-            else None
-        )
         self.ctx.register_listing_table(
             name,
             str(path),
             table_partition_cols,
             file_extension,
             schema,
-            file_sort_order_raw,
+            self._convert_file_sort_order(file_sort_order),
         )
 
     def sql(self, query: str, options: SQLOptions | None = None) -> DataFrame:
@@ -808,7 +807,7 @@ def register_parquet(
         file_extension: str = ".parquet",
         skip_metadata: bool = True,
         schema: pa.Schema | None = None,
-        file_sort_order: list[list[SortExpr]] | None = None,
+        file_sort_order: Sequence[Sequence[SortKey]] | None = None,
     ) -> None:
         """Register a Parquet file as a table.
 
@@ -827,7 +826,9 @@ def register_parquet(
                 that may be in the file schema. This can help avoid schema
                 conflicts due to metadata.
             schema: The data source schema.
-            file_sort_order: Sort order for the file.
+            file_sort_order: Sort order for the file. Each sort key can be
+                specified as a column name (``str``), an expression
+                (``Expr``), or a ``SortExpr``.
         """
         if table_partition_cols is None:
             table_partition_cols = []
@@ -840,9 +841,7 @@ def register_parquet(
             file_extension,
             skip_metadata,
             schema,
-            [sort_list_to_raw_sort_list(exprs) for exprs in file_sort_order]
-            if file_sort_order is not None
-            else None,
+            self._convert_file_sort_order(file_sort_order),
         )
 
     def register_csv(
@@ -1099,7 +1098,7 @@ def read_parquet(
         file_extension: str = ".parquet",
         skip_metadata: bool = True,
         schema: pa.Schema | None = None,
-        file_sort_order: list[list[Expr | SortExpr]] | None = None,
+        file_sort_order: Sequence[Sequence[SortKey]] | None = None,
     ) -> DataFrame:
         """Read a Parquet source into a :py:class:`~datafusion.dataframe.Dataframe`.
 
@@ -1116,19 +1115,17 @@ def read_parquet(
             schema: An optional schema representing the parquet files. If None,
                 the parquet reader will try to infer it based on data in the
                 file.
-            file_sort_order: Sort order for the file.
+            file_sort_order: Sort order for the file. Each sort key can be
+                specified as a column name (``str``), an expression
+                (``Expr``), or a ``SortExpr``.
 
         Returns:
             DataFrame representation of the read Parquet files
         """
         if table_partition_cols is None:
             table_partition_cols = []
         table_partition_cols = self._convert_table_partition_cols(table_partition_cols)
-        file_sort_order = (
-            [sort_list_to_raw_sort_list(f) for f in file_sort_order]
-            if file_sort_order is not None
-            else None
-        )
+        file_sort_order = self._convert_file_sort_order(file_sort_order)
         return DataFrame(
             self.ctx.read_parquet(
                 str(path),
@@ -1179,6 +1176,24 @@ def execute(self, plan: ExecutionPlan, partitions: int) -> RecordBatchStream:
11791176 """Execute the ``plan`` and return the results."""
11801177 return RecordBatchStream (self .ctx .execute (plan ._raw_plan , partitions ))
11811178
1179+ @staticmethod
1180+ def _convert_file_sort_order (
1181+ file_sort_order : Sequence [Sequence [SortKey ]] | None ,
1182+ ) -> list [list [expr_internal .SortExpr ]] | None :
1183+ """Convert nested ``SortKey`` sequences into raw sort expressions.
1184+
1185+ Each ``SortKey`` can be a column name string, an ``Expr``, or a
1186+ ``SortExpr`` and will be converted using
1187+ :func:`datafusion.expr.sort_list_to_raw_sort_list`.
1188+ """
1189+ # Convert each ``SortKey`` in the provided sort order to the low-level
1190+ # representation expected by the Rust bindings.
1191+ return (
1192+ [sort_list_to_raw_sort_list (f ) for f in file_sort_order ]
1193+ if file_sort_order is not None
1194+ else None
1195+ )
1196+
11821197 @staticmethod
11831198 def _convert_table_partition_cols (
11841199 table_partition_cols : list [tuple [str , str | pa .DataType ]],
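
A minimal usage sketch (not part of the diff) of the widened file_sort_order parameter, assuming the public datafusion Python API (SessionContext.read_parquet, col, and Expr.sort) and a hypothetical Parquet path:

    from datafusion import SessionContext, col

    ctx = SessionContext()
    # With SortKey, a sort key may be a plain column name, an expression,
    # or an explicit SortExpr (path and column names here are hypothetical).
    df = ctx.read_parquet(
        "data/events.parquet",
        file_sort_order=[["event_time", col("user_id").sort(ascending=False)]],
    )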