Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions native/shuffle/src/bin/shuffle_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,9 @@ fn print_shuffle_metrics(metrics: &MetricsSet, total_wall_time_secs: f64) {
if let Some(nanos) = get_metric("repart_time") {
println!(" repart time: {}", fmt_time(nanos));
}
if let Some(nanos) = get_metric("interleave_time") {
println!(" interleave time: {}", fmt_time(nanos));
}
if let Some(nanos) = get_metric("encode_time") {
println!(" encode time: {}", fmt_time(nanos));
}
Expand Down
4 changes: 4 additions & 0 deletions native/shuffle/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ pub(crate) struct ShufflePartitionerMetrics {
/// Time to perform repartitioning
pub(crate) repart_time: Time,

/// Time spent in `interleave_record_batch` gathering shuffled batches
pub(crate) interleave_time: Time,

/// Time encoding batches to IPC format
pub(crate) encode_time: Time,

Expand All @@ -51,6 +54,7 @@ impl ShufflePartitionerMetrics {
Self {
baseline: BaselineMetrics::new(metrics, partition),
repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition),
interleave_time: MetricBuilder::new(metrics).subset_time("interleave_time", partition),
encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition),
write_time: MetricBuilder::new(metrics).subset_time("write_time", partition),
input_batches: MetricBuilder::new(metrics).counter("input_batches", partition),
Expand Down
7 changes: 5 additions & 2 deletions native/shuffle/src/partitioners/multi_partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,10 +434,12 @@ impl MultiPartitionShuffleRepartitioner {
Ok(())
}

#[allow(clippy::too_many_arguments)]
fn shuffle_write_partition(
partition_iter: &mut PartitionedBatchIterator,
shuffle_block_writer: &mut ShuffleBlockWriter,
output_data: &mut BufWriter<File>,
interleave_time: &Time,
encode_time: &Time,
write_time: &Time,
write_buffer_size: usize,
Expand All @@ -449,7 +451,7 @@ impl MultiPartitionShuffleRepartitioner {
write_buffer_size,
batch_size,
);
for batch in partition_iter {
while let Some(batch) = partition_iter.next(interleave_time) {
let batch = batch?;
buf_batch_writer.write(&batch, encode_time, write_time)?;
}
Expand Down Expand Up @@ -573,7 +575,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
.open(data_file)
.map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?;

let mut output_data = BufWriter::new(output_data);
let mut output_data = BufWriter::with_capacity(self.write_buffer_size, output_data);

#[allow(clippy::needless_range_loop)]
for i in 0..num_output_partitions {
Expand All @@ -596,6 +598,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
&mut partition_iter,
&mut self.shuffle_block_writer,
&mut output_data,
&self.metrics.interleave_time,
&self.metrics.encode_time,
&self.metrics.write_time,
self.write_buffer_size,
Expand Down
16 changes: 10 additions & 6 deletions native/shuffle/src/partitioners/partitioned_batch_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use arrow::array::RecordBatch;
use arrow::compute::interleave_record_batch;
use datafusion::common::DataFusionError;
use datafusion::physical_plan::metrics::Time;

/// A helper struct to produce shuffled batches.
/// This struct takes ownership of the buffered batches and partition indices from the
Expand Down Expand Up @@ -85,19 +86,22 @@ impl<'a> PartitionedBatchIterator<'a> {
pos: 0,
}
}
}

impl Iterator for PartitionedBatchIterator<'_> {
type Item = datafusion::common::Result<RecordBatch>;

fn next(&mut self) -> Option<Self::Item> {
/// Returns the next shuffled batch, recording the gather cost into `interleave_time`.
pub(crate) fn next(
&mut self,
interleave_time: &Time,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although I'm not a huge fan of passing metrics this way, this pattern is already used in multiple places, e.g. for `encode_time`.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I was following the code style used by the other shuffle writer metrics.

) -> Option<datafusion::common::Result<RecordBatch>> {
Comment thread
wForget marked this conversation as resolved.
if self.pos >= self.indices.len() {
return None;
}

let indices_end = std::cmp::min(self.pos + self.batch_size, self.indices.len());
let indices = &self.indices[self.pos..indices_end];
match interleave_record_batch(&self.record_batches, indices) {
let mut timer = interleave_time.timer();
let result = interleave_record_batch(&self.record_batches, indices);
timer.stop();
match result {
Ok(batch) => {
Comment thread
wForget marked this conversation as resolved.
self.pos = indices_end;
Some(Ok(batch))
Expand Down
4 changes: 2 additions & 2 deletions native/shuffle/src/writers/spill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl PartitionWriter {
write_buffer_size: usize,
batch_size: usize,
) -> datafusion::common::Result<usize> {
if let Some(batch) = iter.next() {
if let Some(batch) = iter.next(&metrics.interleave_time) {
self.ensure_spill_file_created(runtime)?;

let total_bytes_written = {
Expand All @@ -95,7 +95,7 @@ impl PartitionWriter {
);
let mut bytes_written =
buf_batch_writer.write(&batch?, &metrics.encode_time, &metrics.write_time)?;
for batch in iter {
while let Some(batch) = iter.next(&metrics.interleave_time) {
let batch = batch?;
bytes_written += buf_batch_writer.write(
&batch,
Expand Down
Loading