2020from __future__ import annotations
2121
2222import warnings
23- from typing import TYPE_CHECKING , Any , Protocol
23+ from typing import TYPE_CHECKING , Any , Protocol , Sequence
2424
2525import pyarrow as pa
2626
3131
3232from datafusion .catalog import Catalog , CatalogProvider , Table
3333from datafusion .dataframe import DataFrame
34- from datafusion .expr import Expr , SortExpr , sort_list_to_raw_sort_list
34+ from datafusion .expr import SortKey , sort_list_to_raw_sort_list
3535from datafusion .record_batch import RecordBatchStream
3636from datafusion .user_defined import AggregateUDF , ScalarUDF , TableFunction , WindowUDF
3737
3838from ._internal import RuntimeEnvBuilder as RuntimeEnvBuilderInternal
3939from ._internal import SessionConfig as SessionConfigInternal
4040from ._internal import SessionContext as SessionContextInternal
4141from ._internal import SQLOptions as SQLOptionsInternal
42+ from ._internal import expr as expr_internal
4243
4344if TYPE_CHECKING :
4445 import pathlib
@@ -553,7 +554,7 @@ def register_listing_table(
553554 table_partition_cols : list [tuple [str , str | pa .DataType ]] | None = None ,
554555 file_extension : str = ".parquet" ,
555556 schema : pa .Schema | None = None ,
556- file_sort_order : list [ list [ Expr | SortExpr ]] | None = None ,
557+ file_sort_order : Sequence [ Sequence [ SortKey ]] | None = None ,
557558 ) -> None :
558559 """Register multiple files as a single table.
559560
@@ -567,23 +568,20 @@ def register_listing_table(
567568 table_partition_cols: Partition columns.
568569 file_extension: File extension of the provided table.
569570 schema: The data source schema.
570- file_sort_order: Sort order for the file.
571+ file_sort_order: Sort order for the file. Each sort key can be
572+ specified as a column name (``str``), an expression
573+ (``Expr``), or a ``SortExpr``.
571574 """
572575 if table_partition_cols is None :
573576 table_partition_cols = []
574577 table_partition_cols = self ._convert_table_partition_cols (table_partition_cols )
575- file_sort_order_raw = (
576- [sort_list_to_raw_sort_list (f ) for f in file_sort_order ]
577- if file_sort_order is not None
578- else None
579- )
580578 self .ctx .register_listing_table (
581579 name ,
582580 str (path ),
583581 table_partition_cols ,
584582 file_extension ,
585583 schema ,
586- file_sort_order_raw ,
584+ self . _convert_file_sort_order ( file_sort_order ) ,
587585 )
588586
589587 def sql (self , query : str , options : SQLOptions | None = None ) -> DataFrame :
@@ -808,7 +806,7 @@ def register_parquet(
808806 file_extension : str = ".parquet" ,
809807 skip_metadata : bool = True ,
810808 schema : pa .Schema | None = None ,
811- file_sort_order : list [ list [ SortExpr ]] | None = None ,
809+ file_sort_order : Sequence [ Sequence [ SortKey ]] | None = None ,
812810 ) -> None :
813811 """Register a Parquet file as a table.
814812
@@ -827,7 +825,9 @@ def register_parquet(
827825 that may be in the file schema. This can help avoid schema
828826 conflicts due to metadata.
829827 schema: The data source schema.
830- file_sort_order: Sort order for the file.
828+ file_sort_order: Sort order for the file. Each sort key can be
829+ specified as a column name (``str``), an expression
830+ (``Expr``), or a ``SortExpr``.
831831 """
832832 if table_partition_cols is None :
833833 table_partition_cols = []
@@ -840,9 +840,7 @@ def register_parquet(
840840 file_extension ,
841841 skip_metadata ,
842842 schema ,
843- [sort_list_to_raw_sort_list (exprs ) for exprs in file_sort_order ]
844- if file_sort_order is not None
845- else None ,
843+ self ._convert_file_sort_order (file_sort_order ),
846844 )
847845
848846 def register_csv (
@@ -1099,7 +1097,7 @@ def read_parquet(
10991097 file_extension : str = ".parquet" ,
11001098 skip_metadata : bool = True ,
11011099 schema : pa .Schema | None = None ,
1102- file_sort_order : list [ list [ Expr | SortExpr ]] | None = None ,
1100+ file_sort_order : Sequence [ Sequence [ SortKey ]] | None = None ,
11031101 ) -> DataFrame :
11041102 """Read a Parquet source into a :py:class:`~datafusion.dataframe.Dataframe`.
11051103
@@ -1116,19 +1114,17 @@ def read_parquet(
11161114 schema: An optional schema representing the parquet files. If None,
11171115 the parquet reader will try to infer it based on data in the
11181116 file.
1119- file_sort_order: Sort order for the file.
1117+ file_sort_order: Sort order for the file. Each sort key can be
1118+ specified as a column name (``str``), an expression
1119+ (``Expr``), or a ``SortExpr``.
11201120
11211121 Returns:
11221122 DataFrame representation of the read Parquet files
11231123 """
11241124 if table_partition_cols is None :
11251125 table_partition_cols = []
11261126 table_partition_cols = self ._convert_table_partition_cols (table_partition_cols )
1127- file_sort_order = (
1128- [sort_list_to_raw_sort_list (f ) for f in file_sort_order ]
1129- if file_sort_order is not None
1130- else None
1131- )
1127+ file_sort_order = self ._convert_file_sort_order (file_sort_order )
11321128 return DataFrame (
11331129 self .ctx .read_parquet (
11341130 str (path ),
@@ -1179,6 +1175,24 @@ def execute(self, plan: ExecutionPlan, partitions: int) -> RecordBatchStream:
11791175 """Execute the ``plan`` and return the results."""
11801176 return RecordBatchStream (self .ctx .execute (plan ._raw_plan , partitions ))
11811177
1178+ @staticmethod
1179+ def _convert_file_sort_order (
1180+ file_sort_order : Sequence [Sequence [SortKey ]] | None ,
1181+ ) -> list [list [expr_internal .SortExpr ]] | None :
1182+ """Convert nested ``SortKey`` sequences into raw sort expressions.
1183+
1184+ Each ``SortKey`` can be a column name string, an ``Expr``, or a
1185+ ``SortExpr`` and will be converted using
1186+ :func:`datafusion.expr.sort_list_to_raw_sort_list`.
1187+ """
1188+ # Convert each ``SortKey`` in the provided sort order to the low-level
1189+ # representation expected by the Rust bindings.
1190+ return (
1191+ [sort_list_to_raw_sort_list (f ) for f in file_sort_order ]
1192+ if file_sort_order is not None
1193+ else None
1194+ )
1195+
11821196 @staticmethod
11831197 def _convert_table_partition_cols (
11841198 table_partition_cols : list [tuple [str , str | pa .DataType ]],
0 commit comments