Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 110 additions & 101 deletions datafusion/core/benches/sql_planner_extended.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,17 @@ use datafusion_expr::{cast, col, lit, not, try_cast, when};
use datafusion_functions::expr_fn::{
btrim, length, regexp_like, regexp_replace, to_timestamp, upper,
};
use std::env;
use std::fmt::Write;
use std::hint::black_box;
use std::ops::Rem;
use std::sync::Arc;
use tokio::runtime::Runtime;

const FULL_PREDICATE_SWEEP: [usize; 5] = [10, 20, 30, 40, 60];
const FULL_DEPTH_SWEEP: [usize; 3] = [1, 2, 3];
const DEFAULT_SWEEP_POINTS: [(usize, usize); 3] = [(10, 1), (30, 2), (60, 3)];

// This benchmark suite is designed to test the performance of
// logical planning with a large plan containing unions, many columns
// with a variety of operations in it.
Expand Down Expand Up @@ -324,6 +329,27 @@ fn build_non_case_left_join_df_with_push_down_filter(
rt.block_on(async { ctx.sql(&query).await.unwrap() })
}

fn include_full_push_down_filter_sweep() -> bool {
env::var("DATAFUSION_PUSH_DOWN_FILTER_FULL_SWEEP")
.map(|value| value == "1" || value.eq_ignore_ascii_case("true"))
.unwrap_or(false)
}

fn push_down_filter_sweep_points() -> Vec<(usize, usize)> {
if include_full_push_down_filter_sweep() {
FULL_DEPTH_SWEEP
.into_iter()
.flat_map(|depth| {
FULL_PREDICATE_SWEEP
.into_iter()
.map(move |predicate_count| (predicate_count, depth))
})
.collect()
} else {
DEFAULT_SWEEP_POINTS.to_vec()
}
}

fn criterion_benchmark(c: &mut Criterion) {
let baseline_ctx = SessionContext::new();
let case_heavy_ctx = SessionContext::new();
Expand All @@ -349,115 +375,98 @@ fn criterion_benchmark(c: &mut Criterion) {
})
});

let predicate_sweep = [10, 20, 30, 40, 60];
let case_depth_sweep = [1, 2, 3];

let sweep_points = push_down_filter_sweep_points();
let mut hotspot_group =
c.benchmark_group("push_down_filter_hotspot_case_heavy_left_join_ab");
for case_depth in case_depth_sweep {
for predicate_count in predicate_sweep {
let with_push_down_filter =
build_case_heavy_left_join_df_with_push_down_filter(
&rt,
predicate_count,
case_depth,
true,
);
let without_push_down_filter =
build_case_heavy_left_join_df_with_push_down_filter(
&rt,
predicate_count,
case_depth,
false,
);

let input_label =
format!("predicates={predicate_count},case_depth={case_depth}");
// A/B interpretation:
// - with_push_down_filter: default optimizer path (rule enabled)
// - without_push_down_filter: control path with the rule removed
// Compare both IDs at the same sweep point to isolate rule impact.
hotspot_group.bench_with_input(
BenchmarkId::new("with_push_down_filter", &input_label),
&with_push_down_filter,
|b, df| {
b.iter(|| {
let df_clone = df.clone();
black_box(
rt.block_on(async {
df_clone.into_optimized_plan().unwrap()
}),
);
})
},
);
hotspot_group.bench_with_input(
BenchmarkId::new("without_push_down_filter", &input_label),
&without_push_down_filter,
|b, df| {
b.iter(|| {
let df_clone = df.clone();
black_box(
rt.block_on(async {
df_clone.into_optimized_plan().unwrap()
}),
);
})
},
for &(predicate_count, case_depth) in &sweep_points {
let with_push_down_filter = build_case_heavy_left_join_df_with_push_down_filter(
&rt,
predicate_count,
case_depth,
true,
);
let without_push_down_filter =
build_case_heavy_left_join_df_with_push_down_filter(
&rt,
predicate_count,
case_depth,
false,
);
}

let input_label = format!("predicates={predicate_count},case_depth={case_depth}");
// A/B interpretation:
// - with_push_down_filter: default optimizer path (rule enabled)
// - without_push_down_filter: control path with the rule removed
// Compare both IDs at the same sweep point to isolate rule impact.
hotspot_group.bench_with_input(
BenchmarkId::new("with_push_down_filter", &input_label),
&with_push_down_filter,
|b, df| {
b.iter(|| {
let df_clone = df.clone();
black_box(
rt.block_on(async { df_clone.into_optimized_plan().unwrap() }),
);
})
},
);
hotspot_group.bench_with_input(
BenchmarkId::new("without_push_down_filter", &input_label),
&without_push_down_filter,
|b, df| {
b.iter(|| {
let df_clone = df.clone();
black_box(
rt.block_on(async { df_clone.into_optimized_plan().unwrap() }),
);
})
},
);
}
hotspot_group.finish();

let mut control_group =
c.benchmark_group("push_down_filter_control_non_case_left_join_ab");
for nesting_depth in case_depth_sweep {
for predicate_count in predicate_sweep {
let with_push_down_filter = build_non_case_left_join_df_with_push_down_filter(
&rt,
predicate_count,
nesting_depth,
true,
);
let without_push_down_filter =
build_non_case_left_join_df_with_push_down_filter(
&rt,
predicate_count,
nesting_depth,
false,
);

let input_label =
format!("predicates={predicate_count},nesting_depth={nesting_depth}");
control_group.bench_with_input(
BenchmarkId::new("with_push_down_filter", &input_label),
&with_push_down_filter,
|b, df| {
b.iter(|| {
let df_clone = df.clone();
black_box(
rt.block_on(async {
df_clone.into_optimized_plan().unwrap()
}),
);
})
},
);
control_group.bench_with_input(
BenchmarkId::new("without_push_down_filter", &input_label),
&without_push_down_filter,
|b, df| {
b.iter(|| {
let df_clone = df.clone();
black_box(
rt.block_on(async {
df_clone.into_optimized_plan().unwrap()
}),
);
})
},
);
}
for &(predicate_count, nesting_depth) in &sweep_points {
let with_push_down_filter = build_non_case_left_join_df_with_push_down_filter(
&rt,
predicate_count,
nesting_depth,
true,
);
let without_push_down_filter = build_non_case_left_join_df_with_push_down_filter(
&rt,
predicate_count,
nesting_depth,
false,
);

let input_label =
format!("predicates={predicate_count},nesting_depth={nesting_depth}");
control_group.bench_with_input(
BenchmarkId::new("with_push_down_filter", &input_label),
&with_push_down_filter,
|b, df| {
b.iter(|| {
let df_clone = df.clone();
black_box(
rt.block_on(async { df_clone.into_optimized_plan().unwrap() }),
);
})
},
);
control_group.bench_with_input(
BenchmarkId::new("without_push_down_filter", &input_label),
&without_push_down_filter,
|b, df| {
b.iter(|| {
let df_clone = df.clone();
black_box(
rt.block_on(async { df_clone.into_optimized_plan().unwrap() }),
);
})
},
);
}
control_group.finish();
}
Expand Down
Loading