From 39e190f5dad69b89918b3be40d7641dbee20c3bd Mon Sep 17 00:00:00 2001 From: Daniel Mesejo Date: Sun, 5 Apr 2026 18:08:29 +0200 Subject: [PATCH] refactor(context): deduplicate register/read option-building logic Extract shared helpers (convert_partition_cols, convert_file_sort_order, build_parquet/json/avro_options, convert_csv_options), standardize path types to &str, and remove redundant intermediate variables. --- crates/core/src/context.rs | 281 +++++++++++++++++-------------------- 1 file changed, 131 insertions(+), 150 deletions(-) diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs index 53994d2f5..5bd9bcfd4 100644 --- a/crates/core/src/context.rs +++ b/crates/core/src/context.rs @@ -16,7 +16,6 @@ // under the License. use std::collections::{HashMap, HashSet}; -use std::path::PathBuf; use std::ptr::NonNull; use std::str::FromStr; use std::sync::Arc; @@ -456,19 +455,8 @@ impl PySessionContext { ) -> PyDataFusionResult<()> { let options = ListingOptions::new(Arc::new(ParquetFormat::new())) .with_file_extension(file_extension) - .with_table_partition_cols( - table_partition_cols - .into_iter() - .map(|(name, ty)| (name, ty.0)) - .collect::>(), - ) - .with_file_sort_order( - file_sort_order - .unwrap_or_default() - .into_iter() - .map(|e| e.into_iter().map(|f| f.into()).collect()) - .collect(), - ); + .with_table_partition_cols(convert_partition_cols(table_partition_cols)) + .with_file_sort_order(convert_file_sort_order(file_sort_order)); let table_path = ListingTableUrl::parse(path)?; let resolved_schema: SchemaRef = match schema { Some(s) => Arc::new(s.0), @@ -831,25 +819,15 @@ impl PySessionContext { file_sort_order: Option>>, py: Python, ) -> PyDataFusionResult<()> { - let mut options = ParquetReadOptions::default() - .table_partition_cols( - table_partition_cols - .into_iter() - .map(|(name, ty)| (name, ty.0)) - .collect::>(), - ) - .parquet_pruning(parquet_pruning) - .skip_metadata(skip_metadata); - options.file_extension = file_extension; - options.schema = schema.as_ref().map(|x| &x.0); - options.file_sort_order = file_sort_order - .unwrap_or_default() - .into_iter() - .map(|e| e.into_iter().map(|f| f.into()).collect()) - .collect(); - - let result = self.ctx.register_parquet(name, path, options); - wait_for_future(py, result)??; + let options = build_parquet_options( + table_partition_cols, + parquet_pruning, + file_extension, + skip_metadata, + &schema, + file_sort_order, + ); + wait_for_future(py, self.ctx.register_parquet(name, path, options))??; Ok(()) } @@ -863,19 +841,17 @@ impl PySessionContext { options: Option<&PyCsvReadOptions>, py: Python, ) -> PyDataFusionResult<()> { - let options = options - .map(|opts| opts.try_into()) - .transpose()? - .unwrap_or_default(); + let options = convert_csv_options(options)?; if path.is_instance_of::() { let paths = path.extract::>()?; - let result = self.register_csv_from_multiple_paths(name, paths, options); - wait_for_future(py, result)??; + wait_for_future( + py, + self.register_csv_from_multiple_paths(name, paths, options), + )??; } else { let path = path.extract::()?; - let result = self.ctx.register_csv(name, &path, options); - wait_for_future(py, result)??; + wait_for_future(py, self.ctx.register_csv(name, &path, options))??; } Ok(()) @@ -892,7 +868,7 @@ impl PySessionContext { pub fn register_json( &self, name: &str, - path: PathBuf, + path: &str, schema: Option>, schema_infer_max_records: usize, file_extension: &str, @@ -900,25 +876,14 @@ impl PySessionContext { file_compression_type: Option, py: Python, ) -> PyDataFusionResult<()> { - let path = path - .to_str() - .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; - - let mut options = JsonReadOptions::default() - .file_compression_type(parse_file_compression_type(file_compression_type)?) - .table_partition_cols( - table_partition_cols - .into_iter() - .map(|(name, ty)| (name, ty.0)) - .collect::>(), - ); - options.schema_infer_max_records = schema_infer_max_records; - options.file_extension = file_extension; - options.schema = schema.as_ref().map(|x| &x.0); - - let result = self.ctx.register_json(name, path, options); - wait_for_future(py, result)??; - + let options = build_json_options( + table_partition_cols, + file_compression_type, + schema_infer_max_records, + file_extension, + &schema, + )?; + wait_for_future(py, self.ctx.register_json(name, path, options))??; Ok(()) } @@ -931,28 +896,14 @@ impl PySessionContext { pub fn register_avro( &self, name: &str, - path: PathBuf, + path: &str, schema: Option>, file_extension: &str, table_partition_cols: Vec<(String, PyArrowType)>, py: Python, ) -> PyDataFusionResult<()> { - let path = path - .to_str() - .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; - - let mut options = AvroReadOptions::default().table_partition_cols( - table_partition_cols - .into_iter() - .map(|(name, ty)| (name, ty.0)) - .collect::>(), - ); - options.file_extension = file_extension; - options.schema = schema.as_ref().map(|x| &x.0); - - let result = self.ctx.register_avro(name, path, options); - wait_for_future(py, result)??; - + let options = build_avro_options(table_partition_cols, file_extension, &schema); + wait_for_future(py, self.ctx.register_avro(name, path, options))??; Ok(()) } @@ -1054,7 +1005,7 @@ impl PySessionContext { #[pyo3(signature = (path, schema=None, schema_infer_max_records=1000, file_extension=".json", table_partition_cols=vec![], file_compression_type=None))] pub fn read_json( &self, - path: PathBuf, + path: &str, schema: Option>, schema_infer_max_records: usize, file_extension: &str, @@ -1062,27 +1013,14 @@ impl PySessionContext { file_compression_type: Option, py: Python, ) -> PyDataFusionResult { - let path = path - .to_str() - .ok_or_else(|| PyValueError::new_err("Unable to convert path to a string"))?; - let mut options = JsonReadOptions::default() - .table_partition_cols( - table_partition_cols - .into_iter() - .map(|(name, ty)| (name, ty.0)) - .collect::>(), - ) - .file_compression_type(parse_file_compression_type(file_compression_type)?); - options.schema_infer_max_records = schema_infer_max_records; - options.file_extension = file_extension; - let df = if let Some(schema) = schema { - options.schema = Some(&schema.0); - let result = self.ctx.read_json(path, options); - wait_for_future(py, result)?? - } else { - let result = self.ctx.read_json(path, options); - wait_for_future(py, result)?? - }; + let options = build_json_options( + table_partition_cols, + file_compression_type, + schema_infer_max_records, + file_extension, + &schema, + )?; + let df = wait_for_future(py, self.ctx.read_json(path, options))??; Ok(PyDataFrame::new(df)) } @@ -1095,23 +1033,15 @@ impl PySessionContext { options: Option<&PyCsvReadOptions>, py: Python, ) -> PyDataFusionResult { - let options = options - .map(|opts| opts.try_into()) - .transpose()? - .unwrap_or_default(); + let options = convert_csv_options(options)?; - if path.is_instance_of::() { - let paths = path.extract::>()?; - let paths = paths.iter().map(|p| p as &str).collect::>(); - let result = self.ctx.read_csv(paths, options); - let df = PyDataFrame::new(wait_for_future(py, result)??); - Ok(df) + let paths: Vec = if path.is_instance_of::() { + path.extract()? } else { - let path = path.extract::()?; - let result = self.ctx.read_csv(path, options); - let df = PyDataFrame::new(wait_for_future(py, result)??); - Ok(df) - } + vec![path.extract()?] + }; + let df = wait_for_future(py, self.ctx.read_csv(paths, options))??; + Ok(PyDataFrame::new(df)) } #[allow(clippy::too_many_arguments)] @@ -1134,25 +1064,15 @@ impl PySessionContext { file_sort_order: Option>>, py: Python, ) -> PyDataFusionResult { - let mut options = ParquetReadOptions::default() - .table_partition_cols( - table_partition_cols - .into_iter() - .map(|(name, ty)| (name, ty.0)) - .collect::>(), - ) - .parquet_pruning(parquet_pruning) - .skip_metadata(skip_metadata); - options.file_extension = file_extension; - options.schema = schema.as_ref().map(|x| &x.0); - options.file_sort_order = file_sort_order - .unwrap_or_default() - .into_iter() - .map(|e| e.into_iter().map(|f| f.into()).collect()) - .collect(); - - let result = self.ctx.read_parquet(path, options); - let df = PyDataFrame::new(wait_for_future(py, result)??); + let options = build_parquet_options( + table_partition_cols, + parquet_pruning, + file_extension, + skip_metadata, + &schema, + file_sort_order, + ); + let df = PyDataFrame::new(wait_for_future(py, self.ctx.read_parquet(path, options))??); Ok(df) } @@ -1166,21 +1086,8 @@ impl PySessionContext { file_extension: &str, py: Python, ) -> PyDataFusionResult { - let mut options = AvroReadOptions::default().table_partition_cols( - table_partition_cols - .into_iter() - .map(|(name, ty)| (name, ty.0)) - .collect::>(), - ); - options.file_extension = file_extension; - let df = if let Some(schema) = schema { - options.schema = Some(&schema.0); - let read_future = self.ctx.read_avro(path, options); - wait_for_future(py, read_future)?? - } else { - let read_future = self.ctx.read_avro(path, options); - wait_for_future(py, read_future)?? - }; + let options = build_avro_options(table_partition_cols, file_extension, &schema); + let df = wait_for_future(py, self.ctx.read_avro(path, options))??; Ok(PyDataFrame::new(df)) } @@ -1280,7 +1187,7 @@ impl PySessionContext { // check if the file extension matches the expected extension for path in &table_paths { let file_path = path.as_str(); - if !file_path.ends_with(option_extension.clone().as_str()) && !path.is_collection() { + if !file_path.ends_with(option_extension.as_str()) && !path.is_collection() { return exec_err!( "File path '{file_path}' does not match the expected extension '{option_extension}'" ); @@ -1321,6 +1228,80 @@ pub fn parse_file_compression_type( }) } +fn convert_csv_options( + options: Option<&PyCsvReadOptions>, +) -> PyDataFusionResult> { + Ok(options + .map(|opts| opts.try_into()) + .transpose()? + .unwrap_or_default()) +} + +fn convert_partition_cols( + table_partition_cols: Vec<(String, PyArrowType)>, +) -> Vec<(String, DataType)> { + table_partition_cols + .into_iter() + .map(|(name, ty)| (name, ty.0)) + .collect() +} + +fn convert_file_sort_order( + file_sort_order: Option>>, +) -> Vec> { + file_sort_order + .unwrap_or_default() + .into_iter() + .map(|e| e.into_iter().map(|f| f.into()).collect()) + .collect() +} + +fn build_parquet_options<'a>( + table_partition_cols: Vec<(String, PyArrowType)>, + parquet_pruning: bool, + file_extension: &'a str, + skip_metadata: bool, + schema: &'a Option>, + file_sort_order: Option>>, +) -> ParquetReadOptions<'a> { + let mut options = ParquetReadOptions::default() + .table_partition_cols(convert_partition_cols(table_partition_cols)) + .parquet_pruning(parquet_pruning) + .skip_metadata(skip_metadata); + options.file_extension = file_extension; + options.schema = schema.as_ref().map(|x| &x.0); + options.file_sort_order = convert_file_sort_order(file_sort_order); + options +} + +fn build_json_options<'a>( + table_partition_cols: Vec<(String, PyArrowType)>, + file_compression_type: Option, + schema_infer_max_records: usize, + file_extension: &'a str, + schema: &'a Option>, +) -> Result, PyErr> { + let mut options = JsonReadOptions::default() + .table_partition_cols(convert_partition_cols(table_partition_cols)) + .file_compression_type(parse_file_compression_type(file_compression_type)?); + options.schema_infer_max_records = schema_infer_max_records; + options.file_extension = file_extension; + options.schema = schema.as_ref().map(|x| &x.0); + Ok(options) +} + +fn build_avro_options<'a>( + table_partition_cols: Vec<(String, PyArrowType)>, + file_extension: &'a str, + schema: &'a Option>, +) -> AvroReadOptions<'a> { + let mut options = AvroReadOptions::default() + .table_partition_cols(convert_partition_cols(table_partition_cols)); + options.file_extension = file_extension; + options.schema = schema.as_ref().map(|x| &x.0); + options +} + impl From for SessionContext { fn from(ctx: PySessionContext) -> SessionContext { ctx.ctx.as_ref().clone()