Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions datafusion/catalog-listing/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,14 +267,15 @@ fn populate_partition_values<'a>(
match op {
Operator::Eq => match (left.as_ref(), right.as_ref()) {
(Expr::Column(Column { name, .. }), Expr::Literal(val, _))
| (Expr::Literal(val, _), Expr::Column(Column { name, .. })) => {
| (Expr::Literal(val, _), Expr::Column(Column { name, .. }))
if partition_values
.insert(name, PartitionValue::Single(val.to_string()))
.is_some()
{
partition_values.insert(name, PartitionValue::Multi);
}
.is_some() =>
{
partition_values.insert(name, PartitionValue::Multi);
}
(Expr::Column(Column { .. }), Expr::Literal(_, _))
| (Expr::Literal(_, _), Expr::Column(Column { .. })) => {}
_ => {}
},
Operator::And => {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub(crate) mod test_util {
// Each batch writes to their own file
let files: Vec<_> = batches
.into_iter()
.zip(tmp_files.into_iter())
.zip(tmp_files)
.map(|(batch, mut output)| {
let mut builder = parquet::file::properties::WriterProperties::builder();
if multi_page {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ fn test_update_matching_exprs() -> Result<()> {
.iter()
.map(|(expr, alias)| ProjectionExpr::new(expr.clone(), alias.clone()))
.collect();
for (expr, expected_expr) in exprs.into_iter().zip(expected_exprs.into_iter()) {
for (expr, expected_expr) in exprs.into_iter().zip(expected_exprs) {
assert!(
update_expr(&expr, &child_exprs, true)?
.unwrap()
Expand Down Expand Up @@ -366,7 +366,7 @@ fn test_update_projected_exprs() -> Result<()> {
.iter()
.map(|(expr, alias)| ProjectionExpr::new(expr.clone(), alias.clone()))
.collect();
for (expr, expected_expr) in exprs.into_iter().zip(expected_exprs.into_iter()) {
for (expr, expected_expr) in exprs.into_iter().zip(expected_exprs) {
assert!(
update_expr(&expr, &proj_exprs, false)?
.unwrap()
Expand Down Expand Up @@ -579,8 +579,7 @@ fn test_streaming_table_after_projection() -> Result<()> {
options: SortOptions::default(),
}]
.into(),
]
.into_iter(),
],
true,
None,
)?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/datasource/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ impl DataSink for MemSink {
}

// write the outputs into the batches
for (target, mut batches) in self.batches.iter().zip(new_batches.into_iter()) {
for (target, mut batches) in self.batches.iter().zip(new_batches) {
// Append all the new batches in one go to minimize locking overhead
target.write().await.append(&mut batches);
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/datasource/src/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ impl MinMaxStatistics {
/// Return a sorted list of the min statistics together with the original indices
pub fn min_values_sorted(&self) -> Vec<(usize, Row<'_>)> {
let mut sort: Vec<_> = self.min_by_sort_order.iter().enumerate().collect();
sort.sort_unstable_by(|(_, a), (_, b)| a.cmp(b));
sort.sort_unstable_by_key(|(_, row)| *row);
sort
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/datasource/src/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ impl ListingTableUrl {
let full_prefix = if let Some(ref p) = prefix {
let mut parts = self.prefix.parts().collect::<Vec<_>>();
parts.extend(p.parts());
Path::from_iter(parts.into_iter())
Path::from_iter(parts)
} else {
self.prefix.clone()
};
Expand Down
2 changes: 1 addition & 1 deletion datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
)
})
.collect::<Vec<_>>();
consumers.sort_by(|a, b| b.1.cmp(&a.1)); // inverse ordering
consumers.sort_by_key(|consumer| std::cmp::Reverse(consumer.1));

consumers[0..std::cmp::min(top, consumers.len())]
.iter()
Expand Down
7 changes: 3 additions & 4 deletions datafusion/expr/src/window_frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,14 @@ impl WindowFrame {
// one column. However, an ORDER BY clause may be absent or have
// more than one column when the start/end bounds are UNBOUNDED or
// CURRENT ROW.
WindowFrameUnits::Range if self.free_range() => {
WindowFrameUnits::Range if self.free_range() && order_by.is_empty() => {
// If an ORDER BY clause is absent, it is equivalent to an
// ORDER BY clause with constant value as sort key. If an
// ORDER BY clause is present but has more than one column,
// it is unchanged. Note that this follows PostgreSQL behavior.
if order_by.is_empty() {
order_by.push(lit(1u64).sort(true, false));
}
order_by.push(lit(1u64).sort(true, false));
}
WindowFrameUnits::Range if self.free_range() => {}
WindowFrameUnits::Range if order_by.len() != 1 => {
return plan_err!("RANGE requires exactly one ORDER BY column");
}
Expand Down
4 changes: 2 additions & 2 deletions datafusion/functions-aggregate-common/src/merge_arrays.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ mod tests {
&mut [lhs_orderings, rhs_orderings],
&sort_options,
)?;
let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
let merged_vals = ScalarValue::iter_to_array(merged_vals)?;
let merged_ts = (0..merged_ts[0].len())
.map(|col_idx| {
ScalarValue::iter_to_array(
Expand Down Expand Up @@ -334,7 +334,7 @@ mod tests {
&mut [lhs_orderings, rhs_orderings],
&sort_options,
)?;
let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
let merged_vals = ScalarValue::iter_to_array(merged_vals)?;
let merged_ts = (0..merged_ts[0].len())
.map(|col_idx| {
ScalarValue::iter_to_array(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-aggregate/src/average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ where
} else {
let averages: Vec<T::Native> = sums
.into_iter()
.zip(counts.into_iter())
.zip(counts)
.map(|(sum, count)| (self.avg_fn)(sum, count))
.collect::<Result<Vec<_>>>()?;
PrimitiveArray::new(averages.into(), nulls) // no copy
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-aggregate/src/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ impl NthValueAccumulator {
let array = if column_values.is_empty() {
new_empty_array(fields[i].data_type())
} else {
ScalarValue::iter_to_array(column_values.into_iter())?
ScalarValue::iter_to_array(column_values)?
};
column_wise_ordering_values.push(array);
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions-window/src/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ mod tests {
.iter()
.map(|range| evaluator.evaluate(&values, range))
.collect::<Result<Vec<ScalarValue>>>()?;
let result = ScalarValue::iter_to_array(result.into_iter())?;
let result = ScalarValue::iter_to_array(result)?;
let result = as_int32_array(&result)?;
assert_eq!(expected, *result);
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions/src/core/named_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl ScalarUDFImpl for NamedStructFunc {

let return_fields = names
.into_iter()
.zip(types.into_iter())
.zip(types)
.map(|(name, data_type)| Ok(Field::new(name, data_type.to_owned(), true)))
.collect::<Result<Vec<Field>>>()?;

Expand Down
2 changes: 1 addition & 1 deletion datafusion/functions/src/core/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl ExprPlanner for CoreFunctionPlanner {
_schema: &DFSchema,
) -> Result<PlannerResult<RawDictionaryExpr>> {
let mut args = vec![];
for (k, v) in expr.keys.into_iter().zip(expr.values.into_iter()) {
for (k, v) in expr.keys.into_iter().zip(expr.values) {
args.push(k);
args.push(v);
}
Expand Down
14 changes: 6 additions & 8 deletions datafusion/optimizer/src/eliminate_outer_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,14 @@ pub fn eliminate_outer(
) -> JoinType {
let mut new_join_type = join_type;
match join_type {
JoinType::Left => {
if right_non_nullable {
new_join_type = JoinType::Inner;
}
JoinType::Left if right_non_nullable => {
new_join_type = JoinType::Inner;
}
JoinType::Right => {
if left_non_nullable {
new_join_type = JoinType::Inner;
}
JoinType::Left => {}
JoinType::Right if left_non_nullable => {
new_join_type = JoinType::Inner;
}
JoinType::Right => {}
JoinType::Full => {
if left_non_nullable && right_non_nullable {
new_join_type = JoinType::Inner;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1248,7 +1248,7 @@ impl OptimizerRule for PushDownFilter {
let mut push_predicates = vec![];
for (push, expr) in predicate_push_or_keep
.into_iter()
.zip(split_conjunction_owned(filter.predicate).into_iter())
.zip(split_conjunction_owned(filter.predicate))
{
if !push {
keep_predicates.push(expr);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/utils/guarantee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl LiteralGuarantee {
builder = builder.aggregate_multi_conjunct(
col,
Guarantee::In,
literals.into_iter(),
literals,
);
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/window/standard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ impl WindowExpr for StandardWindowExpr {
// fast path when the result only has a single row
row_wise_results[0].to_array()?
} else {
ScalarValue::iter_to_array(row_wise_results.into_iter())?
ScalarValue::iter_to_array(row_wise_results)?
};

state.update(&out_col, partition_batch_state)?;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-optimizer/src/enforce_sorting/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ pub fn ensure_sorting(
for (idx, (required_ordering, mut child)) in plan
.required_input_ordering()
.into_iter()
.zip(requirements.children.into_iter())
.zip(requirements.children)
.enumerate()
{
let physical_ordering = child.plan.output_ordering();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,6 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
.equal_to_group_indices
.clear();

let mut group_values_len = self.group_values[0].len();
for (row, &target_hash) in batch_hashes.iter().enumerate() {
let entry = self
.map
Expand All @@ -518,7 +517,8 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
let Some((_, group_index_view)) = entry else {
// 1. Bucket not found case
// Build `new inlined group index view`
let current_group_idx = group_values_len;
let current_group_idx = self.group_values[0].len()
+ self.vectorized_operation_buffers.append_row_indices.len();
let group_index_view =
GroupIndexView::new_inlined(current_group_idx as u64);

Expand All @@ -538,7 +538,6 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
// Set group index to row in `groups`
groups[row] = current_group_idx;

group_values_len += 1;
continue;
};

Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ impl PhysicalGroupBy {
fn group_fields(&self, input_schema: &Schema) -> Result<Vec<FieldRef>> {
let mut fields = Vec::with_capacity(self.num_group_exprs());
for ((expr, name), group_expr_nullable) in
self.expr.iter().zip(self.exprs_nullable().into_iter())
self.expr.iter().zip(self.exprs_nullable())
{
fields.push(
Field::new(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/joins/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1451,7 +1451,7 @@ fn append_probe_indices_in_order(
for (build_index, probe_index) in build_indices
.values()
.into_iter()
.zip(probe_indices.values().into_iter())
.zip(probe_indices.values())
{
// Append values between previous and current probe index with null build index:
for value in prev_index..*probe_index {
Expand Down
7 changes: 2 additions & 5 deletions datafusion/physical-plan/src/operator_statistics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -809,11 +809,8 @@ impl StatisticsProvider for JoinStatisticsProvider {
_ => return left_rows.saturating_mul(right_rows),
}
}
if ndv_divisor > 0 {
left_rows.saturating_mul(right_rows) / ndv_divisor
} else {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The `checked_div(...).unwrap_or(max_rows)` form hides the "NDV unknown → cartesian product" intent that the explicit `if ndv_divisor > 0 { ... } else { ... }` branch spelled out.

left_rows.saturating_mul(right_rows)
}
let max_rows = left_rows.saturating_mul(right_rows);
max_rows.checked_div(ndv_divisor).unwrap_or(max_rows)
}

let (inner_estimate, is_exact_cartesian, join_type) = if let Some(hash_join) =
Expand Down
70 changes: 35 additions & 35 deletions datafusion/spark/src/function/string/format_string.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,15 +592,15 @@ impl TryFrom<char> for TimeFormat {
impl ConversionType {
pub fn validate(&self, arg_type: &DataType) -> Result<()> {
match self {
ConversionType::BooleanLower | ConversionType::BooleanUpper => {
if *arg_type != DataType::Boolean {
return exec_err!(
"Invalid argument type for boolean conversion: {:?}",
arg_type
);
}
ConversionType::BooleanLower | ConversionType::BooleanUpper
if *arg_type != DataType::Boolean =>
{
return exec_err!(
"Invalid argument type for boolean conversion: {:?}",
arg_type
);
}
ConversionType::CharLower | ConversionType::CharUpper => {
ConversionType::CharLower | ConversionType::CharUpper
if !matches!(
arg_type,
DataType::Int8
Expand All @@ -611,45 +611,45 @@ impl ConversionType {
| DataType::UInt32
| DataType::Int64
| DataType::UInt64
) {
return exec_err!(
"Invalid argument type for char conversion: {:?}",
arg_type
);
}
) =>
{
return exec_err!(
"Invalid argument type for char conversion: {:?}",
arg_type
);
}
ConversionType::DecInt
| ConversionType::OctInt
| ConversionType::HexIntLower
| ConversionType::HexIntUpper => {
if !arg_type.is_integer() {
return exec_err!(
"Invalid argument type for integer conversion: {:?}",
arg_type
);
}
| ConversionType::HexIntUpper
if !arg_type.is_integer() =>
{
return exec_err!(
"Invalid argument type for integer conversion: {:?}",
arg_type
);
}
ConversionType::SciFloatLower
| ConversionType::SciFloatUpper
| ConversionType::DecFloatLower
| ConversionType::CompactFloatLower
| ConversionType::CompactFloatUpper
| ConversionType::HexFloatLower
| ConversionType::HexFloatUpper => {
if !arg_type.is_numeric() {
return exec_err!(
"Invalid argument type for float conversion: {:?}",
arg_type
);
}
| ConversionType::HexFloatUpper
if !arg_type.is_numeric() =>
{
return exec_err!(
"Invalid argument type for float conversion: {:?}",
arg_type
);
}
ConversionType::TimeLower(_) | ConversionType::TimeUpper(_) => {
if !arg_type.is_temporal() {
return exec_err!(
"Invalid argument type for time conversion: {:?}",
arg_type
);
}
ConversionType::TimeLower(_) | ConversionType::TimeUpper(_)
if !arg_type.is_temporal() =>
{
return exec_err!(
"Invalid argument type for time conversion: {:?}",
arg_type
);
}
_ => {}
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sql/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1554,7 +1554,7 @@ mod tests {
(Some(true), Some(true)),
(Some(true), Some(false)),
];
for (sql, (asc, nulls_first)) in sqls.iter().zip(expected.into_iter()) {
for (sql, (asc, nulls_first)) in sqls.iter().zip(expected) {
let expected = Statement::CreateExternalTable(CreateExternalTable {
name: name.clone(),
columns: vec![make_column_def("c1", DataType::Int(None))],
Expand Down
Loading
Loading