Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ vortex-array-macros = { version = "0.1.0", path = "./vortex-array-macros" }
tpchgen = { version = "2.0.2", git = "https://github.com/clflushopt/tpchgen-rs.git", rev = "438e9c2dbc25b2fff82c0efc08b3f13b5707874f" }
tpchgen-arrow = { version = "2.0.2", git = "https://github.com/clflushopt/tpchgen-rs.git", rev = "438e9c2dbc25b2fff82c0efc08b3f13b5707874f" }
tracing = { version = "0.1.41", default-features = false }
tracing-futures = { version = "0.2.5", features = ["futures-03"] }
tracing-perfetto = "0.1.5"
tracing-subscriber = "0.3"
url = "2.5.7"
Expand Down
1 change: 1 addition & 0 deletions vortex-array/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ tabled = { workspace = true, optional = true, default-features = false, features
] }
termtree = { workspace = true }
tracing = { workspace = true }
tracing-futures = { workspace = true }
uuid = { workspace = true }
vortex-array-macros = { workspace = true }
vortex-buffer = { workspace = true, features = ["arrow"] }
Expand Down
9 changes: 9 additions & 0 deletions vortex-array/src/stream/ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
use std::future::Future;

use futures::TryStreamExt;
use tracing::Span;
use vortex_error::VortexResult;

use crate::ArrayRef;
use crate::IntoArray;
use crate::arrays::ChunkedArray;
use crate::stream::ArrayStream;
use crate::stream::SendableArrayStream;
use crate::stream::instrumented::instrument_array_stream;

pub trait ArrayStreamExt: ArrayStream {
/// Box the [`ArrayStream`] so that it can be sent between threads.
Expand Down Expand Up @@ -38,6 +40,13 @@ pub trait ArrayStreamExt: ArrayStream {
}
}
}

fn instrument(self, span: Span) -> impl ArrayStream
where
Self: Sized,
{
instrument_array_stream(self, span)
}
}

impl<S: ArrayStream> ArrayStreamExt for S {}
40 changes: 40 additions & 0 deletions vortex-array/src/stream/instrumented.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use std::{
pin::Pin,
task::{Context, Poll},
};

use crate::dtype::DType;
use crate::stream::ArrayStream;
use futures::Stream;
use pin_project_lite::pin_project;
use tracing::Span;
use tracing_futures::{Instrument, Instrumented};

pin_project! {
pub struct InstrumentedArrayStream<S> {
#[pin]
stream: Instrumented<S>,
}
}

impl<S: ArrayStream> ArrayStream for InstrumentedArrayStream<S> {
fn dtype(&self) -> &DType {
self.stream.inner().dtype()
}
}

impl<S: Stream> Stream for InstrumentedArrayStream<S> {
type Item = S::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().stream.poll_next(cx)
}
}

pub fn instrument_array_stream<S: ArrayStream>(
stream: S,
span: Span,
) -> InstrumentedArrayStream<S> {
let stream = stream.instrument(span);
InstrumentedArrayStream { stream }
}
1 change: 1 addition & 0 deletions vortex-array/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::dtype::DType;

mod adapter;
mod ext;
mod instrumented;

/// A stream of array chunks along with a DType.
///
Expand Down
Loading