Commit cc4492e

g-talbot and claude authored
feat: enforce physical column ordering in Parquet files for two-GET streaming merge (#6281)
* feat: enforce physical column ordering in Parquet files

  Sort schema columns are written first (in their configured sort order), followed by all remaining data columns in alphabetical order. This physical layout enables a two-GET streaming merge during compaction: the footer GET provides the schema and offsets, then a single streaming GET from the start of the row group delivers sort columns first — allowing the compactor to compute the global merge order before data columns arrive.

* test: verify input column order is actually scrambled

  The sanity check only asserted presence, not ordering. Now it verifies that host appears before service in the input (scrambled), which is the opposite of the sort-schema order (service before host).

* style: rustfmt test code

* fix: collapse nested if to satisfy clippy::collapsible_if

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
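For context, this is roughly the read pattern the new layout enables on the compactor side. A minimal sketch, assuming the `parquet` crate's metadata accessors (`row_group`, `column`, `byte_range`, `compressed_size`); the `TwoGetPlan` type, the `plan_two_gets` helper, and the 64 KiB footer guess are illustrative, not Quickwit's actual API.

// Illustrative sketch of the two-GET read pattern this layout enables.
// The ranged-GET client that would consume this plan is out of scope here.
use parquet::file::metadata::ParquetMetaData;

/// The two byte ranges a compactor would fetch per file (hypothetical type).
struct TwoGetPlan {
    /// GET #1: a fixed-size suffix read covering the footer, which yields
    /// the schema and all column-chunk offsets. 64 KiB is a common guess.
    footer_suffix_len: u64,
    /// GET #2: one streaming read from the start of the row group. Because
    /// sort columns are physically first, they arrive at the head of the
    /// stream, before any data columns.
    row_group_start: u64,
    row_group_len: u64,
}

fn plan_two_gets(meta: &ParquetMetaData, row_group: usize) -> TwoGetPlan {
    let rg = meta.row_group(row_group);
    // Start of the first column chunk, i.e. the first sort column under
    // the ordering enforced by this commit.
    let (start, _len) = rg.column(0).byte_range();
    TwoGetPlan {
        footer_suffix_len: 64 * 1024,
        row_group_start: start,
        row_group_len: rg.compressed_size() as u64,
    }
}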
1 parent 4006b20 commit cc4492e

1 file changed

Lines changed: 173 additions & 2 deletions


quickwit/quickwit-parquet-engine/src/storage/writer.rs

@@ -298,15 +298,69 @@ impl ParquetWriter {
         Ok(sorted_batch)
     }
 
-    /// Validate, sort, and build WriterProperties for a batch.
+    /// Reorder columns for optimal physical layout in the Parquet file.
+    ///
+    /// Sort schema columns are placed first (in their configured sort order),
+    /// followed by all remaining data columns in alphabetical order. This
+    /// layout enables a two-GET streaming merge during compaction: the first
+    /// GET reads the footer, the second streams from the start of the row
+    /// group — sort columns arrive first, allowing the compactor to compute
+    /// the global merge order before data columns arrive.
+    fn reorder_columns(&self, batch: &RecordBatch) -> RecordBatch {
+        let schema = batch.schema();
+        let mut ordered_indices: Vec<usize> = Vec::with_capacity(schema.fields().len());
+        let mut used = vec![false; schema.fields().len()];
+
+        // Phase 1: sort schema columns in their configured order.
+        for sf in &self.resolved_sort_fields {
+            if let Ok(idx) = schema.index_of(sf.name.as_str())
+                && !used[idx]
+            {
+                ordered_indices.push(idx);
+                used[idx] = true;
+            }
+        }
+
+        // Phase 2: remaining columns, alphabetically by name.
+        let mut remaining: Vec<(usize, &str)> = schema
+            .fields()
+            .iter()
+            .enumerate()
+            .filter(|(i, _)| !used[*i])
+            .map(|(i, f)| (i, f.name().as_str()))
+            .collect();
+        remaining.sort_by_key(|(_, name)| *name);
+        for (idx, _) in remaining {
+            ordered_indices.push(idx);
+        }
+
+        // Build reordered schema and columns.
+        let new_fields: Vec<Arc<arrow::datatypes::Field>> = ordered_indices
+            .iter()
+            .map(|&i| Arc::new(schema.field(i).clone()))
+            .collect();
+        let new_columns: Vec<Arc<dyn arrow::array::Array>> = ordered_indices
+            .iter()
+            .map(|&i| Arc::clone(batch.column(i)))
+            .collect();
+        let new_schema = Arc::new(arrow::datatypes::Schema::new_with_metadata(
+            new_fields,
+            schema.metadata().clone(),
+        ));
+
+        RecordBatch::try_new(new_schema, new_columns)
+            .expect("reorder_columns: schema and columns must be consistent")
+    }
+
+    /// Validate, sort, reorder columns, and build WriterProperties for a batch.
     fn prepare_write(
         &self,
         batch: &RecordBatch,
         split_metadata: Option<&MetricsSplitMetadata>,
     ) -> Result<(RecordBatch, WriterProperties), ParquetWriteError> {
         validate_required_fields(&batch.schema())
             .map_err(|e| ParquetWriteError::SchemaValidation(e.to_string()))?;
-        let sorted_batch = self.sort_batch(batch)?;
+        let sorted_batch = self.reorder_columns(&self.sort_batch(batch)?);
 
         let kv_metadata = split_metadata.map(build_compaction_key_value_metadata);
 
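Before the test hunk below, here is the two-phase ordering rule in isolation: a tiny standalone sketch over plain column names. The inputs and the `order_columns` helper are hypothetical; the real implementation above works on Arrow schemas and column indices rather than name lists.

// Standalone illustration of the two-phase ordering: sort columns first
// (in configured order), then the rest alphabetically.
fn order_columns<'a>(sort_fields: &[&'a str], columns: &[&'a str]) -> Vec<&'a str> {
    let mut ordered: Vec<&str> = Vec::new();
    // Phase 1: sort schema columns, in configured order, if present.
    for &sf in sort_fields {
        if columns.contains(&sf) && !ordered.contains(&sf) {
            ordered.push(sf);
        }
    }
    // Phase 2: everything else, alphabetically.
    let mut rest: Vec<&str> = columns
        .iter()
        .copied()
        .filter(|c| !ordered.contains(c))
        .collect();
    rest.sort();
    ordered.extend(rest);
    ordered
}

fn main() {
    let out = order_columns(
        &["service", "host"],
        &["host", "zzz_extra", "service", "aaa_extra"],
    );
    assert_eq!(out, ["service", "host", "aaa_extra", "zzz_extra"]);
}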
@@ -907,4 +961,121 @@ mod tests {
             json_str
         );
     }
+
+    #[test]
+    fn test_column_ordering_sort_columns_first_then_alphabetical() {
+        // Default metrics sort fields: metric_name|service|env|datacenter|region|host|
+        // timeseries_id|timestamp_secs
+        let config = ParquetWriterConfig::default();
+        let writer = ParquetWriter::new(config, &TableConfig::default());
+
+        // Create a batch with columns in a deliberately scrambled order.
+        // The tag columns (service, env, region, host) plus two extra data
+        // columns (zzz_extra, aaa_extra) that are NOT in the sort schema.
+        let batch = create_test_batch_with_tags(
+            3,
+            &["host", "zzz_extra", "env", "region", "service", "aaa_extra"],
+        );
+        let input_schema = batch.schema();
+        let input_names: Vec<&str> = input_schema
+            .fields()
+            .iter()
+            .map(|f| f.name().as_str())
+            .collect();
+        // Sanity: input has tag columns in the scrambled order we specified,
+        // not in sort-schema or alphabetical order.
+        let host_pos = input_names.iter().position(|n| *n == "host").unwrap();
+        let service_pos = input_names.iter().position(|n| *n == "service").unwrap();
+        assert!(
+            host_pos < service_pos,
+            "input should have host before service (scrambled), got: {:?}",
+            input_names
+        );
+
+        let reordered = writer.reorder_columns(&batch);
+        let schema = reordered.schema();
+        let names: Vec<String> = schema.fields().iter().map(|f| f.name().clone()).collect();
+
+        // Sort schema columns that are present should come first, in sort order.
+        // From the default: metric_name, service, env, region, host, timestamp_secs
+        // (datacenter and timeseries_id are not in the batch).
+        // metric_type and value are required fields but NOT sort columns.
+        let expected_prefix = [
+            "metric_name",
+            "service",
+            "env",
+            "region",
+            "host",
+            "timestamp_secs",
+        ];
+        let sort_prefix: Vec<&str> = names
+            .iter()
+            .map(|s| s.as_str())
+            .take_while(|n| expected_prefix.contains(n))
+            .collect();
+        assert_eq!(
+            sort_prefix, expected_prefix,
+            "sort schema columns should appear first in configured order, got: {:?}",
+            names
+        );
+
+        // Remaining columns should be alphabetical.
+        let remaining: Vec<&str> = names
+            .iter()
+            .skip(sort_prefix.len())
+            .map(|s| s.as_str())
+            .collect();
+        let mut sorted_remaining = remaining.clone();
+        sorted_remaining.sort();
+        assert_eq!(
+            remaining, sorted_remaining,
+            "non-sort columns should be in alphabetical order, got: {:?}",
+            remaining
+        );
+
+        // All original columns must be present (no data loss).
+        assert_eq!(reordered.num_columns(), batch.num_columns());
+        assert_eq!(reordered.num_rows(), batch.num_rows());
+    }
+
+    #[test]
+    fn test_column_ordering_preserved_in_parquet_file() {
+        use std::fs::File;
+
+        use parquet::file::reader::{FileReader, SerializedFileReader};
+
+        let config = ParquetWriterConfig::default();
+        let writer = ParquetWriter::new(config, &TableConfig::default());
+
+        let batch = create_test_batch_with_tags(3, &["host", "zzz_extra", "env", "service"]);
+
+        let temp_dir = std::env::temp_dir();
+        let path = temp_dir.join("test_column_ordering.parquet");
+        writer
+            .write_to_file_with_metadata(&batch, &path, None)
+            .unwrap();
+
+        // Read back and verify physical column order from the Parquet schema.
+        let file = File::open(&path).unwrap();
+        let reader = SerializedFileReader::new(file).unwrap();
+        let parquet_schema = reader.metadata().file_metadata().schema_descr();
+        let col_names: Vec<String> = (0..parquet_schema.num_columns())
+            .map(|i| parquet_schema.column(i).name().to_string())
+            .collect();
+
+        // Sort columns first: metric_name, service, env, host, timestamp_secs
+        // Then remaining alphabetically: metric_type, value, zzz_extra
+        assert_eq!(col_names[0], "metric_name");
+        assert_eq!(col_names[1], "service");
+        assert_eq!(col_names[2], "env");
+        assert_eq!(col_names[3], "host");
+        assert_eq!(col_names[4], "timestamp_secs");
+
+        let remaining = &col_names[5..];
+        let mut sorted = remaining.to_vec();
+        sorted.sort();
+        assert_eq!(remaining, &sorted, "data columns should be alphabetical");
+
+        std::fs::remove_file(&path).ok();
+    }
 }
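Downstream of this change, the compactor can compute the global merge order from sort keys alone, before data columns finish downloading. A hedged sketch of that k-way merge step, with a made-up `SortKey` type standing in for the actual sort-column values; this is not Quickwit's compactor code.

use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Hypothetical sort key: the sort-column values for one row,
// e.g. (service, timestamp_secs).
type SortKey = (String, u64);

/// Compute the global merge order across splits from sort keys alone.
/// Because sort columns arrive first in each streaming GET, this can run
/// before any data columns have been downloaded.
fn merge_order(splits: &[Vec<SortKey>]) -> Vec<(usize, usize)> {
    // Min-heap of (next key, split index, row index within split).
    let mut heap = BinaryHeap::new();
    for (s, keys) in splits.iter().enumerate() {
        if let Some(k) = keys.first() {
            heap.push(Reverse((k.clone(), s, 0)));
        }
    }
    let mut order = Vec::new();
    while let Some(Reverse((_, s, r))) = heap.pop() {
        order.push((s, r));
        if let Some(k) = splits[s].get(r + 1) {
            heap.push(Reverse((k.clone(), s, r + 1)));
        }
    }
    order
}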
