From f9768d5b8ae456d22e00316794166635422aae96 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Thu, 30 Apr 2026 15:37:03 -0400 Subject: [PATCH 1/2] feat: teach ArrayStream instrument(span) Signed-off-by: Daniel King --- Cargo.lock | 13 +++++++++ Cargo.toml | 1 + vortex-array/Cargo.toml | 1 + vortex-array/src/stream/ext.rs | 9 ++++++ vortex-array/src/stream/instrumented.rs | 38 +++++++++++++++++++++++++ vortex-array/src/stream/mod.rs | 1 + 6 files changed, 63 insertions(+) create mode 100644 vortex-array/src/stream/instrumented.rs diff --git a/Cargo.lock b/Cargo.lock index 727918f14e7..948a1ea617b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9639,6 +9639,18 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "futures", + "futures-task", + "pin-project", + "tracing", +] + [[package]] name = "tracing-log" version = "0.2.0" @@ -10064,6 +10076,7 @@ dependencies = [ "tabled", "termtree", "tracing", + "tracing-futures", "uuid", "vortex-array", "vortex-array-macros", diff --git a/Cargo.toml b/Cargo.toml index 92e08f1338f..08cc9911a4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -252,6 +252,7 @@ vortex-array-macros = { version = "0.1.0", path = "./vortex-array-macros" } tpchgen = { version = "2.0.2", git = "https://github.com/clflushopt/tpchgen-rs.git", rev = "438e9c2dbc25b2fff82c0efc08b3f13b5707874f" } tpchgen-arrow = { version = "2.0.2", git = "https://github.com/clflushopt/tpchgen-rs.git", rev = "438e9c2dbc25b2fff82c0efc08b3f13b5707874f" } tracing = { version = "0.1.41", default-features = false } +tracing-futures = { version = "0.2.5", features = ["futures-03"] } tracing-perfetto = "0.1.5" tracing-subscriber = "0.3" url = "2.5.7" diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index f9adbeb99db..a6831e3688e 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -66,6 +66,7 @@ tabled = { workspace = true, optional = true, default-features = false, features ] } termtree = { workspace = true } tracing = { workspace = true } +tracing-futures = { workspace = true } uuid = { workspace = true } vortex-array-macros = { workspace = true } vortex-buffer = { workspace = true, features = ["arrow"] } diff --git a/vortex-array/src/stream/ext.rs b/vortex-array/src/stream/ext.rs index e1f39f67214..9ee515d1443 100644 --- a/vortex-array/src/stream/ext.rs +++ b/vortex-array/src/stream/ext.rs @@ -4,6 +4,7 @@ use std::future::Future; use futures::TryStreamExt; +use tracing::Span; use vortex_error::VortexResult; use crate::ArrayRef; @@ -11,6 +12,7 @@ use crate::IntoArray; use crate::arrays::ChunkedArray; use crate::stream::ArrayStream; use crate::stream::SendableArrayStream; +use crate::stream::instrumented::instrument_array_stream; pub trait ArrayStreamExt: ArrayStream { /// Box the [`ArrayStream`] so that it can be sent between threads. @@ -38,6 +40,13 @@ pub trait ArrayStreamExt: ArrayStream { } } } + + fn instrument(self, span: Span) -> impl ArrayStream + where + Self: Sized, + { + instrument_array_stream(self, span) + } } impl ArrayStreamExt for S {} diff --git a/vortex-array/src/stream/instrumented.rs b/vortex-array/src/stream/instrumented.rs new file mode 100644 index 00000000000..982196abb26 --- /dev/null +++ b/vortex-array/src/stream/instrumented.rs @@ -0,0 +1,38 @@ +use crate::dtype::DType; +use crate::stream::ArrayStream; +use futures::Stream; +use pin_project_lite::pin_project; +use tracing::Span; +use tracing_futures::{Instrument, Instrumented}; + +pin_project! { + pub struct InstrumentedArrayStream { + #[pin] + stream: Instrumented, + } +} + +impl ArrayStream for InstrumentedArrayStream { + fn dtype(&self) -> &DType { + self.stream.inner().dtype() + } +} + +impl Stream for InstrumentedArrayStream { + type Item = S::Item; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.project().stream.poll_next(cx) + } +} + +pub fn instrument_array_stream( + stream: S, + span: Span, +) -> InstrumentedArrayStream { + let stream = stream.instrument(span); + InstrumentedArrayStream { stream } +} diff --git a/vortex-array/src/stream/mod.rs b/vortex-array/src/stream/mod.rs index 1d44fc83380..980ef136e4f 100644 --- a/vortex-array/src/stream/mod.rs +++ b/vortex-array/src/stream/mod.rs @@ -14,6 +14,7 @@ use crate::dtype::DType; mod adapter; mod ext; +mod instrumented; /// A stream of array chunks along with a DType. /// From bc3b8f5666594c0dec22678a00a7e8789d2e5c76 Mon Sep 17 00:00:00 2001 From: Daniel King Date: Thu, 30 Apr 2026 16:08:40 -0400 Subject: [PATCH 2/2] imports Signed-off-by: Daniel King --- vortex-array/src/stream/instrumented.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/vortex-array/src/stream/instrumented.rs b/vortex-array/src/stream/instrumented.rs index 982196abb26..0b0f0b3f58c 100644 --- a/vortex-array/src/stream/instrumented.rs +++ b/vortex-array/src/stream/instrumented.rs @@ -1,3 +1,8 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + use crate::dtype::DType; use crate::stream::ArrayStream; use futures::Stream; @@ -21,10 +26,7 @@ impl ArrayStream for InstrumentedArrayStream { impl Stream for InstrumentedArrayStream { type Item = S::Item; - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project().stream.poll_next(cx) } }