Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
931f2bc
Start testing what will be DataFusion 54.0.
mbutrovich Apr 8, 2026
8d922b5
fix expandexec
mbutrovich Apr 8, 2026
b75c321
Bump commits.
mbutrovich Apr 15, 2026
42813d9
Merge branch 'main' into df54
mbutrovich Apr 15, 2026
af84b73
Fix.
mbutrovich Apr 15, 2026
e19aa50
Fix metrics aggregation for native scan.
mbutrovich Apr 15, 2026
e798f9a
enable SMJ with filter by default
mbutrovich Apr 15, 2026
4236c27
add test for SPARK-43113
mbutrovich Apr 16, 2026
3bff80b
test https://github.com/apache/datafusion/pull/21660
mbutrovich Apr 16, 2026
ec18066
Fix after merging in latest datafusion main in the feature branch.
mbutrovich Apr 16, 2026
68f1a09
Fix after merging in latest datafusion main in the feature branch.
mbutrovich Apr 16, 2026
276ad63
Merge branch 'main' into df54
mbutrovich Apr 16, 2026
ab809d1
Fix after merging in latest datafusion main in the feature branch.
mbutrovich Apr 16, 2026
f716fc3
Fix after merging in latest datafusion main in the feature branch.
mbutrovich Apr 16, 2026
219dc9a
bump to latest commit on main after miri fix.
mbutrovich Apr 16, 2026
9da46f4
Merge branch 'main' into df54
mbutrovich Apr 16, 2026
f1a8041
fix after upmerge
mbutrovich Apr 16, 2026
2b904ff
bump to pick up SMJ with filter fix.
mbutrovich Apr 16, 2026
ce476f7
bump to pick up SMJ with filter fix.
mbutrovich Apr 16, 2026
8d47993
Merge branch 'main' into df54
mbutrovich Apr 16, 2026
37318f3
Bump to latest commit to pick up Miri fix.
mbutrovich Apr 16, 2026
8ed0cc0
Merge main into df54, bump to DF commit a311d14.
mbutrovich Apr 20, 2026
77ac0db
Fix clippy about deprecated APIs.
mbutrovich Apr 20, 2026
a23cd04
Merge branch 'main' into df54
mbutrovich Apr 20, 2026
96ae5e9
Merge branch 'main' into df54
mbutrovich Apr 23, 2026
72bce74
update to latest DF commit
mbutrovich Apr 23, 2026
f8cda65
chore: [datafusion-54] bump DataFusion rev to 85e75e2 (#4063)
andygrove Apr 24, 2026
9447469
mbutrovich May 15, 2026
b7737fb
chore: DF54 upgrade
comphead May 21, 2026
b32f3d0
Merge remote-tracking branch 'upstream/datafusion-54' into datafusion-54
comphead May 21, 2026
bc96836
chore: DF54 upgrade
comphead May 21, 2026
95930af
chore: datafusion 54 sync (#4391)
comphead May 21, 2026
93b1470
mbutrovich May 28, 2026
5b8dc92
Merge branch 'main' into datafusion-54
mbutrovich May 28, 2026
687c5a0
Re-run cargo update.
mbutrovich May 28, 2026
edef4d4
feat: [datafusion-54] fix `weekday` (#4495)
comphead May 28, 2026
926f0a5
Merge remote-tracking branch 'upstream/datafusion-54' into datafusion-54
comphead May 28, 2026
3c097f1
fallback on pow
comphead May 28, 2026
e2f00be
fallback on pow
comphead May 28, 2026
6cb71a7
Merge branch 'main' into datafusion-54
comphead Jun 2, 2026
a8ec495
chore: DF54 upgrade
comphead Jun 3, 2026
4b9a817
chore: DF54 upgrade
comphead Jun 3, 2026
07c040f
Merge branch 'main' into datafusion-54
mbutrovich Jun 5, 2026
7ebaa58
bump to crates
mbutrovich Jun 8, 2026
72cb1c3
Merge branch 'main' into datafusion-54
mbutrovich Jun 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
901 changes: 404 additions & 497 deletions native/Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ arrow = { version = "58.3.0", features = ["prettyprint", "ffi", "chrono-tz"] }
async-trait = { version = "0.1" }
bytes = { version = "1.11.1" }
parquet = { version = "58.3.0", default-features = false, features = ["experimental"] }
datafusion = { version = "53.1.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
datafusion-datasource = { version = "53.1.0" }
datafusion-physical-expr-adapter = { version = "53.1.0" }
datafusion-spark = { version = "53.1.0", features = ["core"] }
datafusion = { version = "54.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "nested_expressions", "parquet"] }
datafusion-datasource = { version = "54.0.0" }
datafusion-physical-expr-adapter = { version = "54.0.0" }
datafusion-spark = { version = "54.0.0", features = ["core"] }
datafusion-comet-spark-expr = { path = "spark-expr" }
datafusion-comet-common = { path = "common" }
datafusion-comet-jni-bridge = { path = "jni-bridge" }
Expand Down
2 changes: 1 addition & 1 deletion native/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ jni = { version = "0.22.4", features = ["invocation"] }
lazy_static = "1.4"
assertables = "9"
hex = "0.4.3"
datafusion-functions-nested = { version = "53.1.0" }
datafusion-functions-nested = { version = "54.0.0" }

[features]
backtrace = ["datafusion/backtrace"]
Expand Down
7 changes: 0 additions & 7 deletions native/core/src/debug/debug_batch_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::sync::Arc;
Expand Down Expand Up @@ -82,9 +81,6 @@ impl datafusion::physical_plan::ExecutionPlan for DebugExecutionDataStream {
fn name(&self) -> &str {
"DebugExecutionDataStream"
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn properties(&self) -> &Arc<datafusion::physical_plan::PlanProperties> {
self.inner.properties()
}
Expand Down Expand Up @@ -155,9 +151,6 @@ impl Hash for DebugExecutionDataPhyExpr {
}

impl PhysicalExpr for DebugExecutionDataPhyExpr {
fn as_any(&self) -> &dyn Any {
self
}
fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
self.inner.data_type(input_schema)
}
Expand Down
4 changes: 0 additions & 4 deletions native/core/src/execution/expressions/arithmetic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,6 @@ impl Hash for CheckedBinaryExpr {
}

impl PhysicalExpr for CheckedBinaryExpr {
fn as_any(&self) -> &dyn Any {
self
}

fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
self.child.fmt_sql(f)
}
Expand Down
5 changes: 0 additions & 5 deletions native/core/src/execution/expressions/list_positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.

use std::any::Any;
use std::fmt::{Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
Expand Down Expand Up @@ -68,10 +67,6 @@ impl Hash for ListPositionsExpr {
}

impl PhysicalExpr for ListPositionsExpr {
fn as_any(&self) -> &dyn Any {
self
}

fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Display::fmt(self, f)
}
Expand Down
2 changes: 1 addition & 1 deletion native/core/src/execution/expressions/strings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl ExpressionBuilder for RlikeBuilder {
let left = planner.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
let right = planner.create_expr(expr.right.as_ref().unwrap(), input_schema)?;

match right.as_any().downcast_ref::<Literal>().unwrap().value() {
match right.downcast_ref::<Literal>().unwrap().value() {
ScalarValue::Utf8(Some(pattern)) => Ok(Arc::new(RLike::try_new(left, pattern)?)),
_ => Err(ExecutionError::GeneralError(
"RLike only supports scalar patterns".to_string(),
Expand Down
5 changes: 0 additions & 5 deletions native/core/src/execution/expressions/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use jni::{
sys::{jboolean, jbyte, jint, jlong, jshort},
};
use std::{
any::Any,
fmt::{Display, Formatter},
hash::Hash,
sync::Arc,
Expand Down Expand Up @@ -63,10 +62,6 @@ impl Display for Subquery {
}

impl PhysicalExpr for Subquery {
fn as_any(&self) -> &dyn Any {
self
}

fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Display::fmt(self, f)
}
Expand Down
17 changes: 16 additions & 1 deletion native/core/src/execution/memory_pools/fair_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use std::{
fmt::{Debug, Formatter, Result as FmtResult},
fmt::{Debug, Display, Formatter, Result as FmtResult},
sync::Arc,
};

Expand Down Expand Up @@ -83,10 +83,25 @@ impl CometFairMemoryPool {
}
}

impl Display for CometFairMemoryPool {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
let state = self.state.lock();
write!(
f,
"CometFairMemoryPool(pool_size={}, used={}, num={})",
self.pool_size, state.used, state.num
)
}
}

unsafe impl Send for CometFairMemoryPool {}
unsafe impl Sync for CometFairMemoryPool {}

impl MemoryPool for CometFairMemoryPool {
fn name(&self) -> &str {
"CometFairMemoryPool"
}

fn register(&self, _: &MemoryConsumer) {
let mut state = self.state.lock();
state.num = state
Expand Down
11 changes: 11 additions & 0 deletions native/core/src/execution/memory_pools/logging_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use datafusion::execution::memory_pool::{
MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
};
use log::{info, warn};
use std::fmt;
use std::sync::Arc;

#[derive(Debug)]
Expand All @@ -27,6 +28,12 @@ pub(crate) struct LoggingMemoryPool {
pool: Arc<dyn MemoryPool>,
}

impl fmt::Display for LoggingMemoryPool {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "LoggingMemoryPool(task={})", self.task_attempt_id)
}
}

impl LoggingMemoryPool {
pub fn new(task_attempt_id: u64, pool: Arc<dyn MemoryPool>) -> Self {
Self {
Expand All @@ -37,6 +44,10 @@ impl LoggingMemoryPool {
}

impl MemoryPool for LoggingMemoryPool {
fn name(&self) -> &str {
"LoggingMemoryPool"
}

fn register(&self, consumer: &MemoryConsumer) {
info!(
"[Task {}] MemoryPool[{}].register()",
Expand Down
16 changes: 15 additions & 1 deletion native/core/src/execution/memory_pools/unified_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use std::{
fmt::{Debug, Formatter, Result as FmtResult},
fmt::{Debug, Display, Formatter, Result as FmtResult},
sync::{
atomic::{AtomicUsize, Ordering::Relaxed},
Arc,
Expand Down Expand Up @@ -90,10 +90,24 @@ impl Drop for CometUnifiedMemoryPool {
}
}

impl Display for CometUnifiedMemoryPool {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
write!(
f,
"CometUnifiedMemoryPool(used={})",
self.used.load(Relaxed)
)
}
}

unsafe impl Send for CometUnifiedMemoryPool {}
unsafe impl Sync for CometUnifiedMemoryPool {}

impl MemoryPool for CometUnifiedMemoryPool {
fn name(&self) -> &str {
"CometUnifiedMemoryPool"
}

fn grow(&self, reservation: &MemoryReservation, additional: usize) {
self.try_grow(reservation, additional).unwrap();
}
Expand Down
5 changes: 0 additions & 5 deletions native/core/src/execution/merge_as_partial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
//! outputs state) but redirects `update_batch` calls to `merge_batch`, giving merge
//! semantics with state output.

use std::any::Any;
use std::fmt::Debug;
use std::hash::{Hash, Hasher};

Expand Down Expand Up @@ -100,10 +99,6 @@ impl MergeAsPartialUDF {
}

impl AggregateUDFImpl for MergeAsPartialUDF {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
&self.name
}
Expand Down
21 changes: 13 additions & 8 deletions native/core/src/execution/metrics/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,19 @@ pub(crate) fn to_native_metric_node(
children: Vec::with_capacity(children.len()),
};

if let Some(metrics) = node_metrics {
for m in metrics.iter() {
let value = m.value();
native_metric_node
.metrics
.insert(value.name().to_string(), value.as_usize() as i64);
}
}
// Aggregate metrics by name using DataFusion's aggregate_by_name(), which
// correctly handles duplicate metric names (e.g. BaselineMetrics registered
// by both FileStream and ParquetMorselizer on the same ExecutionPlanMetricsSet).
// The additional_native_plans branch below already does this.
node_metrics
.unwrap_or_default()
.aggregate_by_name()
.iter()
.map(|m| m.value())
.map(|m| (m.name(), m.as_usize() as i64))
.for_each(|(name, value)| {
native_metric_node.metrics.insert(name.to_string(), value);
});

for child_plan in children {
let child_node = to_native_metric_node(child_plan)?;
Expand Down
5 changes: 0 additions & 5 deletions native/core/src/execution/operators/expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use datafusion::{
};
use futures::{Stream, StreamExt};
use std::{
any::Any,
pin::Pin,
sync::Arc,
task::{Context, Poll},
Expand Down Expand Up @@ -91,10 +90,6 @@ impl DisplayAs for ExpandExec {
}

impl ExecutionPlan for ExpandExec {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
Expand Down
5 changes: 0 additions & 5 deletions native/core/src/execution/operators/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

//! Native Iceberg table scan operator using iceberg-rust

use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::pin::Pin;
Expand Down Expand Up @@ -121,10 +120,6 @@ impl ExecutionPlan for IcebergScanExec {
"IcebergScanExec"
}

fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
Arc::clone(&self.output_schema)
}
Expand Down
5 changes: 0 additions & 5 deletions native/core/src/execution/operators/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
//! Parquet writer operator for writing RecordBatches to Parquet files

use std::{
any::Any,
collections::HashMap,
fmt,
fmt::{Debug, Formatter},
Expand Down Expand Up @@ -404,10 +403,6 @@ impl DisplayAs for ParquetWriterExec {

#[async_trait]
impl ExecutionPlan for ParquetWriterExec {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"ParquetWriterExec"
}
Expand Down
5 changes: 0 additions & 5 deletions native/core/src/execution/operators/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use itertools::Itertools;
use jni::objects::{Global, JObject, JValue};
use std::rc::Rc;
use std::{
any::Any,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
Expand Down Expand Up @@ -286,10 +285,6 @@ fn schema_from_data_types(data_types: &[DataType]) -> SchemaRef {
}

impl ExecutionPlan for ScanExec {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
Expand Down
5 changes: 0 additions & 5 deletions native/core/src/execution/operators/shuffle_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use datafusion::{
use futures::Stream;
use jni::objects::{Global, JByteBuffer, JObject};
use std::{
any::Any,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
Expand Down Expand Up @@ -221,10 +220,6 @@ fn schema_from_data_types(data_types: &[DataType]) -> SchemaRef {
}

impl ExecutionPlan for ShuffleScanExec {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
Expand Down
Loading
Loading