Skip to content

Commit f6a4ea4

Browse files
committed
add create_csv_read_options
1 parent 88a15ee commit f6a4ea4

File tree

1 file changed

+45
-29
lines changed

1 file changed

+45
-29
lines changed

src/context.rs

Lines changed: 45 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -775,22 +775,17 @@ impl PySessionContext {
775775
// Clone self to avoid borrowing
776776
let self_clone = self.clone();
777777

778-
// Create options with owned values inside the async block
778+
// Create a future that uses our helper function
779779
let result_future = async move {
780-
let mut options = CsvReadOptions::new()
781-
.has_header(has_header)
782-
.delimiter(delimiter_byte)
783-
.schema_infer_max_records(schema_infer_max_records)
784-
.file_extension(&file_extension_owned)
785-
.file_compression_type(
786-
parse_file_compression_type(file_compression_type.clone())
787-
.map_err(py_err_to_datafusion_err)?,
788-
);
789-
790-
// Use owned schema if provided
791-
if let Some(s) = &schema_owned {
792-
options.schema = Some(s);
793-
}
780+
let schema_ref = schema_owned.as_ref().map(|s| s.as_ref());
781+
let options = create_csv_read_options(
782+
has_header,
783+
delimiter_byte,
784+
schema_infer_max_records,
785+
&file_extension_owned,
786+
file_compression_type.clone(),
787+
schema_ref,
788+
)?;
794789

795790
self_clone
796791
.register_csv_from_multiple_paths(&name_owned, paths, options)
@@ -803,20 +798,15 @@ impl PySessionContext {
803798

804799
// Create a future that moves owned values
805800
let result_future = async move {
806-
let mut options = CsvReadOptions::new()
807-
.has_header(has_header)
808-
.delimiter(delimiter_byte)
809-
.schema_infer_max_records(schema_infer_max_records)
810-
.file_extension(&file_extension_owned)
811-
.file_compression_type(
812-
parse_file_compression_type(file_compression_type.clone())
813-
.map_err(py_err_to_datafusion_err)?,
814-
);
815-
816-
// Use owned schema if provided
817-
if let Some(s) = &schema_owned {
818-
options.schema = Some(s);
819-
}
801+
let schema_ref = schema_owned.as_ref().map(|s| s.as_ref());
802+
let options = create_csv_read_options(
803+
has_header,
804+
delimiter_byte,
805+
schema_infer_max_records,
806+
&file_extension_owned,
807+
file_compression_type.clone(),
808+
schema_ref,
809+
)?;
820810

821811
ctx.register_csv(&name_owned, &path, options).await
822812
};
@@ -1416,6 +1406,32 @@ impl PySessionContext {
14161406
}
14171407
}
14181408

1409+
/// Create CsvReadOptions with the provided parameters
1410+
fn create_csv_read_options(
1411+
has_header: bool,
1412+
delimiter_byte: u8,
1413+
schema_infer_max_records: usize,
1414+
file_extension: &str,
1415+
file_compression_type: Option<String>,
1416+
schema: Option<&Schema>,
1417+
) -> PyResult<CsvReadOptions> {
1418+
let mut options = CsvReadOptions::new()
1419+
.has_header(has_header)
1420+
.delimiter(delimiter_byte)
1421+
.schema_infer_max_records(schema_infer_max_records)
1422+
.file_extension(file_extension)
1423+
.file_compression_type(
1424+
parse_file_compression_type(file_compression_type).map_err(py_err_to_datafusion_err)?,
1425+
);
1426+
1427+
// Use schema if provided
1428+
if let Some(s) = schema {
1429+
options.schema = Some(s);
1430+
}
1431+
1432+
Ok(options)
1433+
}
1434+
14191435
pub fn convert_table_partition_cols(
14201436
table_partition_cols: Vec<(String, String)>,
14211437
) -> PyDataFusionResult<Vec<(String, DataType)>> {

0 commit comments

Comments
 (0)