diff --git a/.github/actions/spelling/expect.txt b/.github/actions/spelling/expect.txt index 1d68a6978570b..04296aa8cb035 100644 --- a/.github/actions/spelling/expect.txt +++ b/.github/actions/spelling/expect.txt @@ -216,6 +216,7 @@ fns foobarfoobarfoo footgunning Forcepoint +fpp freelist fuzzcheck GC'ing @@ -380,6 +381,7 @@ myvalue Namazu nats ndjson +ndv nearline nextest ngx diff --git a/Cargo.lock b/Cargo.lock index e2f86f33a558e..5ba1e0bf0fcfd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2609,6 +2609,7 @@ dependencies = [ "memchr", "opentelemetry-proto", "ordered-float 4.6.0", + "parquet", "prost 0.12.6", "prost-reflect", "rand 0.9.2", @@ -5783,6 +5784,12 @@ dependencies = [ "web-sys", ] +[[package]] +name = "integer-encoding" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" + [[package]] name = "inventory" version = "0.3.21" @@ -7920,6 +7927,38 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "parquet" +version = "56.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0dbd48ad52d7dccf8ea1b90a3ddbfaea4f69878dd7683e51c507d4bc52b5b27" +dependencies = [ + "ahash 0.8.11", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-schema", + "arrow-select", + "base64 0.22.1", + "brotli", + "bytes 1.10.1", + "chrono", + "flate2", + "half", + "hashbrown 0.16.0", + "lz4_flex", + "num", + "num-bigint", + "paste", + "seq-macro", + "snap", + "thrift", + "twox-hash", + "zstd 0.13.2", +] + [[package]] name = "parse-size" version = "1.1.0" @@ -10110,6 +10149,12 @@ dependencies = [ "serde", ] +[[package]] +name = "seq-macro" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc711410fbe7399f390ca1c3b60ad0f53f80e95c5eb935e52268a0e2cd49acc" + [[package]] name = "serde" version = "1.0.228" @@ -11318,6 +11363,17 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "thrift" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" +dependencies = [ + "byteorder", + "integer-encoding", + "ordered-float 2.10.1", +] + [[package]] name = "tikv-jemalloc-sys" version = "0.6.0+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7" diff --git a/Cargo.toml b/Cargo.toml index 102ae70602f00..4971202c357da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -589,6 +589,7 @@ enrichment-tables-memory = ["dep:evmap", "dep:evmap-derive", "dep:thread_local"] # Codecs codecs-arrow = ["dep:arrow", "dep:arrow-schema", "vector-lib/arrow"] +codecs-parquet = ["vector-lib/parquet"] codecs-opentelemetry = ["vector-lib/opentelemetry"] codecs-syslog = ["vector-lib/syslog"] @@ -849,7 +850,7 @@ sinks-aws_cloudwatch_logs = ["aws-core", "dep:aws-sdk-cloudwatchlogs", "dep:aws- sinks-aws_cloudwatch_metrics = ["aws-core", "dep:aws-sdk-cloudwatch"] sinks-aws_kinesis_firehose = ["aws-core", "dep:aws-sdk-firehose"] sinks-aws_kinesis_streams = ["aws-core", "dep:aws-sdk-kinesis"] -sinks-aws_s3 = ["dep:base64", "dep:md-5", "aws-core", "dep:aws-sdk-s3"] +sinks-aws_s3 = ["dep:base64", "dep:md-5", "aws-core", "dep:aws-sdk-s3", "codecs-parquet"] sinks-aws_sqs = ["aws-core", "dep:aws-sdk-sqs"] sinks-aws_sns = ["aws-core", "dep:aws-sdk-sns"] sinks-axiom = ["sinks-http"] diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index b507953d3272b..03f42b3b93069 100644 --- a/LICENSE-3rdparty.csv +++ 
b/LICENSE-3rdparty.csv @@ -396,6 +396,7 @@ inotify-sys,https://github.com/hannobraun/inotify-sys,ISC,Hanno Braun , Joshka" instant,https://github.com/sebcrozet/instant,BSD-3-Clause,sebcrozet +integer-encoding,https://github.com/dermesser/integer-encoding-rs,MIT,Lewin Bormann inventory,https://github.com/dtolnay/inventory,MIT OR Apache-2.0,David Tolnay io-lifetimes,https://github.com/sunfishcode/io-lifetimes,Apache-2.0 WITH LLVM-exception OR Apache-2.0 OR MIT,Dan Gohman io-uring,https://github.com/tokio-rs/io-uring,MIT OR Apache-2.0,quininer @@ -550,6 +551,7 @@ parking_lot,https://github.com/Amanieu/parking_lot,Apache-2.0 OR MIT,Amanieu d'A parking_lot,https://github.com/Amanieu/parking_lot,MIT OR Apache-2.0,Amanieu d'Antras parking_lot_core,https://github.com/Amanieu/parking_lot,Apache-2.0 OR MIT,Amanieu d'Antras parking_lot_core,https://github.com/Amanieu/parking_lot,MIT OR Apache-2.0,Amanieu d'Antras +parquet,https://github.com/apache/arrow-rs,Apache-2.0,Apache Arrow parse-size,https://github.com/kennytm/parse-size,MIT,kennytm passt,https://github.com/kevingimbel/passt,MIT OR Apache-2.0,Kevin Gimbel paste,https://github.com/dtolnay/paste,MIT OR Apache-2.0,David Tolnay @@ -688,6 +690,7 @@ secrecy,https://github.com/iqlusioninc/crates/tree/main/secrecy,Apache-2.0 OR MI security-framework,https://github.com/kornelski/rust-security-framework,MIT OR Apache-2.0,"Steven Fackler , Kornel " security-framework-sys,https://github.com/kornelski/rust-security-framework,MIT OR Apache-2.0,"Steven Fackler , Kornel " semver,https://github.com/dtolnay/semver,MIT OR Apache-2.0,David Tolnay +seq-macro,https://github.com/dtolnay/seq-macro,MIT OR Apache-2.0,David Tolnay serde,https://github.com/serde-rs/serde,MIT OR Apache-2.0,"Erick Tryzelaar , David Tolnay " serde-aux,https://github.com/iddm/serde-aux,MIT,Victor Polevoy serde-toml-merge,https://github.com/jdrouet/serde-toml-merge,MIT,Jeremie Drouet @@ -770,6 +773,7 @@ terminal_size,https://github.com/eminence/terminal-size,MIT OR Apache-2.0,Andrew thiserror,https://github.com/dtolnay/thiserror,MIT OR Apache-2.0,David Tolnay thiserror-impl,https://github.com/dtolnay/thiserror,MIT OR Apache-2.0,David Tolnay thread_local,https://github.com/Amanieu/thread_local-rs,MIT OR Apache-2.0,Amanieu d'Antras +thrift,https://github.com/apache/thrift/tree/master/lib/rs,Apache-2.0,Apache Thrift Developers tikv-jemalloc-sys,https://github.com/tikv/jemallocator,MIT OR Apache-2.0,"Alex Crichton , Gonzalo Brito Gadeschi , The TiKV Project Developers" tikv-jemallocator,https://github.com/tikv/jemallocator,MIT OR Apache-2.0,"Alex Crichton , Gonzalo Brito Gadeschi , Simon Sapin , Steven Fackler , The TiKV Project Developers" time,https://github.com/time-rs/time,MIT OR Apache-2.0,"Jacob Pratt , Time contributors" diff --git a/changelog.d/parquet_encoder_aws_s3.feature.md b/changelog.d/parquet_encoder_aws_s3.feature.md new file mode 100644 index 0000000000000..e57c8d04f44da --- /dev/null +++ b/changelog.d/parquet_encoder_aws_s3.feature.md @@ -0,0 +1,20 @@ +The `aws_s3` sink now supports [Apache Parquet](https://parquet.apache.org/) encoding, enabling +Vector to write columnar Parquet files optimized for analytics workloads. + +Parquet is a columnar storage format that provides efficient compression and encoding schemes, +making it ideal for long-term storage and query performance with tools like AWS Athena, Apache Spark, +and Presto. Users can now configure Parquet encoding with custom schemas defined directly in YAML +as a simple map of field names to data types. 
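As a rough illustration of the schema model behind this option, the sketch below builds the same field-name-to-type map in Rust and converts it to an Arrow schema, mirroring how the new `SchemaDefinition` and `FieldDefinition` types are exercised in the codec's own unit tests. This is a minimal sketch, not the sink-level YAML surface: `SchemaDefinition` is re-exported from `codecs::encoding`, while the `FieldDefinition` import path and the example field names here are illustrative only.

```rust
use std::collections::BTreeMap;

// `SchemaDefinition` is re-exported from `codecs::encoding` (arrow/parquet features).
// `FieldDefinition` lives in the crate-internal `format::schema_definition` module,
// so this path is illustrative; it is how the unit tests reach the type.
use codecs::encoding::SchemaDefinition;
use codecs::encoding::format::schema_definition::FieldDefinition;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut fields = BTreeMap::new();
    fields.insert(
        "timestamp".to_string(),
        FieldDefinition {
            r#type: "timestamp_microsecond".to_string(),
            bloom_filter: false,
            bloom_filter_num_distinct_values: None,
            bloom_filter_false_positive_pct: None,
        },
    );
    fields.insert(
        "message".to_string(),
        FieldDefinition {
            r#type: "utf8".to_string(),
            bloom_filter: false,
            bloom_filter_num_distinct_values: None,
            bloom_filter_false_positive_pct: None,
        },
    );

    // Unknown type strings surface as `SchemaDefinitionError::UnknownDataType`;
    // every configured field becomes a nullable Arrow field.
    let schema = SchemaDefinition { fields }.to_arrow_schema()?;
    assert_eq!(schema.fields().len(), 2);
    Ok(())
}
```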
+ +Features include: + +- Support for all common data types: strings (utf8), integers (int8-int64), unsigned integers, + floats (float32, float64), timestamps (second/millisecond/microsecond/nanosecond precision), + booleans, binary data, and decimals +- Configurable compression algorithms: snappy (default), gzip, zstd, lz4, brotli, or uncompressed + + +Each batch of events becomes one Parquet file in S3, with batch size controlled by the standard +`batch.max_events`, `batch.max_bytes`, and `batch.timeout_secs` settings. + +authors: rorylshanks diff --git a/lib/codecs/Cargo.toml b/lib/codecs/Cargo.toml index 01d9836f666b3..bf8b930001f68 100644 --- a/lib/codecs/Cargo.toml +++ b/lib/codecs/Cargo.toml @@ -15,6 +15,7 @@ path = "tests/bin/generate-avro-fixtures.rs" [dependencies] apache-avro = { version = "0.20.0", default-features = false } arrow = { version = "56.2.0", default-features = false, features = ["ipc"] } +parquet = { version = "56.2.0", default-features = false, features = ["arrow", "snap", "zstd", "lz4", "brotli", "flate2", "flate2-rust_backened"], optional = true } async-trait.workspace = true bytes.workspace = true chrono.workspace = true @@ -64,5 +65,6 @@ vrl.workspace = true [features] arrow = [] +parquet = ["dep:parquet", "arrow"] opentelemetry = ["dep:opentelemetry-proto"] syslog = ["dep:syslog_loose", "dep:strum", "dep:derive_more", "dep:serde-aux", "dep:toml"] diff --git a/lib/codecs/src/encoding/format/arrow.rs b/lib/codecs/src/encoding/format/arrow.rs index 3c2d3863f1fb2..fa2b89e24396c 100644 --- a/lib/codecs/src/encoding/format/arrow.rs +++ b/lib/codecs/src/encoding/format/arrow.rs @@ -235,7 +235,7 @@ pub fn encode_events_to_arrow_ipc_stream( } /// Recursively makes a Field and all its nested fields nullable -fn make_field_nullable(field: &arrow::datatypes::Field) -> arrow::datatypes::Field { +pub fn make_field_nullable(field: &arrow::datatypes::Field) -> arrow::datatypes::Field { let new_data_type = match field.data_type() { DataType::List(inner_field) => DataType::List(Arc::new(make_field_nullable(inner_field))), DataType::Struct(fields) => { @@ -254,7 +254,7 @@ fn make_field_nullable(field: &arrow::datatypes::Field) -> arrow::datatypes::Fie } /// Builds an Arrow RecordBatch from events -fn build_record_batch( +pub fn build_record_batch( schema: Arc, events: &[Event], ) -> Result { diff --git a/lib/codecs/src/encoding/format/mod.rs b/lib/codecs/src/encoding/format/mod.rs index 5026dda422d9e..116dd3d066ce5 100644 --- a/lib/codecs/src/encoding/format/mod.rs +++ b/lib/codecs/src/encoding/format/mod.rs @@ -16,8 +16,12 @@ mod native; mod native_json; #[cfg(feature = "opentelemetry")] mod otlp; +#[cfg(feature = "parquet")] +mod parquet; mod protobuf; mod raw_message; +#[cfg(any(feature = "arrow", feature = "parquet"))] +mod schema_definition; #[cfg(feature = "syslog")] mod syslog; mod text; @@ -38,8 +42,14 @@ pub use native::{NativeSerializer, NativeSerializerConfig}; pub use native_json::{NativeJsonSerializer, NativeJsonSerializerConfig}; #[cfg(feature = "opentelemetry")] pub use otlp::{OtlpSerializer, OtlpSerializerConfig}; +#[cfg(feature = "parquet")] +pub use parquet::{ + ParquetCompression, ParquetEncodingError, ParquetSerializer, ParquetSerializerConfig, +}; pub use protobuf::{ProtobufSerializer, ProtobufSerializerConfig, ProtobufSerializerOptions}; pub use raw_message::{RawMessageSerializer, RawMessageSerializerConfig}; +#[cfg(any(feature = "arrow", feature = "parquet"))] +pub use schema_definition::{SchemaDefinition, SchemaDefinitionError}; #[cfg(feature = 
"syslog")] pub use syslog::{SyslogSerializer, SyslogSerializerConfig}; pub use text::{TextSerializer, TextSerializerConfig}; diff --git a/lib/codecs/src/encoding/format/parquet.rs b/lib/codecs/src/encoding/format/parquet.rs new file mode 100644 index 0000000000000..b7688622a7275 --- /dev/null +++ b/lib/codecs/src/encoding/format/parquet.rs @@ -0,0 +1,1208 @@ +//! Apache Parquet format codec for batched event encoding +//! +//! Provides Apache Parquet columnar file format encoding with static schema support. +//! This encoder writes complete Parquet files with proper metadata and footers, +//! suitable for long-term storage and analytics workloads. + +use arrow::datatypes::Schema; +use bytes::{BufMut, Bytes, BytesMut}; +use parquet::{ + arrow::ArrowWriter, + basic::{BrotliLevel, Compression, GzipLevel, ZstdLevel}, + file::properties::{WriterProperties, WriterVersion}, + schema::types::ColumnPath, +}; +use snafu::Snafu; +use std::sync::Arc; +use vector_config::configurable_component; + +use vector_core::event::Event; + +// Reuse the Arrow encoder's record batch building logic +use super::arrow::{ArrowEncodingError, build_record_batch}; +use super::schema_definition::SchemaDefinition; + +/// Compression algorithm for Parquet files +#[configurable_component] +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum ParquetCompression { + /// No compression + Uncompressed, + /// Snappy compression (fast, moderate compression ratio) + #[default] + Snappy, + /// GZIP compression (slower, better compression ratio) + Gzip, + /// Brotli compression + Brotli, + /// LZ4 compression (very fast, moderate compression) + Lz4, + /// ZSTD compression (good balance of speed and compression) + Zstd, +} + +impl ParquetCompression { + /// Convert to parquet Compression with optional level override + fn to_compression(self, level: Option) -> Result { + match (self, level) { + (ParquetCompression::Uncompressed, _) => Ok(Compression::UNCOMPRESSED), + (ParquetCompression::Snappy, _) => Ok(Compression::SNAPPY), + (ParquetCompression::Lz4, _) => Ok(Compression::LZ4), + (ParquetCompression::Gzip, Some(lvl)) => GzipLevel::try_new(lvl as u32) + .map(Compression::GZIP) + .map_err(|e| format!("Invalid GZIP compression level: {}", e)), + (ParquetCompression::Gzip, None) => Ok(Compression::GZIP(Default::default())), + (ParquetCompression::Brotli, Some(lvl)) => BrotliLevel::try_new(lvl as u32) + .map(Compression::BROTLI) + .map_err(|e| format!("Invalid Brotli compression level: {}", e)), + (ParquetCompression::Brotli, None) => Ok(Compression::BROTLI(Default::default())), + (ParquetCompression::Zstd, Some(lvl)) => ZstdLevel::try_new(lvl) + .map(Compression::ZSTD) + .map_err(|e| format!("Invalid ZSTD compression level: {}", e)), + (ParquetCompression::Zstd, None) => Ok(Compression::ZSTD(ZstdLevel::default())), + } + } +} + +impl From for Compression { + fn from(compression: ParquetCompression) -> Self { + compression + .to_compression(None) + .expect("Default compression should always be valid") + } +} + +/// Parquet writer version +#[configurable_component] +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum ParquetWriterVersion { + /// Parquet format version 1.0 (maximum compatibility) + V1, + /// Parquet format version 2.0 (modern format with better encoding) + #[default] + V2, +} + +impl From for WriterVersion { + fn from(version: ParquetWriterVersion) -> Self { + match version { + ParquetWriterVersion::V1 => 
WriterVersion::PARQUET_1_0, + ParquetWriterVersion::V2 => WriterVersion::PARQUET_2_0, + } + } +} + +/// Configuration for Parquet serialization +#[configurable_component] +#[derive(Clone, Default)] +pub struct ParquetSerializerConfig { + /// The Arrow schema definition to use for encoding + /// + /// This schema defines the structure and types of the Parquet file columns. + /// Specified as a map of field names to data types. + /// + /// Mutually exclusive with `infer_schema`. Must specify either `schema` or `infer_schema: true`. + /// + /// Supported types: utf8, int8, int16, int32, int64, uint8, uint16, uint32, uint64, + /// float32, float64, boolean, binary, timestamp_second, timestamp_millisecond, + /// timestamp_microsecond, timestamp_nanosecond, date32, date64, and more. + #[serde(default)] + #[configurable(metadata(docs::examples = "schema_example()"))] + pub schema: Option, + + /// Automatically infer schema from event data + /// + /// When enabled, the schema is inferred from each batch of events independently. + /// The schema is determined by examining the types of values in the events. + /// + /// **Type mapping:** + /// - String values → `utf8` + /// - Integer values → `int64` + /// - Float values → `float64` + /// - Boolean values → `boolean` + /// - Timestamp values → `timestamp_microsecond` + /// - Arrays/Objects → `utf8` (serialized as JSON) + /// + /// **Type conflicts:** If a field has different types across events in the same batch, + /// it will be encoded as `utf8` (string) and all values will be converted to strings. + /// + /// **Important:** Schema consistency across batches is the operator's responsibility. + /// Use VRL transforms to ensure consistent types if needed. Each batch may produce + /// a different schema if event structure varies. + /// + /// **Bloom filters:** Not supported with inferred schemas. Use explicit schema for Bloom filters. + /// + /// Mutually exclusive with `schema`. Must specify either `schema` or `infer_schema: true`. + #[serde(default)] + #[configurable(metadata(docs::examples = true))] + pub infer_schema: bool, + + /// Column names to exclude from Parquet encoding + /// + /// These columns will be completely excluded from the Parquet file. + /// Useful for filtering out metadata, internal fields, or temporary data. + /// + /// Only applies when `infer_schema` is enabled. Ignored when using explicit schema. + #[serde(default)] + #[configurable(metadata( + docs::examples = "vec![\"_metadata\".to_string(), \"internal_id\".to_string()]" + ))] + pub exclude_columns: Option>, + + /// Maximum number of columns to encode + /// + /// Limits the number of columns in the Parquet file. Additional columns beyond + /// this limit will be silently dropped. Columns are selected in the order they + /// appear in the first event. + /// + /// Only applies when `infer_schema` is enabled. Ignored when using explicit schema. + #[serde(default = "default_max_columns")] + #[configurable(metadata(docs::examples = 500))] + #[configurable(metadata(docs::examples = 1000))] + pub max_columns: usize, + + /// Compression algorithm to use for Parquet columns + /// + /// Compression is applied to all columns in the Parquet file. + /// Snappy provides a good balance of speed and compression ratio. 
+ #[serde(default)] + #[configurable(metadata(docs::examples = "snappy"))] + #[configurable(metadata(docs::examples = "gzip"))] + #[configurable(metadata(docs::examples = "zstd"))] + pub compression: ParquetCompression, + + /// Compression level for algorithms that support it. + /// + /// Only applies to ZSTD, GZIP, and Brotli compression. Ignored for other algorithms. + /// + /// **ZSTD levels** (1-22): + /// - 1-3: Fastest, moderate compression (level 3 is default) + /// - 4-9: Good balance of speed and compression + /// - 10-15: Better compression, slower encoding + /// - 16-22: Maximum compression, slowest (good for cold storage) + /// + /// **GZIP levels** (1-9): + /// - 1-3: Faster, less compression + /// - 6: Default balance (recommended) + /// - 9: Maximum compression, slowest + /// + /// **Brotli levels** (0-11): + /// - 0-4: Faster encoding + /// - 1: Default (recommended) + /// - 5-11: Better compression, slower + /// + /// Higher levels typically produce 20-50% smaller files but take 2-5x longer to encode. + /// Recommended: Use level 3-6 for hot data, 10-15 for cold storage. + #[serde(default)] + #[configurable(metadata(docs::examples = 3))] + #[configurable(metadata(docs::examples = 6))] + #[configurable(metadata(docs::examples = 10))] + pub compression_level: Option, + + /// Parquet format writer version. + /// + /// Controls which Parquet format version to write: + /// - **v1** (PARQUET_1_0): Original format, maximum compatibility (default) + /// - **v2** (PARQUET_2_0): Modern format with improved encoding and statistics + /// + /// Version 2 benefits: + /// - More efficient encoding for certain data types (10-20% smaller files) + /// - Better statistics for query optimization + /// - Improved data page format + /// - Required for some advanced features + /// + /// Use v1 for maximum compatibility with older readers (pre-2018 tools). + /// Use v2 for better performance with modern query engines (Athena, Spark, Presto). + #[serde(default)] + #[configurable(metadata(docs::examples = "v1"))] + #[configurable(metadata(docs::examples = "v2"))] + pub writer_version: ParquetWriterVersion, + + /// Number of rows per row group + /// + /// Row groups are Parquet's unit of parallelization. Larger row groups + /// can improve compression but increase memory usage during encoding. + /// + /// Since each batch becomes a separate Parquet file, this value + /// should be <= the batch max_events setting. Row groups cannot span multiple files. + /// If not specified, defaults to the batch size. + #[serde(default)] + #[configurable(metadata(docs::examples = 100000))] + #[configurable(metadata(docs::examples = 1000000))] + pub row_group_size: Option, + + /// Allow null values for non-nullable fields in the schema. + /// + /// When enabled, missing or incompatible values will be encoded as null even for fields + /// marked as non-nullable in the Arrow schema. This is useful when working with downstream + /// systems that can handle null values through defaults, computed columns, or other mechanisms. + /// + /// When disabled (default), missing values for non-nullable fields will cause encoding errors, + /// ensuring all required data is present before writing to Parquet. + #[serde(default)] + #[configurable(metadata(docs::examples = true))] + pub allow_nullable_fields: bool, + + /// Sorting order for rows within row groups. + /// + /// Pre-sorting rows by specified columns before writing can significantly improve both + /// compression ratios and query performance. 
This is especially valuable for time-series + /// data and event logs. + /// + /// **Benefits:** + /// - **Better compression** (20-40% smaller files): Similar values are grouped together + /// - **Faster queries**: More effective min/max statistics enable better row group skipping + /// - **Improved caching**: Query engines can more efficiently cache sorted data + /// + /// **Common patterns:** + /// - Time-series: Sort by timestamp descending (most recent first) + /// - Multi-tenant: Sort by tenant_id, then timestamp + /// - User analytics: Sort by user_id, then event_time + /// + /// **Trade-offs:** + /// - Adds sorting overhead during encoding (typically 10-30% slower writes) + /// - Requires buffering entire batch in memory for sorting + /// - Most beneficial when queries frequently filter on sorted columns + /// + /// **Example:** + /// ```yaml + /// sorting_columns: + /// - column: timestamp + /// descending: true + /// - column: user_id + /// descending: false + /// ``` + /// + /// If not specified, rows are written in the order they appear in the batch. + #[serde(default)] + pub sorting_columns: Option>, +} + +/// Column sorting configuration +#[configurable_component] +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct SortingColumnConfig { + /// Name of the column to sort by + #[configurable(metadata(docs::examples = "timestamp"))] + #[configurable(metadata(docs::examples = "user_id"))] + pub column: String, + + /// Sort in descending order (true) or ascending order (false) + /// + /// - `true`: Descending (Z-A, 9-0, newest-oldest) + /// - `false`: Ascending (A-Z, 0-9, oldest-newest) + #[serde(default)] + #[configurable(metadata(docs::examples = true))] + pub descending: bool, +} + +fn default_max_columns() -> usize { + 1000 +} + +fn schema_example() -> SchemaDefinition { + use super::schema_definition::FieldDefinition; + use std::collections::BTreeMap; + + let mut fields = BTreeMap::new(); + fields.insert( + "id".to_string(), + FieldDefinition { + r#type: "int64".to_string(), + bloom_filter: false, + bloom_filter_num_distinct_values: None, + bloom_filter_false_positive_pct: None, + }, + ); + fields.insert( + "name".to_string(), + FieldDefinition { + r#type: "utf8".to_string(), + bloom_filter: true, // Example: enable for high-cardinality string field + bloom_filter_num_distinct_values: Some(1_000_000), + bloom_filter_false_positive_pct: Some(0.01), + }, + ); + fields.insert( + "timestamp".to_string(), + FieldDefinition { + r#type: "timestamp_microsecond".to_string(), + bloom_filter: false, + bloom_filter_num_distinct_values: None, + bloom_filter_false_positive_pct: None, + }, + ); + SchemaDefinition { fields } +} + +impl std::fmt::Debug for ParquetSerializerConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ParquetSerializerConfig") + .field("schema", &self.schema.is_some()) + .field("infer_schema", &self.infer_schema) + .field("exclude_columns", &self.exclude_columns) + .field("max_columns", &self.max_columns) + .field("compression", &self.compression) + .field("compression_level", &self.compression_level) + .field("writer_version", &self.writer_version) + .field("row_group_size", &self.row_group_size) + .field("allow_nullable_fields", &self.allow_nullable_fields) + .field("sorting_columns", &self.sorting_columns) + .finish() + } +} + +impl ParquetSerializerConfig { + /// Create a new ParquetSerializerConfig with a schema definition + pub fn new(schema: SchemaDefinition) -> Self { + Self { + schema: Some(schema), + infer_schema: 
false, + exclude_columns: None, + max_columns: default_max_columns(), + compression: ParquetCompression::default(), + compression_level: None, + writer_version: ParquetWriterVersion::default(), + row_group_size: None, + allow_nullable_fields: false, + sorting_columns: None, + } + } + + /// Validate the configuration + fn validate(&self) -> Result<(), String> { + // Must specify exactly one schema method + match (self.schema.is_some(), self.infer_schema) { + (true, true) => { + Err("Cannot use both 'schema' and 'infer_schema: true'. Choose one.".to_string()) + } + (false, false) => { + Err("Must specify either 'schema' or 'infer_schema: true'".to_string()) + } + _ => Ok(()), + } + } + + /// The data type of events that are accepted by `ParquetSerializer`. + pub fn input_type(&self) -> vector_core::config::DataType { + vector_core::config::DataType::Log + } + + /// The schema required by the serializer. + pub fn schema_requirement(&self) -> vector_core::schema::Requirement { + vector_core::schema::Requirement::empty() + } +} + +/// Schema mode for Parquet serialization +#[derive(Clone, Debug)] +enum SchemaMode { + /// Use pre-defined explicit schema + Explicit { schema: Arc }, + /// Infer schema from each batch + Inferred { + exclude_columns: std::collections::BTreeSet, + max_columns: usize, + }, +} + +/// Parquet batch serializer that holds the schema and writer configuration +#[derive(Clone, Debug)] +pub struct ParquetSerializer { + schema_mode: SchemaMode, + writer_properties: WriterProperties, +} + +impl ParquetSerializer { + /// Create a new ParquetSerializer with the given configuration + pub fn new(config: ParquetSerializerConfig) -> Result { + // Validate configuration + config.validate().map_err(vector_common::Error::from)?; + + // Keep a copy of schema_def for later use with Bloom filters + let schema_def_opt = config.schema.clone(); + + // Determine schema mode + let schema_mode = if config.infer_schema { + SchemaMode::Inferred { + exclude_columns: config + .exclude_columns + .unwrap_or_default() + .into_iter() + .collect(), + max_columns: config.max_columns, + } + } else { + let schema_def = config.schema.ok_or_else(|| { + vector_common::Error::from("Schema required when infer_schema is false") + })?; + + // Convert SchemaDefinition to Arrow Schema + let mut schema = schema_def + .to_arrow_schema() + .map_err(|e| vector_common::Error::from(e.to_string()))?; + + // If allow_nullable_fields is enabled, transform the schema once here + if config.allow_nullable_fields { + schema = Arc::new(Schema::new_with_metadata( + schema + .fields() + .iter() + .map(|f| Arc::new(super::arrow::make_field_nullable(f))) + .collect::>(), + schema.metadata().clone(), + )); + } + + SchemaMode::Explicit { schema } + }; + + // Build writer properties + let compression = config + .compression + .to_compression(config.compression_level) + .map_err(vector_common::Error::from)?; + + tracing::debug!( + compression = ?config.compression, + compression_level = ?config.compression_level, + writer_version = ?config.writer_version, + infer_schema = config.infer_schema, + "Configuring Parquet writer properties" + ); + + let mut props_builder = WriterProperties::builder() + .set_compression(compression) + .set_writer_version(config.writer_version.into()); + + if let Some(row_group_size) = config.row_group_size { + props_builder = props_builder.set_max_row_group_size(row_group_size); + } + + // Only apply Bloom filters and sorting for explicit schema mode + if let (SchemaMode::Explicit { schema }, Some(schema_def)) = 
(&schema_mode, &schema_def_opt) + { + // Apply per-column Bloom filter settings from schema + let bloom_filter_configs = schema_def.extract_bloom_filter_configs(); + for bloom_config in bloom_filter_configs { + if let Some(col_idx) = schema + .fields() + .iter() + .position(|f| f.name() == &bloom_config.column_name) + { + // Use field-specific settings or sensible defaults + let fpp = bloom_config.fpp.unwrap_or(0.05); // Default 5% false positive rate + let mut ndv = bloom_config.ndv.unwrap_or(1_000_000); // Default 1M distinct values + + // Cap NDV to row group size (can't have more distinct values than total rows) + if let Some(row_group_size) = config.row_group_size { + ndv = ndv.min(row_group_size as u64); + } + + let column_path = ColumnPath::from(schema.field(col_idx).name().as_str()); + props_builder = props_builder + .set_column_bloom_filter_enabled(column_path.clone(), true) + .set_column_bloom_filter_fpp(column_path.clone(), fpp) + .set_column_bloom_filter_ndv(column_path, ndv); + } + } + + // Set sorting columns if configured + if let Some(sorting_cols) = &config.sorting_columns { + use parquet::format::SortingColumn; + + let parquet_sorting_cols: Vec = sorting_cols + .iter() + .map(|col| { + let col_idx = schema + .fields() + .iter() + .position(|f| f.name() == &col.column) + .ok_or_else(|| { + vector_common::Error::from(format!( + "Sorting column '{}' not found in schema", + col.column + )) + })?; + + Ok(SortingColumn::new(col_idx as i32, col.descending, false)) + }) + .collect::, vector_common::Error>>()?; + + props_builder = props_builder.set_sorting_columns(Some(parquet_sorting_cols)); + } + } + // Note: Bloom filters and sorting are NOT applied for inferred schemas + + let writer_properties = props_builder.build(); + + Ok(Self { + schema_mode, + writer_properties, + }) + } +} + +impl tokio_util::codec::Encoder> for ParquetSerializer { + type Error = ParquetEncodingError; + + fn encode(&mut self, events: Vec, buffer: &mut BytesMut) -> Result<(), Self::Error> { + if events.is_empty() { + return Err(ParquetEncodingError::NoEvents); + } + + // Determine schema based on mode + let schema = match &self.schema_mode { + SchemaMode::Explicit { schema } => Arc::clone(schema), + SchemaMode::Inferred { + exclude_columns, + max_columns, + } => infer_schema_from_events(&events, exclude_columns, *max_columns)?, + }; + + let bytes = encode_events_to_parquet(&events, schema, &self.writer_properties)?; + + // Use put() instead of extend_from_slice to avoid copying when possible + buffer.put(bytes); + Ok(()) + } +} + +/// Errors that can occur during Parquet encoding +#[derive(Debug, Snafu)] +pub enum ParquetEncodingError { + /// Failed to build Arrow record batch + #[snafu(display("Failed to build Arrow record batch: {}", source))] + RecordBatchCreation { + /// The underlying Arrow encoding error + source: ArrowEncodingError, + }, + + /// Failed to write Parquet data + #[snafu(display("Failed to write Parquet data: {}", source))] + ParquetWrite { + /// The underlying Parquet error + source: parquet::errors::ParquetError, + }, + + /// No events provided for encoding + #[snafu(display("No events provided for encoding"))] + NoEvents, + + /// Schema must be provided before encoding + #[snafu(display("Schema must be provided before encoding"))] + NoSchemaProvided, + + /// No fields could be inferred from events + #[snafu(display( + "No fields could be inferred from events (all fields excluded or only null values)" + ))] + NoFieldsInferred, + + /// Invalid event type (not a log event) + 
#[snafu(display("Invalid event type, expected log event"))] + InvalidEventType, + + /// JSON serialization error for nested types + #[snafu(display("Failed to serialize nested type as JSON: {}", source))] + JsonSerialization { + /// The underlying JSON error + source: serde_json::Error, + }, + + /// IO error during encoding + #[snafu(display("IO error: {}", source))] + Io { + /// The underlying IO error + source: std::io::Error, + }, +} + +impl From for ParquetEncodingError { + fn from(error: std::io::Error) -> Self { + Self::Io { source: error } + } +} + +impl From for ParquetEncodingError { + fn from(error: ArrowEncodingError) -> Self { + Self::RecordBatchCreation { source: error } + } +} + +impl From for ParquetEncodingError { + fn from(error: parquet::errors::ParquetError) -> Self { + Self::ParquetWrite { source: error } + } +} + +impl From for ParquetEncodingError { + fn from(error: serde_json::Error) -> Self { + Self::JsonSerialization { source: error } + } +} + +/// Infer Arrow DataType from a Vector Value +fn infer_arrow_type(value: &vector_core::event::Value) -> arrow::datatypes::DataType { + use arrow::datatypes::{DataType, TimeUnit}; + use vector_core::event::Value; + + match value { + Value::Bytes(_) => DataType::Utf8, + Value::Integer(_) => DataType::Int64, + Value::Float(_) => DataType::Float64, + Value::Boolean(_) => DataType::Boolean, + Value::Timestamp(_) => DataType::Timestamp(TimeUnit::Microsecond, None), + // Nested types and regex are always serialized as strings + Value::Array(_) | Value::Object(_) | Value::Regex(_) => DataType::Utf8, + // Null doesn't determine type, default to Utf8 + Value::Null => DataType::Utf8, + } +} + +/// Infer schema from a batch of events +fn infer_schema_from_events( + events: &[Event], + exclude_columns: &std::collections::BTreeSet, + max_columns: usize, +) -> Result, ParquetEncodingError> { + use arrow::datatypes::{DataType, Field}; + use std::collections::BTreeMap; + use vector_core::event::Value; + + let mut field_types: BTreeMap = BTreeMap::new(); + let mut type_conflicts: BTreeMap> = BTreeMap::new(); + + for event in events { + // Only process log events + let log = match event { + Event::Log(log) => log, + _ => return Err(ParquetEncodingError::InvalidEventType), + }; + + let fields_iter = log + .all_event_fields() + .ok_or(ParquetEncodingError::InvalidEventType)?; + + for (key, value) in fields_iter { + let key_str = key.to_string(); + + // Skip excluded columns + if exclude_columns.contains(&key_str) { + continue; + } + + // Skip Value::Null (doesn't determine type) + if matches!(value, Value::Null) { + continue; + } + + // Enforce max columns (skip new fields after limit) + if field_types.len() >= max_columns && !field_types.contains_key(&key_str) { + tracing::debug!( + column = %key_str, + max_columns = max_columns, + "Skipping column: max_columns limit reached" + ); + continue; + } + + let inferred_type = infer_arrow_type(value); + + match field_types.get(&key_str) { + None => { + // First occurrence of this field + field_types.insert(key_str, inferred_type); + } + Some(existing_type) if existing_type != &inferred_type => { + // Type conflict detected - fallback to Utf8 + tracing::warn!( + column = %key_str, + existing_type = ?existing_type, + new_type = ?inferred_type, + "Type conflict detected, encoding as Utf8" + ); + + type_conflicts + .entry(key_str.clone()) + .or_insert_with(|| vec![existing_type.clone()]) + .push(inferred_type); + + field_types.insert(key_str, DataType::Utf8); + } + Some(_) => { + // Same type, no action 
needed + } + } + } + } + + if field_types.is_empty() { + return Err(ParquetEncodingError::NoFieldsInferred); + } + + // Build Arrow schema (all fields nullable) + let arrow_fields: Vec> = field_types + .into_iter() + .map(|(name, dtype)| Arc::new(Field::new(name, dtype, true))) + .collect(); + + Ok(Arc::new(Schema::new(arrow_fields))) +} + +/// Encodes a batch of events into Parquet format +pub fn encode_events_to_parquet( + events: &[Event], + schema: Arc, + writer_properties: &WriterProperties, +) -> Result { + if events.is_empty() { + return Err(ParquetEncodingError::NoEvents); + } + + // Build Arrow RecordBatch from events (reuses Arrow encoder logic) + let record_batch = build_record_batch(schema, events)?; + + // Get batch metadata before we move into writer scope + let batch_schema = record_batch.schema(); + + // Write RecordBatch to Parquet format in memory + let mut buffer = Vec::new(); + { + let mut writer = + ArrowWriter::try_new(&mut buffer, batch_schema, Some(writer_properties.clone()))?; + + writer.write(&record_batch)?; + writer.close()?; + } + + Ok(Bytes::from(buffer)) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::{ + array::{ + Array, BinaryArray, BooleanArray, Float64Array, Int64Array, StringArray, + TimestampMicrosecondArray, + }, + datatypes::{DataType, Field, TimeUnit}, + }; + use bytes::Bytes; + use chrono::Utc; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use vector_core::event::LogEvent; + + #[test] + fn test_encode_all_types() { + let mut log = LogEvent::default(); + log.insert("string_field", "test"); + log.insert("int64_field", 42); + log.insert("float64_field", 3.15); + log.insert("bool_field", true); + log.insert("bytes_field", bytes::Bytes::from("binary")); + log.insert("timestamp_field", Utc::now()); + + let events = vec![Event::Log(log)]; + + let schema = Arc::new(Schema::new(vec![ + Field::new("string_field", DataType::Utf8, true), + Field::new("int64_field", DataType::Int64, true), + Field::new("float64_field", DataType::Float64, true), + Field::new("bool_field", DataType::Boolean, true), + Field::new("bytes_field", DataType::Binary, true), + Field::new( + "timestamp_field", + DataType::Timestamp(TimeUnit::Microsecond, None), + true, + ), + ])); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let result = encode_events_to_parquet(&events, Arc::clone(&schema), &props); + assert!(result.is_ok()); + + let bytes = result.unwrap(); + + // Verify it's valid Parquet by reading it back + let reader = ParquetRecordBatchReaderBuilder::try_new(bytes) + .unwrap() + .build() + .unwrap(); + + let batches: Vec<_> = reader.collect::>().unwrap(); + assert_eq!(batches.len(), 1); + + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 1); + assert_eq!(batch.num_columns(), 6); + + // Verify string field + assert_eq!( + batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .value(0), + "test" + ); + + // Verify int64 field + assert_eq!( + batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap() + .value(0), + 42 + ); + + // Verify float64 field + assert!( + (batch + .column(2) + .as_any() + .downcast_ref::() + .unwrap() + .value(0) + - 3.15) + .abs() + < 0.001 + ); + + // Verify boolean field + assert!( + batch + .column(3) + .as_any() + .downcast_ref::() + .unwrap() + .value(0) + ); + + // Verify binary field + assert_eq!( + batch + .column(4) + .as_any() + .downcast_ref::() + .unwrap() + .value(0), + b"binary" + ); + + // Verify timestamp field + assert!( + 
!batch + .column(5) + .as_any() + .downcast_ref::() + .unwrap() + .is_null(0) + ); + } + + #[test] + fn test_encode_null_values() { + let mut log1 = LogEvent::default(); + log1.insert("field_a", 1); + // field_b is missing + + let mut log2 = LogEvent::default(); + log2.insert("field_b", 2); + // field_a is missing + + let events = vec![Event::Log(log1), Event::Log(log2)]; + + let schema = Arc::new(Schema::new(vec![ + Field::new("field_a", DataType::Int64, true), + Field::new("field_b", DataType::Int64, true), + ])); + + let props = WriterProperties::builder().build(); + + let result = encode_events_to_parquet(&events, Arc::clone(&schema), &props); + assert!(result.is_ok()); + + let bytes = result.unwrap(); + let reader = ParquetRecordBatchReaderBuilder::try_new(bytes) + .unwrap() + .build() + .unwrap(); + + let batches: Vec<_> = reader.collect::>().unwrap(); + let batch = &batches[0]; + + assert_eq!(batch.num_rows(), 2); + + let field_a = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(field_a.value(0), 1); + assert!(field_a.is_null(1)); + + let field_b = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert!(field_b.is_null(0)); + assert_eq!(field_b.value(1), 2); + } + + #[test] + fn test_encode_empty_events() { + let events: Vec = vec![]; + let schema = Arc::new(Schema::new(vec![Field::new( + "field", + DataType::Int64, + true, + )])); + let props = WriterProperties::builder().build(); + let result = encode_events_to_parquet(&events, schema, &props); + assert!(result.is_err()); + assert!(matches!( + result.unwrap_err(), + ParquetEncodingError::NoEvents + )); + } + + #[test] + fn test_parquet_compression_types() { + let mut log = LogEvent::default(); + log.insert("message", "test message"); + + let events = vec![Event::Log(log)]; + let schema = Arc::new(Schema::new(vec![Field::new( + "message", + DataType::Utf8, + true, + )])); + + // Test different compression algorithms + let compressions = vec![ + ParquetCompression::Uncompressed, + ParquetCompression::Snappy, + ParquetCompression::Gzip, + ParquetCompression::Zstd, + ]; + + for compression in compressions { + let props = WriterProperties::builder() + .set_compression(compression.into()) + .build(); + + let result = encode_events_to_parquet(&events, Arc::clone(&schema), &props); + assert!(result.is_ok(), "Failed with compression: {:?}", compression); + + // Verify we can read it back + let bytes = result.unwrap(); + let reader = ParquetRecordBatchReaderBuilder::try_new(bytes) + .unwrap() + .build() + .unwrap(); + + let batches: Vec<_> = reader.collect::>().unwrap(); + assert_eq!(batches[0].num_rows(), 1); + } + } + + #[test] + fn test_parquet_serializer_config() { + use super::super::schema_definition::FieldDefinition; + use std::collections::BTreeMap; + + let mut fields = BTreeMap::new(); + fields.insert( + "field".to_string(), + FieldDefinition { + r#type: "int64".to_string(), + bloom_filter: false, + bloom_filter_num_distinct_values: None, + bloom_filter_false_positive_pct: None, + }, + ); + + let config = ParquetSerializerConfig { + schema: Some(SchemaDefinition { fields }), + infer_schema: false, + exclude_columns: None, + max_columns: default_max_columns(), + compression: ParquetCompression::Zstd, + compression_level: None, + writer_version: ParquetWriterVersion::default(), + row_group_size: Some(1000), + allow_nullable_fields: false, + sorting_columns: None, + }; + + let serializer = ParquetSerializer::new(config); + assert!(serializer.is_ok()); + } + + #[test] + fn 
test_parquet_serializer_no_schema_fails() { + let config = ParquetSerializerConfig { + schema: None, + infer_schema: false, + exclude_columns: None, + max_columns: default_max_columns(), + compression: ParquetCompression::default(), + compression_level: None, + writer_version: ParquetWriterVersion::default(), + row_group_size: None, + allow_nullable_fields: false, + sorting_columns: None, + }; + + let result = ParquetSerializer::new(config); + assert!(result.is_err()); + } + + #[test] + fn test_encoder_trait_implementation() { + use super::super::schema_definition::FieldDefinition; + use std::collections::BTreeMap; + use tokio_util::codec::Encoder; + + let mut fields = BTreeMap::new(); + fields.insert( + "id".to_string(), + FieldDefinition { + r#type: "int64".to_string(), + bloom_filter: false, + bloom_filter_num_distinct_values: None, + bloom_filter_false_positive_pct: None, + }, + ); + fields.insert( + "name".to_string(), + FieldDefinition { + r#type: "utf8".to_string(), + bloom_filter: false, + bloom_filter_num_distinct_values: None, + bloom_filter_false_positive_pct: None, + }, + ); + + let config = ParquetSerializerConfig::new(SchemaDefinition { fields }); + let mut serializer = ParquetSerializer::new(config).unwrap(); + + let mut log = LogEvent::default(); + log.insert("id", 1); + log.insert("name", "test"); + + let events = vec![Event::Log(log)]; + let mut buffer = BytesMut::new(); + + let result = serializer.encode(events, &mut buffer); + assert!(result.is_ok()); + assert!(!buffer.is_empty()); + + // Verify the buffer contains valid Parquet data + let bytes = Bytes::copy_from_slice(&buffer); + let reader = ParquetRecordBatchReaderBuilder::try_new(bytes); + assert!(reader.is_ok()); + } + + #[test] + fn test_large_batch_encoding() { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, true), + Field::new("value", DataType::Float64, true), + ])); + + // Create 10,000 events + let events: Vec = (0..10000) + .map(|i| { + let mut log = LogEvent::default(); + log.insert("id", i); + log.insert("value", i as f64 * 1.5); + Event::Log(log) + }) + .collect(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .set_max_row_group_size(5000) // 2 row groups + .build(); + + let result = encode_events_to_parquet(&events, Arc::clone(&schema), &props); + assert!(result.is_ok()); + + let bytes = result.unwrap(); + let reader = ParquetRecordBatchReaderBuilder::try_new(bytes) + .unwrap() + .build() + .unwrap(); + + let batches: Vec<_> = reader.collect::>().unwrap(); + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 10000); + } + + #[test] + fn test_allow_nullable_fields_config() { + use super::super::schema_definition::FieldDefinition; + use std::collections::BTreeMap; + use tokio_util::codec::Encoder; + + let mut fields = BTreeMap::new(); + fields.insert( + "required_field".to_string(), + FieldDefinition { + r#type: "int64".to_string(), + bloom_filter: false, + bloom_filter_num_distinct_values: None, + bloom_filter_false_positive_pct: None, + }, + ); + + let mut log1 = LogEvent::default(); + log1.insert("required_field", 42); + + let log2 = LogEvent::default(); + // log2 is missing required_field + + let events = vec![Event::Log(log1), Event::Log(log2)]; + + // Note: SchemaDefinition creates nullable fields by default + // This test verifies that the allow_nullable_fields flag works + let mut config = ParquetSerializerConfig::new(SchemaDefinition { fields }); + config.allow_nullable_fields = true; + 
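        // With `allow_nullable_fields` enabled, `ParquetSerializer::new` rewrites the
        // schema via `make_field_nullable`, so the second event's missing
        // `required_field` is written as a null cell (asserted via `is_null(1)` below)
        // rather than producing an encoding error.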
+ let mut serializer = ParquetSerializer::new(config).unwrap(); + let mut buffer = BytesMut::new(); + let result = serializer.encode(events.clone(), &mut buffer); + assert!(result.is_ok()); + + // Verify the data + let bytes = Bytes::copy_from_slice(&buffer); + let reader = ParquetRecordBatchReaderBuilder::try_new(bytes) + .unwrap() + .build() + .unwrap(); + + let batches: Vec<_> = reader.collect::>().unwrap(); + let batch = &batches[0]; + + let array = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(array.value(0), 42); + assert!(array.is_null(1)); + } +} diff --git a/lib/codecs/src/encoding/format/schema_definition.rs b/lib/codecs/src/encoding/format/schema_definition.rs new file mode 100644 index 0000000000000..1ed2c05b1ff31 --- /dev/null +++ b/lib/codecs/src/encoding/format/schema_definition.rs @@ -0,0 +1,390 @@ +//! Schema definition support for Arrow and Parquet encoders + +use std::{collections::BTreeMap, sync::Arc}; + +use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; +#[allow(unused_imports)] // Used by vector_config macros +use serde::{Deserialize, Serialize}; +use snafu::Snafu; +use vector_config::configurable_component; + +/// Error type for schema definition parsing +#[derive(Debug, Snafu)] +pub enum SchemaDefinitionError { + /// Unknown data type specified in schema + #[snafu(display("Unknown data type '{}' for field '{}'", data_type, field_name))] + UnknownDataType { + /// The field name that had an unknown type + field_name: String, + /// The unknown data type string + data_type: String, + }, +} + +/// Per-column configuration including type and Bloom filter settings +#[configurable_component] +#[derive(Debug, Clone)] +pub struct FieldDefinition { + /// Data type for this field + #[configurable(metadata(docs::examples = "utf8"))] + #[configurable(metadata(docs::examples = "int64"))] + #[configurable(metadata(docs::examples = "timestamp_ms"))] + pub r#type: String, + + /// Enable Bloom filter for this specific column + /// + /// When enabled, a Bloom filter will be created for this column to improve + /// query performance for point lookups and IN clauses. Only enable for + /// high-cardinality columns (UUIDs, user IDs, etc.) to avoid overhead. + #[serde(default)] + #[configurable(metadata(docs::examples = true))] + pub bloom_filter: bool, + + /// Number of distinct values expected for this column's Bloom filter + /// + /// This controls the size of the Bloom filter. Should match the actual + /// cardinality of the column. Will be automatically capped to the batch size. + /// + /// - Low cardinality (countries, states): 1,000 - 100,000 + /// - Medium cardinality (cities, products): 100,000 - 1,000,000 + /// - High cardinality (UUIDs, user IDs): 10,000,000+ + #[serde(default, alias = "bloom_filter_ndv")] + #[configurable(metadata(docs::examples = 1000000))] + #[configurable(metadata(docs::examples = 10000000))] + pub bloom_filter_num_distinct_values: Option, + + /// False positive probability for this column's Bloom filter (as a percentage) + /// + /// Lower values create larger but more accurate filters. 
+ /// + /// - 0.05 (5%): Good balance for general use + /// - 0.01 (1%): Better for high-selectivity queries + #[serde(default, alias = "bloom_filter_fpp")] + #[configurable(metadata(docs::examples = 0.05))] + #[configurable(metadata(docs::examples = 0.01))] + pub bloom_filter_false_positive_pct: Option, +} + +/// Bloom filter configuration for a specific column +#[derive(Debug, Clone)] +pub struct ColumnBloomFilterConfig { + /// Column name + pub column_name: String, + /// Whether Bloom filter is enabled for this column + pub enabled: bool, + /// Number of distinct values (if specified) + pub ndv: Option, + /// False positive probability (if specified) + pub fpp: Option, +} + +/// A schema definition that can be deserialized from configuration +#[configurable_component] +#[derive(Debug, Clone)] +pub struct SchemaDefinition { + /// Map of field names to their type and Bloom filter configuration + #[serde(flatten)] + #[configurable(metadata( + docs::additional_props_description = "A field definition specifying the data type and optional Bloom filter configuration." + ))] + pub fields: BTreeMap, +} + +impl SchemaDefinition { + /// Convert the schema definition to an Arrow Schema + pub fn to_arrow_schema(&self) -> Result, SchemaDefinitionError> { + let arrow_fields: Result, _> = self + .fields + .iter() + .map(|(name, field_def)| { + let data_type = parse_data_type(&field_def.r#type, name)?; + // All fields are nullable by default when defined in config + Ok(Arc::new(Field::new(name, data_type, true))) + }) + .collect(); + + Ok(Arc::new(Schema::new(arrow_fields?))) + } + + /// Extract per-column Bloom filter configurations + pub fn extract_bloom_filter_configs(&self) -> Vec { + self.fields + .iter() + .filter_map(|(name, field_def)| { + if field_def.bloom_filter { + Some(ColumnBloomFilterConfig { + column_name: name.clone(), + enabled: true, + ndv: field_def.bloom_filter_num_distinct_values, + fpp: field_def.bloom_filter_false_positive_pct, + }) + } else { + None + } + }) + .collect() + } +} + +/// Parse a data type string into an Arrow DataType +fn parse_data_type(type_str: &str, field_name: &str) -> Result { + let data_type = match type_str.to_lowercase().as_str() { + // String types + "utf8" | "string" => DataType::Utf8, + "large_utf8" | "large_string" => DataType::LargeUtf8, + + // Integer types + "int8" => DataType::Int8, + "int16" => DataType::Int16, + "int32" => DataType::Int32, + "int64" => DataType::Int64, + + // Unsigned integer types + "uint8" => DataType::UInt8, + "uint16" => DataType::UInt16, + "uint32" => DataType::UInt32, + "uint64" => DataType::UInt64, + + // Floating point types + "float32" | "float" => DataType::Float32, + "float64" | "double" => DataType::Float64, + + // Boolean + "boolean" | "bool" => DataType::Boolean, + + // Binary types + "binary" => DataType::Binary, + "large_binary" => DataType::LargeBinary, + + // Timestamp types + "timestamp_second" | "timestamp_s" => DataType::Timestamp(TimeUnit::Second, None), + "timestamp_millisecond" | "timestamp_ms" | "timestamp_millis" => { + DataType::Timestamp(TimeUnit::Millisecond, None) + } + "timestamp_microsecond" | "timestamp_us" | "timestamp_micros" => { + DataType::Timestamp(TimeUnit::Microsecond, None) + } + "timestamp_nanosecond" | "timestamp_ns" | "timestamp_nanos" => { + DataType::Timestamp(TimeUnit::Nanosecond, None) + } + + // Date types + "date32" | "date" => DataType::Date32, + "date64" => DataType::Date64, + + // Time types + "time32_second" | "time32_s" => DataType::Time32(TimeUnit::Second), + 
"time32_millisecond" | "time32_ms" => DataType::Time32(TimeUnit::Millisecond), + "time64_microsecond" | "time64_us" => DataType::Time64(TimeUnit::Microsecond), + "time64_nanosecond" | "time64_ns" => DataType::Time64(TimeUnit::Nanosecond), + + // Duration types + "duration_second" | "duration_s" => DataType::Duration(TimeUnit::Second), + "duration_millisecond" | "duration_ms" => DataType::Duration(TimeUnit::Millisecond), + "duration_microsecond" | "duration_us" => DataType::Duration(TimeUnit::Microsecond), + "duration_nanosecond" | "duration_ns" => DataType::Duration(TimeUnit::Nanosecond), + + // Decimal types + "decimal128" => DataType::Decimal128(38, 10), // Default precision and scale + "decimal256" => DataType::Decimal256(76, 10), // Default precision and scale + + // Unknown type + _ => { + return Err(SchemaDefinitionError::UnknownDataType { + field_name: field_name.to_string(), + data_type: type_str.to_string(), + }); + } + }; + + Ok(data_type) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_simple_schema_definition() { + let mut fields = BTreeMap::new(); + fields.insert( + "id".to_string(), + FieldDefinition { + r#type: "int64".to_string(), + bloom_filter: false, + bloom_filter_num_distinct_values: None, + bloom_filter_false_positive_pct: None, + }, + ); + fields.insert( + "name".to_string(), + FieldDefinition { + r#type: "utf8".to_string(), + bloom_filter: false, + bloom_filter_num_distinct_values: None, + bloom_filter_false_positive_pct: None, + }, + ); + fields.insert( + "value".to_string(), + FieldDefinition { + r#type: "float64".to_string(), + bloom_filter: false, + bloom_filter_num_distinct_values: None, + bloom_filter_false_positive_pct: None, + }, + ); + + let schema_def = SchemaDefinition { fields }; + let schema = schema_def.to_arrow_schema().unwrap(); + + assert_eq!(schema.fields().len(), 3); + + let id_field = schema.field_with_name("id").unwrap(); + assert_eq!(id_field.data_type(), &DataType::Int64); + assert!(id_field.is_nullable()); + + let name_field = schema.field_with_name("name").unwrap(); + assert_eq!(name_field.data_type(), &DataType::Utf8); + + let value_field = schema.field_with_name("value").unwrap(); + assert_eq!(value_field.data_type(), &DataType::Float64); + } + + #[test] + fn test_timestamp_types() { + let mut fields = BTreeMap::new(); + fields.insert( + "ts_s".to_string(), + FieldDefinition { + r#type: "timestamp_second".to_string(), + bloom_filter: false, + bloom_filter_num_distinct_values: None, + bloom_filter_false_positive_pct: None, + }, + ); + fields.insert( + "ts_ms".to_string(), + FieldDefinition { + r#type: "timestamp_millisecond".to_string(), + bloom_filter: false, + bloom_filter_num_distinct_values: None, + bloom_filter_false_positive_pct: None, + }, + ); + fields.insert( + "ts_us".to_string(), + FieldDefinition { + r#type: "timestamp_microsecond".to_string(), + bloom_filter: false, + bloom_filter_num_distinct_values: None, + bloom_filter_false_positive_pct: None, + }, + ); + fields.insert( + "ts_ns".to_string(), + FieldDefinition { + r#type: "timestamp_nanosecond".to_string(), + bloom_filter: false, + bloom_filter_num_distinct_values: None, + bloom_filter_false_positive_pct: None, + }, + ); + + let schema_def = SchemaDefinition { fields }; + let schema = schema_def.to_arrow_schema().unwrap(); + + assert_eq!( + schema.field_with_name("ts_s").unwrap().data_type(), + &DataType::Timestamp(TimeUnit::Second, None) + ); + assert_eq!( + schema.field_with_name("ts_ms").unwrap().data_type(), + 
&DataType::Timestamp(TimeUnit::Millisecond, None) + ); + assert_eq!( + schema.field_with_name("ts_us").unwrap().data_type(), + &DataType::Timestamp(TimeUnit::Microsecond, None) + ); + assert_eq!( + schema.field_with_name("ts_ns").unwrap().data_type(), + &DataType::Timestamp(TimeUnit::Nanosecond, None) + ); + } + + #[test] + fn test_unknown_data_type() { + let mut fields = BTreeMap::new(); + fields.insert( + "bad_field".to_string(), + FieldDefinition { + r#type: "unknown_type".to_string(), + bloom_filter: false, + bloom_filter_num_distinct_values: None, + bloom_filter_false_positive_pct: None, + }, + ); + + let schema_def = SchemaDefinition { fields }; + let result = schema_def.to_arrow_schema(); + + assert!(result.is_err()); + let err = result.unwrap_err(); + assert!(err.to_string().contains("unknown_type")); + } + + #[test] + fn test_bloom_filter_extraction() { + let mut fields = BTreeMap::new(); + fields.insert( + "id".to_string(), + FieldDefinition { + r#type: "int64".to_string(), + bloom_filter: false, + bloom_filter_num_distinct_values: None, + bloom_filter_false_positive_pct: None, + }, + ); + fields.insert( + "user_id".to_string(), + FieldDefinition { + r#type: "utf8".to_string(), + bloom_filter: true, + bloom_filter_num_distinct_values: Some(10_000_000), + bloom_filter_false_positive_pct: Some(0.01), + }, + ); + fields.insert( + "request_id".to_string(), + FieldDefinition { + r#type: "utf8".to_string(), + bloom_filter: true, + bloom_filter_num_distinct_values: None, // Will use global default + bloom_filter_false_positive_pct: None, + }, + ); + + let schema_def = SchemaDefinition { fields }; + let bloom_configs = schema_def.extract_bloom_filter_configs(); + + assert_eq!(bloom_configs.len(), 2); + + // Check user_id config + let user_id_config = bloom_configs + .iter() + .find(|c| c.column_name == "user_id") + .unwrap(); + assert!(user_id_config.enabled); + assert_eq!(user_id_config.ndv, Some(10_000_000)); + assert_eq!(user_id_config.fpp, Some(0.01)); + + // Check request_id config + let request_id_config = bloom_configs + .iter() + .find(|c| c.column_name == "request_id") + .unwrap(); + assert!(request_id_config.enabled); + assert_eq!(request_id_config.ndv, None); + assert_eq!(request_id_config.fpp, None); + } +} diff --git a/lib/codecs/src/encoding/mod.rs b/lib/codecs/src/encoding/mod.rs index 7d611790cb613..8c04dde38df81 100644 --- a/lib/codecs/src/encoding/mod.rs +++ b/lib/codecs/src/encoding/mod.rs @@ -21,6 +21,12 @@ pub use format::{ }; #[cfg(feature = "opentelemetry")] pub use format::{OtlpSerializer, OtlpSerializerConfig}; +#[cfg(feature = "parquet")] +pub use format::{ + ParquetCompression, ParquetEncodingError, ParquetSerializer, ParquetSerializerConfig, +}; +#[cfg(any(feature = "arrow", feature = "parquet"))] +pub use format::{SchemaDefinition, SchemaDefinitionError}; #[cfg(feature = "syslog")] pub use format::{SyslogSerializer, SyslogSerializerConfig}; pub use framing::{ @@ -30,7 +36,7 @@ pub use framing::{ NewlineDelimitedEncoderConfig, VarintLengthDelimitedEncoder, VarintLengthDelimitedEncoderConfig, }; -#[cfg(feature = "arrow")] +#[cfg(any(feature = "arrow", feature = "parquet"))] pub use serializer::BatchSerializerConfig; pub use serializer::{Serializer, SerializerConfig}; diff --git a/lib/codecs/src/encoding/serializer.rs b/lib/codecs/src/encoding/serializer.rs index 8422587bddf28..5c09fe9541f24 100644 --- a/lib/codecs/src/encoding/serializer.rs +++ b/lib/codecs/src/encoding/serializer.rs @@ -1,11 +1,15 @@ //! 
Serializer configuration and implementation for encoding structured events as bytes. use bytes::BytesMut; +#[cfg(feature = "parquet")] +use vector_common::Error as VectorError; use vector_config::configurable_component; use vector_core::{config::DataType, event::Event, schema}; #[cfg(feature = "arrow")] -use super::format::{ArrowStreamSerializer, ArrowStreamSerializerConfig}; +use super::format::ArrowStreamSerializerConfig; +#[cfg(feature = "parquet")] +use super::format::ParquetSerializerConfig; #[cfg(feature = "opentelemetry")] use super::format::{OtlpSerializer, OtlpSerializerConfig}; #[cfg(feature = "syslog")] @@ -112,6 +116,19 @@ pub enum SerializerConfig { /// [protobuf]: https://protobuf.dev/ Protobuf(ProtobufSerializerConfig), + /// Encodes events in [Apache Parquet][apache_parquet] columnar format. + /// + /// Parquet is a columnar storage format optimized for analytics workloads. + /// It provides efficient compression and encoding schemes, making it ideal + /// for long-term storage and query performance. + /// + /// [apache_parquet]: https://parquet.apache.org/ + #[cfg(feature = "parquet")] + Parquet { + /// Apache Parquet-specific encoder options. + parquet: ParquetSerializerConfig, + }, + /// No encoding. /// /// This encoding uses the `message` field of a log event. @@ -160,32 +177,40 @@ pub enum BatchSerializerConfig { #[cfg(feature = "arrow")] #[serde(rename = "arrow_stream")] ArrowStream(ArrowStreamSerializerConfig), + + /// Encodes events in [Apache Parquet][apache_parquet] columnar format. + /// + /// Parquet is a columnar storage format optimized for analytics workloads. + /// It provides efficient compression and encoding schemes, making it ideal + /// for long-term storage and query performance. + /// + /// [apache_parquet]: https://parquet.apache.org/ + #[cfg(feature = "parquet")] + Parquet { + /// Apache Parquet-specific encoder options. + parquet: ParquetSerializerConfig, + }, } -#[cfg(feature = "arrow")] +#[cfg(any(feature = "arrow", feature = "parquet"))] impl BatchSerializerConfig { - /// Build the `ArrowStreamSerializer` from this configuration. - pub fn build( - &self, - ) -> Result> { - match self { - BatchSerializerConfig::ArrowStream(arrow_config) => { - ArrowStreamSerializer::new(arrow_config.clone()) - } - } - } - /// The data type of events that are accepted by this batch serializer. pub fn input_type(&self) -> DataType { match self { + #[cfg(feature = "arrow")] BatchSerializerConfig::ArrowStream(arrow_config) => arrow_config.input_type(), + #[cfg(feature = "parquet")] + BatchSerializerConfig::Parquet { parquet } => parquet.input_type(), } } /// The schema required by the batch serializer. pub fn schema_requirement(&self) -> schema::Requirement { match self { + #[cfg(feature = "arrow")] BatchSerializerConfig::ArrowStream(arrow_config) => arrow_config.schema_requirement(), + #[cfg(feature = "parquet")] + BatchSerializerConfig::Parquet { parquet } => parquet.schema_requirement(), } } } @@ -288,6 +313,10 @@ impl SerializerConfig { Ok(Serializer::RawMessage(RawMessageSerializerConfig.build())) } SerializerConfig::Text(config) => Ok(Serializer::Text(config.build())), + #[cfg(feature = "parquet")] + SerializerConfig::Parquet { .. 
} => Err(VectorError::from( + "Parquet codec is available only for batch encoding and cannot be built as a framed serializer.", + )), #[cfg(feature = "syslog")] SerializerConfig::Syslog(config) => Ok(Serializer::Syslog(config.build())), } @@ -327,6 +356,8 @@ impl SerializerConfig { SerializerConfig::Gelf(_) => { FramingConfig::CharacterDelimited(CharacterDelimitedEncoderConfig::new(0)) } + #[cfg(feature = "parquet")] + SerializerConfig::Parquet { .. } => FramingConfig::NewlineDelimited, } } @@ -346,6 +377,8 @@ impl SerializerConfig { #[cfg(feature = "opentelemetry")] SerializerConfig::Otlp => OtlpSerializerConfig::default().input_type(), SerializerConfig::Protobuf(config) => config.input_type(), + #[cfg(feature = "parquet")] + SerializerConfig::Parquet { parquet } => parquet.input_type(), SerializerConfig::RawMessage => RawMessageSerializerConfig.input_type(), SerializerConfig::Text(config) => config.input_type(), #[cfg(feature = "syslog")] @@ -369,6 +402,8 @@ impl SerializerConfig { #[cfg(feature = "opentelemetry")] SerializerConfig::Otlp => OtlpSerializerConfig::default().schema_requirement(), SerializerConfig::Protobuf(config) => config.schema_requirement(), + #[cfg(feature = "parquet")] + SerializerConfig::Parquet { parquet } => parquet.schema_requirement(), SerializerConfig::RawMessage => RawMessageSerializerConfig.schema_requirement(), SerializerConfig::Text(config) => config.schema_requirement(), #[cfg(feature = "syslog")] diff --git a/lib/vector-lib/Cargo.toml b/lib/vector-lib/Cargo.toml index c72af97fdaa62..35bd661ddb372 100644 --- a/lib/vector-lib/Cargo.toml +++ b/lib/vector-lib/Cargo.toml @@ -27,6 +27,7 @@ vrl = { workspace = true, optional = true } allocation-tracing = ["vector-top?/allocation-tracing"] api-client = ["dep:vector-api-client"] arrow = ["codecs/arrow"] +parquet = ["codecs/parquet"] api = ["vector-tap/api"] file-source = ["dep:file-source", "dep:file-source-common"] lua = ["vector-core/lua"] diff --git a/scripts/generate-component-docs.rb b/scripts/generate-component-docs.rb index 5506212d1b30c..ed2b90532d54c 100755 --- a/scripts/generate-component-docs.rb +++ b/scripts/generate-component-docs.rb @@ -1666,6 +1666,39 @@ def unwrap_resolved_schema(root_schema, schema_name, friendly_name) return sort_hash_nested(unwrapped_resolved_schema) end +PARQUET_ALLOWED_SINKS = %w[aws_s3 gcp_cloud_storage azure_blob].freeze + +def remove_parquet_from_codec_config!(schema, field_name) + field = schema[field_name] + return if field.nil? + + field_options = field.dig('type', 'object', 'options') + return if field_options.nil? + + field_options.delete('parquet') + + codec = field_options['codec'] + codec_enum = codec.dig('type', 'string', 'enum') if codec.is_a?(Hash) + if codec_enum.is_a?(Hash) + codec_enum.delete('parquet') + elsif codec_enum.is_a?(Array) + codec_enum.delete('parquet') + end +end + +def prune_parquet_from_schema!(schema) + return unless schema.is_a?(Hash) + + schema.each do |field_name, field_def| + if %w[encoding batch_encoding].include?(field_name) + remove_parquet_from_codec_config!(schema, field_name) + end + + options = field_def.dig('type', 'object', 'options') + prune_parquet_from_schema!(options) if options.is_a?(Hash) + end +end + def render_and_import_schema(unwrapped_resolved_schema, friendly_name, config_map_path, cue_relative_path) # Set up the appropriate structure for the value based on the configuration map path. 
It defines @@ -1714,6 +1747,10 @@ def render_and_import_generated_component_schema(root_schema, schema_name, compo def render_and_import_component_schema(root_schema, schema_name, component_type, component_name) friendly_name = "'#{component_name}' #{component_type} configuration" unwrapped_resolved_schema = unwrap_resolved_schema(root_schema, schema_name, friendly_name) + unwrapped_resolved_schema = deep_copy(unwrapped_resolved_schema) + if component_type == 'sink' && !PARQUET_ALLOWED_SINKS.include?(component_name) + prune_parquet_from_schema!(unwrapped_resolved_schema) + end render_and_import_schema( unwrapped_resolved_schema, friendly_name, diff --git a/src/codecs/encoding/config.rs b/src/codecs/encoding/config.rs index 47bec858ffb08..0b81bad39fe04 100644 --- a/src/codecs/encoding/config.rs +++ b/src/codecs/encoding/config.rs @@ -1,4 +1,8 @@ +#[cfg(feature = "codecs-parquet")] +use crate::codecs::{BatchEncoder, BatchSerializer}; use crate::codecs::{Encoder, EncoderKind, Transformer}; +#[cfg(feature = "codecs-parquet")] +use vector_lib::codecs::encoding::ParquetSerializer; use vector_lib::{ codecs::{ CharacterDelimitedEncoder, LengthDelimitedEncoder, NewlineDelimitedEncoder, @@ -143,9 +147,22 @@ impl EncodingConfigWithFraming { /// Build the `Transformer` and `EncoderKind` for this config. pub fn build_encoder(&self, sink_type: SinkType) -> crate::Result<(Transformer, EncoderKind)> { - let (framer, serializer) = self.build(sink_type)?; - let encoder = EncoderKind::Framed(Box::new(Encoder::::new(framer, serializer))); - Ok((self.transformer(), encoder)) + match &self.encoding.encoding { + #[cfg(feature = "codecs-parquet")] + SerializerConfig::Parquet { parquet } => { + let serializer = ParquetSerializer::new(parquet.clone())?; + let encoder = EncoderKind::Batch(Box::new(BatchEncoder::new( + BatchSerializer::Parquet(Box::new(serializer)), + ))); + Ok((self.transformer(), encoder)) + } + _ => { + let (framer, serializer) = self.build(sink_type)?; + let encoder = + EncoderKind::Framed(Box::new(Encoder::::new(framer, serializer))); + Ok((self.transformer(), encoder)) + } + } } } diff --git a/src/codecs/encoding/encoder.rs b/src/codecs/encoding/encoder.rs index a36b98a6496cb..8749005ee981e 100644 --- a/src/codecs/encoding/encoder.rs +++ b/src/codecs/encoding/encoder.rs @@ -2,6 +2,8 @@ use bytes::BytesMut; use tokio_util::codec::Encoder as _; #[cfg(feature = "codecs-arrow")] use vector_lib::codecs::encoding::ArrowStreamSerializer; +#[cfg(feature = "codecs-parquet")] +use vector_lib::codecs::encoding::ParquetSerializer; use vector_lib::codecs::{ CharacterDelimitedEncoder, NewlineDelimitedEncoder, TextSerializerConfig, encoding::{Error, Framer, Serializer}, @@ -18,6 +20,9 @@ pub enum BatchSerializer { /// Arrow IPC stream format serializer. #[cfg(feature = "codecs-arrow")] Arrow(ArrowStreamSerializer), + /// Parquet columnar format serializer. + #[cfg(feature = "codecs-parquet")] + Parquet(Box), } /// An encoder that encodes batches of events. @@ -38,10 +43,13 @@ impl BatchEncoder { } /// Get the HTTP content type. 
- #[cfg(feature = "codecs-arrow")] + #[cfg(any(feature = "codecs-arrow", feature = "codecs-parquet"))] pub const fn content_type(&self) -> &'static str { match &self.serializer { + #[cfg(feature = "codecs-arrow")] BatchSerializer::Arrow(_) => "application/vnd.apache.arrow.stream", + #[cfg(feature = "codecs-parquet")] + BatchSerializer::Parquet(_) => "application/vnd.apache.parquet", } } } @@ -65,6 +73,24 @@ impl tokio_util::codec::Encoder> for BatchEncoder { } }) } + #[cfg(feature = "codecs-parquet")] + BatchSerializer::Parquet(serializer) => { + serializer.encode(events, buffer).map_err(|err| { + use vector_lib::codecs::encoding::ParquetEncodingError; + match &err { + ParquetEncodingError::RecordBatchCreation { source } => { + use vector_lib::codecs::encoding::ArrowEncodingError; + match source { + ArrowEncodingError::NullConstraint { .. } => { + Error::SchemaConstraintViolation(Box::new(err)) + } + _ => Error::SerializingError(Box::new(err)), + } + } + _ => Error::SerializingError(Box::new(err)), + } + }) + } _ => unreachable!("BatchSerializer cannot be constructed without encode()"), } } @@ -76,8 +102,8 @@ pub enum EncoderKind { /// Uses framing to encode individual events Framed(Box>), /// Encodes events in batches without framing - #[cfg(feature = "codecs-arrow")] - Batch(BatchEncoder), + #[cfg(any(feature = "codecs-arrow", feature = "codecs-parquet"))] + Batch(Box), } #[derive(Debug, Clone)] diff --git a/src/components/validation/resources/mod.rs b/src/components/validation/resources/mod.rs index 85f72f2aaf35f..bd5fee6930c92 100644 --- a/src/components/validation/resources/mod.rs +++ b/src/components/validation/resources/mod.rs @@ -239,6 +239,8 @@ fn serializer_config_to_deserializer( SerializerConfig::RawMessage | SerializerConfig::Text(_) => DeserializerConfig::Bytes, #[cfg(feature = "codecs-opentelemetry")] SerializerConfig::Otlp => todo!(), + #[cfg(feature = "codecs-parquet")] + SerializerConfig::Parquet { .. 
} => DeserializerConfig::Bytes, // Parquet files are binary #[cfg(feature = "codecs-syslog")] SerializerConfig::Syslog(_) => todo!(), }; diff --git a/src/internal_events/codecs.rs b/src/internal_events/codecs.rs index 27980af51b799..c6b2671eec568 100644 --- a/src/internal_events/codecs.rs +++ b/src/internal_events/codecs.rs @@ -137,13 +137,13 @@ impl InternalEvent for EncoderWriteError<'_, E> { } } -#[cfg(feature = "codecs-arrow")] +#[cfg(any(feature = "codecs-arrow", feature = "codecs-parquet"))] #[derive(Debug, NamedInternalEvent)] pub struct EncoderNullConstraintError<'a> { pub error: &'a crate::Error, } -#[cfg(feature = "codecs-arrow")] +#[cfg(any(feature = "codecs-arrow", feature = "codecs-parquet"))] impl InternalEvent for EncoderNullConstraintError<'_> { fn emit(self) { const CONSTRAINT_REASON: &str = "Schema constraint violation."; diff --git a/src/sinks/aws_s3/config.rs b/src/sinks/aws_s3/config.rs index 08ba92245d65c..03c7ca8295e28 100644 --- a/src/sinks/aws_s3/config.rs +++ b/src/sinks/aws_s3/config.rs @@ -2,10 +2,7 @@ use aws_sdk_s3::Client as S3Client; use tower::ServiceBuilder; use vector_lib::{ TimeZone, - codecs::{ - TextSerializerConfig, - encoding::{Framer, FramingConfig}, - }, + codecs::{TextSerializerConfig, encoding::FramingConfig}, configurable::configurable_component, sink::VectorSink, }; @@ -13,7 +10,7 @@ use vector_lib::{ use super::sink::S3RequestOptions; use crate::{ aws::{AwsAuthentication, RegionOrEndpoint}, - codecs::{Encoder, EncodingConfigWithFraming, SinkType}, + codecs::{EncodingConfigWithFraming, SinkType}, config::{AcknowledgementsConfig, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext}, sinks::{ Healthcheck, @@ -245,9 +242,7 @@ impl S3SinkConfig { let partitioner = S3KeyPartitioner::new(key_prefix, ssekms_key_id, None); - let transformer = self.encoding.transformer(); - let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?; - let encoder = Encoder::::new(framer, serializer); + let (transformer, encoder) = self.encoding.build_encoder(SinkType::MessageBased)?; let request_options = S3RequestOptions { bucket: self.bucket.clone(), @@ -283,7 +278,7 @@ impl S3SinkConfig { #[cfg(test)] mod tests { - use super::S3SinkConfig; + use super::*; #[test] fn generate_config() { diff --git a/src/sinks/aws_s3/sink.rs b/src/sinks/aws_s3/sink.rs index 26d47cdb7039c..e99e5634aafac 100644 --- a/src/sinks/aws_s3/sink.rs +++ b/src/sinks/aws_s3/sink.rs @@ -3,10 +3,10 @@ use std::io; use bytes::Bytes; use chrono::{FixedOffset, Utc}; use uuid::Uuid; -use vector_lib::{codecs::encoding::Framer, event::Finalizable, request_metadata::RequestMetadata}; +use vector_lib::{event::Finalizable, request_metadata::RequestMetadata}; use crate::{ - codecs::{Encoder, Transformer}, + codecs::{EncoderKind, Transformer}, event::Event, sinks::{ s3_common::{ @@ -28,7 +28,7 @@ pub struct S3RequestOptions { pub filename_append_uuid: bool, pub filename_extension: Option, pub api_options: S3Options, - pub encoder: (Transformer, Encoder), + pub encoder: (Transformer, EncoderKind), pub compression: Compression, pub filename_tz_offset: Option, } @@ -36,7 +36,7 @@ pub struct S3RequestOptions { impl RequestBuilder<(S3PartitionKey, Vec)> for S3RequestOptions { type Metadata = S3Metadata; type Events = Vec; - type Encoder = (Transformer, Encoder); + type Encoder = (Transformer, EncoderKind); type Payload = Bytes; type Request = S3Request; type Error = io::Error; // TODO: this is ugly. 
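
For readers following the encoder wiring above, here is a minimal sketch (not part of this patch) of how the new Parquet batch path composes, based only on the constructors and the `tokio_util::codec::Encoder` impl shown in `src/codecs/encoding/{config,encoder}.rs`. The function name, the borrowed config value, and the error plumbing are illustrative assumptions, and the `codecs-parquet` feature is assumed to be enabled:

```rust
// Sketch only: composing the Parquet batch encoder from the pieces added above.
// The function name and error plumbing are illustrative, not part of this patch.
use bytes::BytesMut;
use tokio_util::codec::Encoder as _;
use vector_lib::codecs::encoding::{ParquetSerializer, ParquetSerializerConfig};

use crate::codecs::{BatchEncoder, BatchSerializer};
use crate::event::Event;

fn encode_one_parquet_object(
    parquet_config: &ParquetSerializerConfig,
    events: Vec<Event>,
) -> crate::Result<BytesMut> {
    // Construction can fail, for example on an invalid schema definition.
    let serializer = ParquetSerializer::new(parquet_config.clone())?;
    let mut encoder = BatchEncoder::new(BatchSerializer::Parquet(Box::new(serializer)));

    // One call encodes the whole batch into a single, complete Parquet file
    // (header, row groups, footer); the S3 sink uploads the buffer as one object.
    let mut buffer = BytesMut::new();
    encoder.encode(events, &mut buffer)?;
    Ok(buffer)
}
```

In practice the sink never builds this by hand: `EncodingConfigWithFraming::build_encoder` returns the equivalent `EncoderKind::Batch`, which the shared sink machinery drives.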
diff --git a/src/sinks/clickhouse/config.rs b/src/sinks/clickhouse/config.rs index 6ff081e9aafa0..5421d4a30d100 100644 --- a/src/sinks/clickhouse/config.rs +++ b/src/sinks/clickhouse/config.rs @@ -5,7 +5,9 @@ use std::fmt; use http::{Request, StatusCode, Uri}; use hyper::Body; use vector_lib::codecs::encoding::format::SchemaProvider; -use vector_lib::codecs::encoding::{ArrowStreamSerializerConfig, BatchSerializerConfig}; +use vector_lib::codecs::encoding::{ + ArrowStreamSerializer, ArrowStreamSerializerConfig, BatchSerializerConfig, +}; use super::{ request_builder::ClickhouseRequestBuilder, @@ -293,6 +295,12 @@ impl ClickhouseConfig { let mut arrow_config = match batch_encoding { BatchSerializerConfig::ArrowStream(config) => config.clone(), + #[cfg(feature = "codecs-parquet")] + BatchSerializerConfig::Parquet { .. } => { + return Err( + "'batch_encoding' does not support Parquet for the ClickHouse sink.".into(), + ); + } }; self.resolve_arrow_schema( @@ -304,10 +312,9 @@ impl ClickhouseConfig { ) .await?; - let resolved_batch_config = BatchSerializerConfig::ArrowStream(arrow_config); - let arrow_serializer = resolved_batch_config.build()?; + let arrow_serializer = ArrowStreamSerializer::new(arrow_config)?; let batch_serializer = BatchSerializer::Arrow(arrow_serializer); - let encoder = EncoderKind::Batch(BatchEncoder::new(batch_serializer)); + let encoder = EncoderKind::Batch(Box::new(BatchEncoder::new(batch_serializer))); return Ok((Format::ArrowStream, encoder)); } diff --git a/src/sinks/util/encoding.rs b/src/sinks/util/encoding.rs index 6265021ef6f1a..15b37940409e0 100644 --- a/src/sinks/util/encoding.rs +++ b/src/sinks/util/encoding.rs @@ -8,7 +8,7 @@ use vector_lib::{ request_metadata::GroupedCountByteSize, }; -#[cfg(feature = "codecs-arrow")] +#[cfg(any(feature = "codecs-arrow", feature = "codecs-parquet"))] use crate::internal_events::EncoderNullConstraintError; use crate::{codecs::Transformer, event::Event, internal_events::EncoderWriteError}; @@ -99,7 +99,7 @@ impl Encoder for (Transformer, crate::codecs::Encoder<()>) { } } -#[cfg(feature = "codecs-arrow")] +#[cfg(any(feature = "codecs-arrow", feature = "codecs-parquet"))] impl Encoder> for (Transformer, crate::codecs::BatchEncoder) { fn encode_input( &self, @@ -150,9 +150,9 @@ impl Encoder> for (Transformer, crate::codecs::EncoderKind) { crate::codecs::EncoderKind::Framed(encoder) => { (self.0.clone(), *encoder.clone()).encode_input(events, writer) } - #[cfg(feature = "codecs-arrow")] + #[cfg(any(feature = "codecs-arrow", feature = "codecs-parquet"))] crate::codecs::EncoderKind::Batch(encoder) => { - (self.0.clone(), encoder.clone()).encode_input(events, writer) + (self.0.clone(), *encoder.clone()).encode_input(events, writer) } } } diff --git a/website/cue/reference.cue b/website/cue/reference.cue index 05d22f27e1f1a..844da8cb118c0 100644 --- a/website/cue/reference.cue +++ b/website/cue/reference.cue @@ -56,7 +56,7 @@ _values: { // * `removed` - The component has been removed. #DevelopmentStatus: "beta" | "stable" | "deprecated" | "removed" -#EncodingCodec: "json" | "logfmt" | "text" | "csv" | "native" | "native_json" | "avro" | "gelf" +#EncodingCodec: "json" | "logfmt" | "text" | "csv" | "native" | "native_json" | "avro" | "gelf" | "parquet" #Endpoint: { description: string @@ -556,6 +556,8 @@ _values: { } #TypeObject: { + _args: required: bool + // `examples` clarify values through examples. This should be used // when examples cannot be derived from the `default` or `enum` // options. 
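
At the sink level, the same path is reached through the `(Transformer, EncoderKind)` pair stored in `S3RequestOptions`. The following hedged sketch of a call site assumes the `Encoder` trait shape in `src/sinks/util/encoding.rs` (a batch of events plus an `io::Write` target, returning the written byte count and grouped byte sizes); the function and writer are illustrative, not part of this patch:

```rust
// Sketch only: the sink-side view. `build_encoder` returns EncoderKind::Batch
// for `codec: parquet` and the framed encoder for every other codec; the tuple
// then implements the batch-aware Encoder<Vec<Event>> trait from
// `src/sinks/util/encoding.rs`.
use crate::codecs::{EncoderKind, EncodingConfigWithFraming, SinkType, Transformer};
use crate::event::Event;
use crate::sinks::util::encoding::Encoder as _;

fn build_payload(
    encoding: &EncodingConfigWithFraming,
    events: Vec<Event>,
) -> crate::Result<Vec<u8>> {
    let (transformer, encoder): (Transformer, EncoderKind) =
        encoding.build_encoder(SinkType::MessageBased)?;

    // The transformer is applied per event, then the whole batch is encoded at
    // once; for Parquet that yields one file per batch in `payload`.
    let mut payload = Vec::new();
    let (_bytes_written, _grouped_byte_size) =
        (transformer, encoder).encode_input(events, &mut payload)?;
    Ok(payload)
}
```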
diff --git a/website/cue/reference/components/sinks.cue b/website/cue/reference/components/sinks.cue index cb399721a8c0b..7ebcf691e30bf 100644 --- a/website/cue/reference/components/sinks.cue +++ b/website/cue/reference/components/sinks.cue @@ -188,6 +188,20 @@ components: sinks: [Name=string]: { [apache_avro]: https://avro.apache.org/ """ } + if codec == "parquet" { + parquet: """ + Encodes events in [Apache Parquet][apache_parquet] columnar format. + + Parquet is a columnar storage format optimized for analytics workloads. It provides + efficient compression and encoding schemes, making it ideal for long-term storage and + query performance with tools like AWS Athena, Apache Spark, and Presto. + + This is a batch encoder that encodes multiple events at once into a single Parquet file. + Each batch of events becomes one Parquet file with proper metadata and footers. + + [apache_parquet]: https://parquet.apache.org/ + """ + } } } } @@ -216,6 +230,55 @@ components: sinks: [Name=string]: { } } } + if codec == "parquet" { + parquet: { + description: "Apache Parquet-specific encoder options." + required: false + relevant_when: "codec = `parquet`" + type: object: options: { + compression: { + description: "Compression algorithm for Parquet columns." + required: false + type: string: { + default: "snappy" + enum: { + snappy: "Snappy compression (fast, moderate compression ratio)" + gzip: "GZIP compression (balanced, good for AWS Athena)" + zstd: "ZSTD compression (best compression ratio)" + lz4: "LZ4 compression (very fast)" + brotli: "Brotli compression (good compression)" + uncompressed: "No compression" + } + } + } + row_group_size: { + description: """ + Number of rows per row group. + + Row groups are Parquet's unit of parallelization. Larger row groups can improve + compression but increase memory usage during encoding. If not specified, defaults + to the batch size. + """ + required: false + type: uint: { + default: null + examples: [100000, 1000000] + } + } + allow_nullable_fields: { + description: """ + Allow null values for non-nullable fields in the schema. + + When enabled, missing or incompatible values will be encoded as null even for fields + marked as non-nullable in the schema. This is useful when working with downstream + systems that can handle null values through defaults or computed columns. + """ + required: false + type: bool: default: false + } + } + } + } } } diff --git a/website/cue/reference/components/sinks/aws_s3.cue b/website/cue/reference/components/sinks/aws_s3.cue index cec5a50e47f82..529a9367c36b9 100644 --- a/website/cue/reference/components/sinks/aws_s3.cue +++ b/website/cue/reference/components/sinks/aws_s3.cue @@ -34,7 +34,7 @@ components: sinks: aws_s3: components._aws & { codec: { enabled: true framing: true - enum: ["json", "text"] + enum: ["json", "text", "parquet"] } } proxy: enabled: true @@ -103,6 +103,474 @@ components: sinks: aws_s3: components._aws & { """ } + parquet_encoding: { + title: "Parquet encoding" + body: """ + The AWS S3 sink supports encoding events in [Apache Parquet](\(urls.apache_parquet)) + format, which is a columnar storage format optimized for analytics workloads. Parquet + provides efficient compression and encoding schemes, making it ideal for long-term + storage and query performance with tools like AWS Athena, Apache Spark, and Presto. + + ## Schema Configuration + + Vector supports two approaches for defining the Parquet schema: + + 1. **Explicit Schema**: Define the exact structure and data types for your Parquet files + 2. 
**Automatic Schema Inference**: Let Vector automatically infer the schema from your event data + + You must choose exactly one approach - they are mutually exclusive. + + ### Automatic Schema Inference (Recommended for Getting Started) + + When enabled, Vector automatically infers the schema from each batch of events by examining + the data types of values in the events. This is the easiest way to get started with Parquet + encoding. + + **Type mapping:** + - String values → `utf8` + - Integer values → `int64` + - Float values → `float64` + - Boolean values → `boolean` + - Timestamp values → `timestamp_microsecond` + - Arrays/Objects → `utf8` (serialized as JSON) + + **Type conflicts:** If a field has different types across events in the same batch, + it will be encoded as `utf8` (string) and all values will be converted to strings. + + **Important:** Schema consistency across batches is the operator's responsibility. + Use VRL transforms to ensure consistent types if needed. Each batch may produce + a different schema if event structure varies. + + **Limitations:** Bloom filters and sorting are not supported with automatic schema inference. + Use explicit schema if you need these features. + + **Example configuration with schema inference:** + + ```yaml + sinks: + s3: + type: aws_s3 + bucket: my-bucket + compression: none # Parquet handles compression internally + batch: + max_events: 50000 + timeout_secs: 60 + encoding: + codec: parquet + parquet: + infer_schema: true + exclude_columns: + - _metadata + - internal_id + max_columns: 1000 + compression: zstd + compression_level: 6 + writer_version: v2 + row_group_size: 50000 + ``` + + ### Explicit Schema (Recommended for Production) + + For production use, explicitly defining the schema provides better control, consistency, + and access to advanced features like per-column Bloom filters and sorting. The schema + is defined as a map of field names to field definitions. + + All fields defined in the schema are nullable by default, meaning missing fields will be encoded + as NULL values in the Parquet file. 
+ + **Example configuration with explicit schema:** + + ```yaml + sinks: + s3: + type: aws_s3 + bucket: my-bucket + compression: none # Parquet handles compression internally + batch: + max_events: 50000 + timeout_secs: 60 + encoding: + codec: parquet + parquet: + schema: + # Timestamps + timestamp: + type: timestamp_microsecond + bloom_filter: false + created_at: + type: timestamp_millisecond + bloom_filter: false + + # String fields with per-column Bloom filters + user_id: + type: utf8 + bloom_filter: true # Enable for high-cardinality field + bloom_filter_num_distinct_values: 10000000 + bloom_filter_false_positive_pct: 0.01 + event_name: + type: utf8 + bloom_filter: false + message: + type: utf8 + bloom_filter: false + + # Numeric fields + team_id: + type: int64 + bloom_filter: false + duration_ms: + type: float64 + bloom_filter: false + count: + type: int32 + bloom_filter: false + + # Boolean + is_active: + type: boolean + bloom_filter: false + + compression: zstd + compression_level: 6 # ZSTD level 1-22 (higher = better compression) + writer_version: v2 # Use modern Parquet format + row_group_size: 50000 # Should be <= batch.max_events + allow_nullable_fields: true + sorting_columns: # Pre-sort for better compression and queries + - column: timestamp + descending: true # Most recent first + ``` + + ## Supported Data Types + + The following data types are supported for Parquet schema fields: + + **String types:** + - `utf8` or `string`: UTF-8 encoded strings + - `large_utf8` or `large_string`: Large UTF-8 strings (>2GB) + + **Integer types:** + - `int8`, `int16`, `int32`, `int64`: Signed integers + - `uint8`, `uint16`, `uint32`, `uint64`: Unsigned integers + + **Floating point types:** + - `float32` or `float`: 32-bit floating point + - `float64` or `double`: 64-bit floating point + + **Timestamp types:** + - `timestamp_second` or `timestamp_s`: Seconds since Unix epoch + - `timestamp_millisecond`, `timestamp_ms`, or `timestamp_millis`: Milliseconds since Unix epoch + - `timestamp_microsecond`, `timestamp_us`, or `timestamp_micros`: Microseconds since Unix epoch + - `timestamp_nanosecond`, `timestamp_ns`, or `timestamp_nanos`: Nanoseconds since Unix epoch + + **Date types:** + - `date32` or `date`: Days since Unix epoch (32-bit) + - `date64`: Milliseconds since Unix epoch (64-bit) + + **Other types:** + - `boolean` or `bool`: Boolean values + - `binary`: Arbitrary binary data + - `large_binary`: Large binary data (>2GB) + - `decimal128`: 128-bit decimal with default precision + - `decimal256`: 256-bit decimal with default precision + + ## Parquet Configuration Options + + ### Schema Options + + #### schema + + Explicitly define the Arrow schema for encoding events to Parquet. This schema defines + the structure and types of the Parquet file columns, specified as a map of field names + to field definitions. + + Each field definition includes: + - **type**: The Arrow data type (required) + - **bloom_filter**: Enable Bloom filter for this column (optional, default: false) + - **bloom_filter_num_distinct_values**: Number of distinct values for this column's Bloom filter (optional) + - **bloom_filter_false_positive_pct**: False positive probability for this column's Bloom filter (optional) + + All fields are nullable by default, meaning missing fields will be encoded as NULL values. + + **Mutually exclusive with `infer_schema`**. You must specify either `schema` or + `infer_schema: true`, but not both. 
+ + **Example:** + ```yaml + schema: + user_id: + type: utf8 + bloom_filter: true + bloom_filter_num_distinct_values: 10000000 + bloom_filter_false_positive_pct: 0.01 + timestamp: + type: timestamp_microsecond + bloom_filter: false + count: + type: int64 + bloom_filter: false + ``` + + #### infer_schema + + Automatically infer the schema from event data. When enabled, Vector examines each + batch of events and automatically determines the appropriate Arrow data types based + on the values present. + + **Type inference rules:** + - String values → `utf8` + - Integer values → `int64` + - Float values → `float64` + - Boolean values → `boolean` + - Timestamp values → `timestamp_microsecond` + - Arrays/Objects → `utf8` (serialized as JSON) + - Type conflicts → `utf8` (fallback to string with warning) + + **Important considerations:** + - Schema may vary between batches if event structure changes + - Use VRL transforms to ensure type consistency if needed + - Bloom filters and sorting are not available with inferred schemas + - For production workloads, explicit schemas are recommended + + **Mutually exclusive with `schema`**. You must specify either `schema` or + `infer_schema: true`, but not both. + + **Default**: `false` + + #### exclude_columns + + Column names to exclude from Parquet encoding when using automatic schema inference. + These columns will be completely excluded from the Parquet file. + + Useful for filtering out metadata, internal fields, or temporary data that shouldn't + be persisted to long-term storage. + + **Only applies when `infer_schema` is enabled**. Ignored when using explicit schema + (use the schema definition to control which fields are included). + + **Example:** + ```yaml + infer_schema: true + exclude_columns: + - _metadata + - internal_id + - temp_field + ``` + + #### max_columns + + Maximum number of columns to encode when using automatic schema inference. Additional + columns beyond this limit will be silently dropped. Columns are selected in the order + they appear in the first event. + + This protects against accidentally creating Parquet files with too many columns, which + can cause performance issues in query engines. + + **Only applies when `infer_schema` is enabled**. Ignored when using explicit schema. + + **Default**: `1000` + + **Recommended values:** + - Standard use cases: `1000` (default) + - Wide tables: `500` - `1000` + - Performance-critical: `100` - `500` + + ### Compression Options + + #### compression + + Compression algorithm applied to Parquet column data: + - `snappy` (default): Fast compression with moderate compression ratio + - `gzip`: Balanced compression, excellent AWS Athena compatibility + - `zstd`: Best compression ratio, ideal for cold storage + - `lz4`: Very fast compression, good for high-throughput scenarios + - `brotli`: Good compression, web-optimized + - `uncompressed`: No compression + + ### compression_level + + Compression level for algorithms that support it (ZSTD, GZIP, Brotli). This controls the + trade-off between compression ratio and encoding speed. 
+ + **ZSTD levels (1-22):** + - **1-3**: Fastest encoding, moderate compression (level 3 is default) + - **4-9**: Good balance of speed and compression + - **10-15**: Better compression, slower encoding (recommended for cold storage) + - **16-22**: Maximum compression, slowest encoding + + **GZIP levels (1-9):** + - **1-3**: Faster encoding, less compression + - **6**: Default balance (recommended) + - **9**: Maximum compression, slowest + + **Brotli levels (0-11):** + - **0-4**: Faster encoding + - **1**: Default (recommended) + - **5-11**: Better compression, slower + + Higher levels typically produce 20-50% smaller files but take 2-5x longer to encode. + **Recommendation:** Use level 3-6 for hot data, 10-15 for cold storage. + + ### writer_version + + Parquet format version to write. Controls compatibility vs. performance. + + **Options:** + - **v1** (default): PARQUET_1_0 - Maximum compatibility with older readers + - **v2**: PARQUET_2_0 - Modern format with better encoding and statistics + + **Version 2 benefits:** + - 10-20% more efficient encoding for certain data types + - Better statistics for query optimization + - Improved data page format + - Required for some advanced features + + **When to use:** + - Use **v1** for maximum compatibility with pre-2018 tools + - Use **v2** for better performance with modern query engines (Athena, Spark, Presto) + + ### row_group_size + + Number of rows per row group in the Parquet file. Row groups are Parquet's unit of + parallelization - query engines can read different row groups in parallel. + + **Important:** Since each batch becomes a separate Parquet file, `row_group_size` should + be less than or equal to `batch.max_events`. Row groups cannot span multiple files. + If omitted, defaults to the batch size. + + **Trade-offs:** + - **Larger row groups** (500K-1M rows): Better compression, less query parallelism + - **Smaller row groups** (50K-100K rows): More query parallelism, slightly worse compression + + For AWS Athena, row groups of 128-256 MB (uncompressed) are often recommended. + + ### allow_nullable_fields + + When enabled, missing or incompatible values will be encoded as NULL even for fields that + would normally be non-nullable. This is useful when working with downstream systems that + can handle NULL values through defaults or computed columns. + + ### Per-Column Bloom Filters + + Bloom filters are probabilistic data structures that can significantly improve query + performance by allowing query engines (like AWS Athena, Apache Spark, and Presto) to + skip entire row groups when searching for specific values without reading the actual data. + + **Only available when using explicit schema** (not available with automatic schema inference). + + When using an explicit schema, you can enable Bloom filters on a per-column basis + by setting `bloom_filter: true` in the field definition. This gives you fine-grained + control over which columns get Bloom filters. 
+ + **When to use Bloom filters:** + - High-cardinality columns: UUIDs, user IDs, session IDs, transaction IDs + - String columns frequently used in WHERE clauses: URLs, emails, tags, names + - Point queries: `WHERE user_id = 'abc123'` + - IN clause queries: `WHERE id IN ('x', 'y', 'z')` + + **When NOT to use Bloom filters:** + - Low-cardinality columns (countries, status codes, boolean flags) + - Columns rarely used in WHERE clauses + - Range queries (Bloom filters don't help with `>`, `<`, `BETWEEN`) + + **Trade-offs:** + - **Pros**: Significantly faster queries (often 10-100x), better row group pruning, reduced I/O + - **Cons**: Slightly larger file sizes (typically 1-5% overhead), minimal write overhead + + **Configuration example:** + + ```yaml + schema: + user_id: + type: utf8 + bloom_filter: true # Enable for high-cardinality column + bloom_filter_num_distinct_values: 10000000 # Expected distinct values + bloom_filter_false_positive_pct: 0.01 # 1% false positive rate + event_name: + type: utf8 + bloom_filter: false # Skip for low-cardinality column + timestamp: + type: timestamp_microsecond + bloom_filter: false # Skip for timestamp (use sorting instead) + ``` + + **Per-column Bloom filter settings:** + + - **bloom_filter**: Enable Bloom filter for this column (default: `false`) + - **bloom_filter_num_distinct_values**: Expected number of distinct values for this column's Bloom filter + - Low cardinality (countries, states): `1,000` - `100,000` + - Medium cardinality (cities, products): `100,000` - `1,000,000` + - High cardinality (user IDs, UUIDs): `10,000,000+` + - If not specified, defaults to `1,000,000` + - Automatically capped to the `row_group_size` value + - **bloom_filter_false_positive_pct**: False positive probability for this column's Bloom filter + - `0.05` (5%): Good balance for general use + - `0.01` (1%): Better for high-selectivity queries where precision matters + - `0.10` (10%): Smaller filters when storage is a concern + - If not specified, defaults to `0.05` + + A false positive means the Bloom filter indicates a value *might* be in a row group when it + actually isn't, requiring the engine to read and filter that row group. Lower FPP means fewer + unnecessary reads but larger Bloom filters. + + ### sorting_columns + + Pre-sort rows by specified columns before writing to Parquet. This can significantly improve + both compression ratios and query performance, especially for time-series data and event logs. 
+ + **Benefits:** + - **20-40% better compression**: Similar values are grouped together, improving compression + - **Faster queries**: More effective min/max statistics enable better row group skipping + - **Improved caching**: Query engines can cache sorted data more efficiently + + **Common patterns:** + - **Time-series data**: Sort by `timestamp` descending (most recent first) + - **Multi-tenant systems**: Sort by `tenant_id`, then `timestamp` + - **User analytics**: Sort by `user_id`, then `event_time` + - **Logs**: Sort by `timestamp`, then `severity` + + **Configuration:** + ```yaml + sorting_columns: + - column: timestamp + descending: true # Most recent first + - column: user_id + descending: false # A-Z order + ``` + + **Trade-offs:** + - **Write performance**: Adds 10-30% sorting overhead during encoding + - **Memory usage**: Requires buffering entire batch in memory for sorting + - **Most beneficial**: When queries frequently filter on sorted columns + + **When to use:** + - Enable for time-series data where you query recent events frequently + - Enable for multi-tenant data partitioned by tenant_id + - Skip if write latency is critical and queries don't benefit from sorting + + If not specified, rows are written in the order they appear in the batch. + + ## Batching Behavior + + Each batch of events becomes **one Parquet file** in S3. The batch size is controlled by: + - `batch.max_events`: Maximum number of events per file + - `batch.max_bytes`: Maximum bytes per file + - `batch.timeout_secs`: Maximum time to wait before flushing + + Example: With `max_events: 50000`, each Parquet file will contain up to 50,000 rows. + + ## Important Notes + + - **Sink-level compression**: Set `compression: none` at the sink level since Parquet + handles compression internally through its `parquet.compression` setting + - **Schema configuration**: You must choose either explicit schema or automatic schema + inference (`infer_schema: true`). For production use, explicit schemas are recommended + for consistency and access to advanced features like Bloom filters and sorting + - **All fields nullable**: Fields defined in explicit schemas are nullable by default, + allowing for missing values. Inferred schemas also create nullable fields + - **AWS Athena compatibility**: Use `gzip` or `snappy` compression for best Athena compatibility + """ + } + log_on_put: { title: "Emit a log when putting an object" body: """ diff --git a/website/cue/reference/components/sinks/generated/aws_s3.cue b/website/cue/reference/components/sinks/generated/aws_s3.cue index 6c6dffb4fb3cb..07f444c23aed0 100644 --- a/website/cue/reference/components/sinks/generated/aws_s3.cue +++ b/website/cue/reference/components/sinks/generated/aws_s3.cue @@ -497,6 +497,15 @@ generated: components: sinks: aws_s3: configuration: { [otlp]: https://opentelemetry.io/docs/specs/otlp/ """ + parquet: """ + Encodes events in [Apache Parquet][apache_parquet] columnar format. + + Parquet is a columnar storage format optimized for analytics workloads. + It provides efficient compression and encoding schemes, making it ideal + for long-term storage and query performance. + + [apache_parquet]: https://parquet.apache.org/ + """ protobuf: """ Encodes an event as a [Protobuf][protobuf] message. @@ -664,6 +673,324 @@ generated: components: sinks: aws_s3: configuration: { required: false type: array: items: type: string: {} } + parquet: { + description: "Apache Parquet-specific encoder options." 
+ relevant_when: "codec = \"parquet\"" + required: true + type: object: options: { + allow_nullable_fields: { + description: """ + Allow null values for non-nullable fields in the schema. + + When enabled, missing or incompatible values will be encoded as null even for fields + marked as non-nullable in the Arrow schema. This is useful when working with downstream + systems that can handle null values through defaults, computed columns, or other mechanisms. + + When disabled (default), missing values for non-nullable fields will cause encoding errors, + ensuring all required data is present before writing to Parquet. + """ + required: false + type: bool: { + default: false + examples: [true] + } + } + compression: { + description: """ + Compression algorithm to use for Parquet columns + + Compression is applied to all columns in the Parquet file. + Snappy provides a good balance of speed and compression ratio. + """ + required: false + type: string: { + default: "snappy" + enum: { + brotli: "Brotli compression" + gzip: "GZIP compression (slower, better compression ratio)" + lz4: "LZ4 compression (very fast, moderate compression)" + snappy: "Snappy compression (fast, moderate compression ratio)" + uncompressed: "No compression" + zstd: "ZSTD compression (good balance of speed and compression)" + } + examples: ["snappy", "gzip", "zstd"] + } + } + compression_level: { + description: """ + Compression level for algorithms that support it. + + Only applies to ZSTD, GZIP, and Brotli compression. Ignored for other algorithms. + + **ZSTD levels** (1-22): + - 1-3: Fastest, moderate compression (level 3 is default) + - 4-9: Good balance of speed and compression + - 10-15: Better compression, slower encoding + - 16-22: Maximum compression, slowest (good for cold storage) + + **GZIP levels** (1-9): + - 1-3: Faster, less compression + - 6: Default balance (recommended) + - 9: Maximum compression, slowest + + **Brotli levels** (0-11): + - 0-4: Faster encoding + - 1: Default (recommended) + - 5-11: Better compression, slower + + Higher levels typically produce 20-50% smaller files but take 2-5x longer to encode. + Recommended: Use level 3-6 for hot data, 10-15 for cold storage. + """ + required: false + type: int: examples: [3, 6, 10] + } + exclude_columns: { + description: """ + Column names to exclude from Parquet encoding + + These columns will be completely excluded from the Parquet file. + Useful for filtering out metadata, internal fields, or temporary data. + + Only applies when `infer_schema` is enabled. Ignored when using explicit schema. + """ + required: false + type: array: items: type: string: examples: ["vec![\"_metadata\".to_string(), \"internal_id\".to_string()]"] + } + infer_schema: { + description: """ + Automatically infer schema from event data + + When enabled, the schema is inferred from each batch of events independently. + The schema is determined by examining the types of values in the events. + + **Type mapping:** + - String values → `utf8` + - Integer values → `int64` + - Float values → `float64` + - Boolean values → `boolean` + - Timestamp values → `timestamp_microsecond` + - Arrays/Objects → `utf8` (serialized as JSON) + + **Type conflicts:** If a field has different types across events in the same batch, + it will be encoded as `utf8` (string) and all values will be converted to strings. + + **Important:** Schema consistency across batches is the operator's responsibility. + Use VRL transforms to ensure consistent types if needed. 
Each batch may produce + a different schema if event structure varies. + + **Bloom filters:** Not supported with inferred schemas. Use explicit schema for Bloom filters. + + Mutually exclusive with `schema`. Must specify either `schema` or `infer_schema: true`. + """ + required: false + type: bool: { + default: false + examples: [true] + } + } + max_columns: { + description: """ + Maximum number of columns to encode + + Limits the number of columns in the Parquet file. Additional columns beyond + this limit will be silently dropped. Columns are selected in the order they + appear in the first event. + + Only applies when `infer_schema` is enabled. Ignored when using explicit schema. + """ + required: false + type: uint: { + default: 1000 + examples: [500, 1000] + } + } + row_group_size: { + description: """ + Number of rows per row group + + Row groups are Parquet's unit of parallelization. Larger row groups + can improve compression but increase memory usage during encoding. + + Since each batch becomes a separate Parquet file, this value + should be <= the batch max_events setting. Row groups cannot span multiple files. + If not specified, defaults to the batch size. + """ + required: false + type: uint: examples: [100000, 1000000] + } + schema: { + description: """ + The Arrow schema definition to use for encoding + + This schema defines the structure and types of the Parquet file columns. + Specified as a map of field names to data types. + + Mutually exclusive with `infer_schema`. Must specify either `schema` or `infer_schema: true`. + + Supported types: utf8, int8, int16, int32, int64, uint8, uint16, uint32, uint64, + float32, float64, boolean, binary, timestamp_second, timestamp_millisecond, + timestamp_microsecond, timestamp_nanosecond, date32, date64, and more. + """ + required: false + type: object: { + examples: [{ + id: { + bloom_filter: false + bloom_filter_false_positive_pct: null + bloom_filter_num_distinct_values: null + type: "int64" + } + name: { + bloom_filter: true + bloom_filter_false_positive_pct: 0.01 + bloom_filter_num_distinct_values: 1000000 + type: "utf8" + } + timestamp: { + bloom_filter: false + bloom_filter_false_positive_pct: null + bloom_filter_num_distinct_values: null + type: "timestamp_microsecond" + } + }] + options: "*": { + description: "A field definition specifying the data type and optional Bloom filter configuration." + required: true + type: object: options: { + bloom_filter: { + description: """ + Enable Bloom filter for this specific column + + When enabled, a Bloom filter will be created for this column to improve + query performance for point lookups and IN clauses. Only enable for + high-cardinality columns (UUIDs, user IDs, etc.) to avoid overhead. + """ + required: false + type: bool: { + default: false + examples: [true] + } + } + bloom_filter_false_positive_pct: { + description: """ + False positive probability for this column's Bloom filter (as a percentage) + + Lower values create larger but more accurate filters. + + - 0.05 (5%): Good balance for general use + - 0.01 (1%): Better for high-selectivity queries + """ + required: false + type: float: examples: [0.05, 0.01] + } + bloom_filter_num_distinct_values: { + description: """ + Number of distinct values expected for this column's Bloom filter + + This controls the size of the Bloom filter. Should match the actual + cardinality of the column. Will be automatically capped to the batch size. 
+ + - Low cardinality (countries, states): 1,000 - 100,000 + - Medium cardinality (cities, products): 100,000 - 1,000,000 + - High cardinality (UUIDs, user IDs): 10,000,000+ + """ + required: false + type: uint: examples: [1000000, 10000000] + } + type: { + description: "Data type for this field" + required: true + type: string: examples: ["utf8", "int64", "timestamp_ms"] + } + } + } + } + } + sorting_columns: { + description: """ + Sorting order for rows within row groups. + + Pre-sorting rows by specified columns before writing can significantly improve both + compression ratios and query performance. This is especially valuable for time-series + data and event logs. + + **Benefits:** + - **Better compression** (20-40% smaller files): Similar values are grouped together + - **Faster queries**: More effective min/max statistics enable better row group skipping + - **Improved caching**: Query engines can more efficiently cache sorted data + + **Common patterns:** + - Time-series: Sort by timestamp descending (most recent first) + - Multi-tenant: Sort by tenant_id, then timestamp + - User analytics: Sort by user_id, then event_time + + **Trade-offs:** + - Adds sorting overhead during encoding (typically 10-30% slower writes) + - Requires buffering entire batch in memory for sorting + - Most beneficial when queries frequently filter on sorted columns + + **Example:** + ```yaml + sorting_columns: + - column: timestamp + descending: true + - column: user_id + descending: false + ``` + + If not specified, rows are written in the order they appear in the batch. + """ + required: false + type: array: items: type: object: options: { + column: { + description: "Name of the column to sort by" + required: true + type: string: examples: ["timestamp", "user_id"] + } + descending: { + description: """ + Sort in descending order (true) or ascending order (false) + + - `true`: Descending (Z-A, 9-0, newest-oldest) + - `false`: Ascending (A-Z, 0-9, oldest-newest) + """ + required: false + type: bool: { + default: false + examples: [true] + } + } + } + } + writer_version: { + description: """ + Parquet format writer version. + + Controls which Parquet format version to write: + - **v1** (PARQUET_1_0): Original format, maximum compatibility (default) + - **v2** (PARQUET_2_0): Modern format with improved encoding and statistics + + Version 2 benefits: + - More efficient encoding for certain data types (10-20% smaller files) + - Better statistics for query optimization + - Improved data page format + - Required for some advanced features + + Use v1 for maximum compatibility with older readers (pre-2018 tools). + Use v2 for better performance with modern query engines (Athena, Spark, Presto). + """ + required: false + type: string: { + default: "v2" + enum: { + v1: "Parquet format version 1.0 (maximum compatibility)" + v2: "Parquet format version 2.0 (modern format with better encoding)" + } + examples: ["v1", "v2"] + } + } + } + } protobuf: { description: "Options for the Protobuf serializer." 
relevant_when: "codec = \"protobuf\"" diff --git a/website/cue/reference/components/sinks/generated/azure_blob.cue b/website/cue/reference/components/sinks/generated/azure_blob.cue index 5ea0c1dd20221..7b28bbb25cfbb 100644 --- a/website/cue/reference/components/sinks/generated/azure_blob.cue +++ b/website/cue/reference/components/sinks/generated/azure_blob.cue @@ -343,6 +343,15 @@ generated: components: sinks: azure_blob: configuration: { [otlp]: https://opentelemetry.io/docs/specs/otlp/ """ + parquet: """ + Encodes events in [Apache Parquet][apache_parquet] columnar format. + + Parquet is a columnar storage format optimized for analytics workloads. + It provides efficient compression and encoding schemes, making it ideal + for long-term storage and query performance. + + [apache_parquet]: https://parquet.apache.org/ + """ protobuf: """ Encodes an event as a [Protobuf][protobuf] message. @@ -510,6 +519,324 @@ generated: components: sinks: azure_blob: configuration: { required: false type: array: items: type: string: {} } + parquet: { + description: "Apache Parquet-specific encoder options." + relevant_when: "codec = \"parquet\"" + required: true + type: object: options: { + allow_nullable_fields: { + description: """ + Allow null values for non-nullable fields in the schema. + + When enabled, missing or incompatible values will be encoded as null even for fields + marked as non-nullable in the Arrow schema. This is useful when working with downstream + systems that can handle null values through defaults, computed columns, or other mechanisms. + + When disabled (default), missing values for non-nullable fields will cause encoding errors, + ensuring all required data is present before writing to Parquet. + """ + required: false + type: bool: { + default: false + examples: [true] + } + } + compression: { + description: """ + Compression algorithm to use for Parquet columns + + Compression is applied to all columns in the Parquet file. + Snappy provides a good balance of speed and compression ratio. + """ + required: false + type: string: { + default: "snappy" + enum: { + brotli: "Brotli compression" + gzip: "GZIP compression (slower, better compression ratio)" + lz4: "LZ4 compression (very fast, moderate compression)" + snappy: "Snappy compression (fast, moderate compression ratio)" + uncompressed: "No compression" + zstd: "ZSTD compression (good balance of speed and compression)" + } + examples: ["snappy", "gzip", "zstd"] + } + } + compression_level: { + description: """ + Compression level for algorithms that support it. + + Only applies to ZSTD, GZIP, and Brotli compression. Ignored for other algorithms. + + **ZSTD levels** (1-22): + - 1-3: Fastest, moderate compression (level 3 is default) + - 4-9: Good balance of speed and compression + - 10-15: Better compression, slower encoding + - 16-22: Maximum compression, slowest (good for cold storage) + + **GZIP levels** (1-9): + - 1-3: Faster, less compression + - 6: Default balance (recommended) + - 9: Maximum compression, slowest + + **Brotli levels** (0-11): + - 0-4: Faster encoding + - 1: Default (recommended) + - 5-11: Better compression, slower + + Higher levels typically produce 20-50% smaller files but take 2-5x longer to encode. + Recommended: Use level 3-6 for hot data, 10-15 for cold storage. + """ + required: false + type: int: examples: [3, 6, 10] + } + exclude_columns: { + description: """ + Column names to exclude from Parquet encoding + + These columns will be completely excluded from the Parquet file. 
+ Useful for filtering out metadata, internal fields, or temporary data. + + Only applies when `infer_schema` is enabled. Ignored when using explicit schema. + """ + required: false + type: array: items: type: string: examples: ["vec![\"_metadata\".to_string(), \"internal_id\".to_string()]"] + } + infer_schema: { + description: """ + Automatically infer schema from event data + + When enabled, the schema is inferred from each batch of events independently. + The schema is determined by examining the types of values in the events. + + **Type mapping:** + - String values → `utf8` + - Integer values → `int64` + - Float values → `float64` + - Boolean values → `boolean` + - Timestamp values → `timestamp_microsecond` + - Arrays/Objects → `utf8` (serialized as JSON) + + **Type conflicts:** If a field has different types across events in the same batch, + it will be encoded as `utf8` (string) and all values will be converted to strings. + + **Important:** Schema consistency across batches is the operator's responsibility. + Use VRL transforms to ensure consistent types if needed. Each batch may produce + a different schema if event structure varies. + + **Bloom filters:** Not supported with inferred schemas. Use explicit schema for Bloom filters. + + Mutually exclusive with `schema`. Must specify either `schema` or `infer_schema: true`. + """ + required: false + type: bool: { + default: false + examples: [true] + } + } + max_columns: { + description: """ + Maximum number of columns to encode + + Limits the number of columns in the Parquet file. Additional columns beyond + this limit will be silently dropped. Columns are selected in the order they + appear in the first event. + + Only applies when `infer_schema` is enabled. Ignored when using explicit schema. + """ + required: false + type: uint: { + default: 1000 + examples: [500, 1000] + } + } + row_group_size: { + description: """ + Number of rows per row group + + Row groups are Parquet's unit of parallelization. Larger row groups + can improve compression but increase memory usage during encoding. + + Since each batch becomes a separate Parquet file, this value + should be <= the batch max_events setting. Row groups cannot span multiple files. + If not specified, defaults to the batch size. + """ + required: false + type: uint: examples: [100000, 1000000] + } + schema: { + description: """ + The Arrow schema definition to use for encoding + + This schema defines the structure and types of the Parquet file columns. + Specified as a map of field names to data types. + + Mutually exclusive with `infer_schema`. Must specify either `schema` or `infer_schema: true`. + + Supported types: utf8, int8, int16, int32, int64, uint8, uint16, uint32, uint64, + float32, float64, boolean, binary, timestamp_second, timestamp_millisecond, + timestamp_microsecond, timestamp_nanosecond, date32, date64, and more. + """ + required: false + type: object: { + examples: [{ + id: { + bloom_filter: false + bloom_filter_false_positive_pct: null + bloom_filter_num_distinct_values: null + type: "int64" + } + name: { + bloom_filter: true + bloom_filter_false_positive_pct: 0.01 + bloom_filter_num_distinct_values: 1000000 + type: "utf8" + } + timestamp: { + bloom_filter: false + bloom_filter_false_positive_pct: null + bloom_filter_num_distinct_values: null + type: "timestamp_microsecond" + } + }] + options: "*": { + description: "A field definition specifying the data type and optional Bloom filter configuration." 
+ required: true + type: object: options: { + bloom_filter: { + description: """ + Enable Bloom filter for this specific column + + When enabled, a Bloom filter will be created for this column to improve + query performance for point lookups and IN clauses. Only enable for + high-cardinality columns (UUIDs, user IDs, etc.) to avoid overhead. + """ + required: false + type: bool: { + default: false + examples: [true] + } + } + bloom_filter_false_positive_pct: { + description: """ + False positive probability for this column's Bloom filter (as a percentage) + + Lower values create larger but more accurate filters. + + - 0.05 (5%): Good balance for general use + - 0.01 (1%): Better for high-selectivity queries + """ + required: false + type: float: examples: [0.05, 0.01] + } + bloom_filter_num_distinct_values: { + description: """ + Number of distinct values expected for this column's Bloom filter + + This controls the size of the Bloom filter. Should match the actual + cardinality of the column. Will be automatically capped to the batch size. + + - Low cardinality (countries, states): 1,000 - 100,000 + - Medium cardinality (cities, products): 100,000 - 1,000,000 + - High cardinality (UUIDs, user IDs): 10,000,000+ + """ + required: false + type: uint: examples: [1000000, 10000000] + } + type: { + description: "Data type for this field" + required: true + type: string: examples: ["utf8", "int64", "timestamp_ms"] + } + } + } + } + } + sorting_columns: { + description: """ + Sorting order for rows within row groups. + + Pre-sorting rows by specified columns before writing can significantly improve both + compression ratios and query performance. This is especially valuable for time-series + data and event logs. + + **Benefits:** + - **Better compression** (20-40% smaller files): Similar values are grouped together + - **Faster queries**: More effective min/max statistics enable better row group skipping + - **Improved caching**: Query engines can more efficiently cache sorted data + + **Common patterns:** + - Time-series: Sort by timestamp descending (most recent first) + - Multi-tenant: Sort by tenant_id, then timestamp + - User analytics: Sort by user_id, then event_time + + **Trade-offs:** + - Adds sorting overhead during encoding (typically 10-30% slower writes) + - Requires buffering entire batch in memory for sorting + - Most beneficial when queries frequently filter on sorted columns + + **Example:** + ```yaml + sorting_columns: + - column: timestamp + descending: true + - column: user_id + descending: false + ``` + + If not specified, rows are written in the order they appear in the batch. + """ + required: false + type: array: items: type: object: options: { + column: { + description: "Name of the column to sort by" + required: true + type: string: examples: ["timestamp", "user_id"] + } + descending: { + description: """ + Sort in descending order (true) or ascending order (false) + + - `true`: Descending (Z-A, 9-0, newest-oldest) + - `false`: Ascending (A-Z, 0-9, oldest-newest) + """ + required: false + type: bool: { + default: false + examples: [true] + } + } + } + } + writer_version: { + description: """ + Parquet format writer version. 
+ + Controls which Parquet format version to write: + - **v1** (PARQUET_1_0): Original format, maximum compatibility (default) + - **v2** (PARQUET_2_0): Modern format with improved encoding and statistics + + Version 2 benefits: + - More efficient encoding for certain data types (10-20% smaller files) + - Better statistics for query optimization + - Improved data page format + - Required for some advanced features + + Use v1 for maximum compatibility with older readers (pre-2018 tools). + Use v2 for better performance with modern query engines (Athena, Spark, Presto). + """ + required: false + type: string: { + default: "v2" + enum: { + v1: "Parquet format version 1.0 (maximum compatibility)" + v2: "Parquet format version 2.0 (modern format with better encoding)" + } + examples: ["v1", "v2"] + } + } + } + } protobuf: { description: "Options for the Protobuf serializer." relevant_when: "codec = \"protobuf\"" diff --git a/website/cue/reference/components/sinks/generated/clickhouse.cue b/website/cue/reference/components/sinks/generated/clickhouse.cue index a0f4399bdc112..7141f9652815f 100644 --- a/website/cue/reference/components/sinks/generated/clickhouse.cue +++ b/website/cue/reference/components/sinks/generated/clickhouse.cue @@ -263,19 +263,13 @@ generated: components: sinks: clickhouse: configuration: { When disabled (default), missing values for non-nullable fields will cause encoding errors, ensuring all required data is present before sending to the sink. """ - required: false + relevant_when: "codec = \"arrow_stream\"" + required: false type: bool: default: false } codec: { - description: """ - Encodes events in [Apache Arrow][apache_arrow] IPC streaming format. - - This is the streaming variant of the Arrow IPC format, which writes - a continuous stream of record batches. - - [apache_arrow]: https://arrow.apache.org/ - """ - required: true + description: "The codec to use for batch encoding events." + required: true type: string: enum: arrow_stream: """ Encodes events in [Apache Arrow][apache_arrow] IPC streaming format. diff --git a/website/cue/reference/components/sinks/generated/gcp_cloud_storage.cue b/website/cue/reference/components/sinks/generated/gcp_cloud_storage.cue index 17681f2adb96c..5c22a42cbdafb 100644 --- a/website/cue/reference/components/sinks/generated/gcp_cloud_storage.cue +++ b/website/cue/reference/components/sinks/generated/gcp_cloud_storage.cue @@ -358,6 +358,15 @@ generated: components: sinks: gcp_cloud_storage: configuration: { [otlp]: https://opentelemetry.io/docs/specs/otlp/ """ + parquet: """ + Encodes events in [Apache Parquet][apache_parquet] columnar format. + + Parquet is a columnar storage format optimized for analytics workloads. + It provides efficient compression and encoding schemes, making it ideal + for long-term storage and query performance. + + [apache_parquet]: https://parquet.apache.org/ + """ protobuf: """ Encodes an event as a [Protobuf][protobuf] message. @@ -525,6 +534,324 @@ generated: components: sinks: gcp_cloud_storage: configuration: { required: false type: array: items: type: string: {} } + parquet: { + description: "Apache Parquet-specific encoder options." + relevant_when: "codec = \"parquet\"" + required: true + type: object: options: { + allow_nullable_fields: { + description: """ + Allow null values for non-nullable fields in the schema. + + When enabled, missing or incompatible values will be encoded as null even for fields + marked as non-nullable in the Arrow schema. 
This is useful when working with downstream + systems that can handle null values through defaults, computed columns, or other mechanisms. + + When disabled (default), missing values for non-nullable fields will cause encoding errors, + ensuring all required data is present before writing to Parquet. + """ + required: false + type: bool: { + default: false + examples: [true] + } + } + compression: { + description: """ + Compression algorithm to use for Parquet columns + + Compression is applied to all columns in the Parquet file. + Snappy provides a good balance of speed and compression ratio. + """ + required: false + type: string: { + default: "snappy" + enum: { + brotli: "Brotli compression" + gzip: "GZIP compression (slower, better compression ratio)" + lz4: "LZ4 compression (very fast, moderate compression)" + snappy: "Snappy compression (fast, moderate compression ratio)" + uncompressed: "No compression" + zstd: "ZSTD compression (good balance of speed and compression)" + } + examples: ["snappy", "gzip", "zstd"] + } + } + compression_level: { + description: """ + Compression level for algorithms that support it. + + Only applies to ZSTD, GZIP, and Brotli compression. Ignored for other algorithms. + + **ZSTD levels** (1-22): + - 1-3: Fastest, moderate compression (level 3 is default) + - 4-9: Good balance of speed and compression + - 10-15: Better compression, slower encoding + - 16-22: Maximum compression, slowest (good for cold storage) + + **GZIP levels** (1-9): + - 1-3: Faster, less compression + - 6: Default balance (recommended) + - 9: Maximum compression, slowest + + **Brotli levels** (0-11): + - 0-4: Faster encoding (level 1 is the default and recommended) + - 5-11: Better compression, slower + + Higher levels typically produce 20-50% smaller files but take 2-5x longer to encode. + Recommended: Use level 3-6 for hot data, 10-15 for cold storage. + """ + required: false + type: int: examples: [3, 6, 10] + } + exclude_columns: { + description: """ + Column names to exclude from Parquet encoding + + These columns will be completely excluded from the Parquet file. + Useful for filtering out metadata, internal fields, or temporary data. + + Only applies when `infer_schema` is enabled. Ignored when using explicit schema. + """ + required: false + type: array: items: type: string: examples: ["_metadata", "internal_id"] + } + infer_schema: { + description: """ + Automatically infer schema from event data + + When enabled, the schema is inferred from each batch of events independently. + The schema is determined by examining the types of values in the events. + + **Type mapping:** + - String values → `utf8` + - Integer values → `int64` + - Float values → `float64` + - Boolean values → `boolean` + - Timestamp values → `timestamp_microsecond` + - Arrays/Objects → `utf8` (serialized as JSON) + + **Type conflicts:** If a field has different types across events in the same batch, + it will be encoded as `utf8` (string) and all values will be converted to strings. + + **Important:** Schema consistency across batches is the operator's responsibility. + Use VRL transforms to ensure consistent types if needed. Each batch may produce + a different schema if event structure varies. + + **Bloom filters:** Not supported with inferred schemas. Use explicit schema for Bloom filters. + + Mutually exclusive with `schema`. Must specify either `schema` or `infer_schema: true`.
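As a rough illustration of the compression knobs described above, the sketch below pairs schema inference with ZSTD at a higher level for colder, long-retention data. It again assumes the `encoding.parquet` nesting shown in the generated paths; the sink name, inputs, and bucket are placeholders.

```yaml
sinks:
  archive_to_gcs:                 # hypothetical sink name
    type: gcp_cloud_storage
    inputs: ["app_logs"]          # placeholder input
    bucket: my-archive-bucket     # placeholder bucket
    encoding:
      codec: parquet
      parquet:
        infer_schema: true
        # ZSTD at level 12 trades slower encoding for noticeably
        # smaller files, which suits rarely queried archives.
        compression: zstd
        compression_level: 12
```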
+ """ + required: false + type: bool: { + default: false + examples: [true] + } + } + max_columns: { + description: """ + Maximum number of columns to encode + + Limits the number of columns in the Parquet file. Additional columns beyond + this limit will be silently dropped. Columns are selected in the order they + appear in the first event. + + Only applies when `infer_schema` is enabled. Ignored when using explicit schema. + """ + required: false + type: uint: { + default: 1000 + examples: [500, 1000] + } + } + row_group_size: { + description: """ + Number of rows per row group + + Row groups are Parquet's unit of parallelization. Larger row groups + can improve compression but increase memory usage during encoding. + + Since each batch becomes a separate Parquet file, this value + should be <= the batch max_events setting. Row groups cannot span multiple files. + If not specified, defaults to the batch size. + """ + required: false + type: uint: examples: [100000, 1000000] + } + schema: { + description: """ + The Arrow schema definition to use for encoding + + This schema defines the structure and types of the Parquet file columns. + Specified as a map of field names to data types. + + Mutually exclusive with `infer_schema`. Must specify either `schema` or `infer_schema: true`. + + Supported types: utf8, int8, int16, int32, int64, uint8, uint16, uint32, uint64, + float32, float64, boolean, binary, timestamp_second, timestamp_millisecond, + timestamp_microsecond, timestamp_nanosecond, date32, date64, and more. + """ + required: false + type: object: { + examples: [{ + id: { + bloom_filter: false + bloom_filter_false_positive_pct: null + bloom_filter_num_distinct_values: null + type: "int64" + } + name: { + bloom_filter: true + bloom_filter_false_positive_pct: 0.01 + bloom_filter_num_distinct_values: 1000000 + type: "utf8" + } + timestamp: { + bloom_filter: false + bloom_filter_false_positive_pct: null + bloom_filter_num_distinct_values: null + type: "timestamp_microsecond" + } + }] + options: "*": { + description: "A field definition specifying the data type and optional Bloom filter configuration." + required: true + type: object: options: { + bloom_filter: { + description: """ + Enable Bloom filter for this specific column + + When enabled, a Bloom filter will be created for this column to improve + query performance for point lookups and IN clauses. Only enable for + high-cardinality columns (UUIDs, user IDs, etc.) to avoid overhead. + """ + required: false + type: bool: { + default: false + examples: [true] + } + } + bloom_filter_false_positive_pct: { + description: """ + False positive probability for this column's Bloom filter (as a percentage) + + Lower values create larger but more accurate filters. + + - 0.05 (5%): Good balance for general use + - 0.01 (1%): Better for high-selectivity queries + """ + required: false + type: float: examples: [0.05, 0.01] + } + bloom_filter_num_distinct_values: { + description: """ + Number of distinct values expected for this column's Bloom filter + + This controls the size of the Bloom filter. Should match the actual + cardinality of the column. Will be automatically capped to the batch size. 
+ + - Low cardinality (countries, states): 1,000 - 100,000 + - Medium cardinality (cities, products): 100,000 - 1,000,000 + - High cardinality (UUIDs, user IDs): 10,000,000+ + """ + required: false + type: uint: examples: [1000000, 10000000] + } + type: { + description: "Data type for this field" + required: true + type: string: examples: ["utf8", "int64", "timestamp_ms"] + } + } + } + } + } + sorting_columns: { + description: """ + Sorting order for rows within row groups. + + Pre-sorting rows by specified columns before writing can significantly improve both + compression ratios and query performance. This is especially valuable for time-series + data and event logs. + + **Benefits:** + - **Better compression** (20-40% smaller files): Similar values are grouped together + - **Faster queries**: More effective min/max statistics enable better row group skipping + - **Improved caching**: Query engines can more efficiently cache sorted data + + **Common patterns:** + - Time-series: Sort by timestamp descending (most recent first) + - Multi-tenant: Sort by tenant_id, then timestamp + - User analytics: Sort by user_id, then event_time + + **Trade-offs:** + - Adds sorting overhead during encoding (typically 10-30% slower writes) + - Requires buffering entire batch in memory for sorting + - Most beneficial when queries frequently filter on sorted columns + + **Example:** + ```yaml + sorting_columns: + - column: timestamp + descending: true + - column: user_id + descending: false + ``` + + If not specified, rows are written in the order they appear in the batch. + """ + required: false + type: array: items: type: object: options: { + column: { + description: "Name of the column to sort by" + required: true + type: string: examples: ["timestamp", "user_id"] + } + descending: { + description: """ + Sort in descending order (true) or ascending order (false) + + - `true`: Descending (Z-A, 9-0, newest-oldest) + - `false`: Ascending (A-Z, 0-9, oldest-newest) + """ + required: false + type: bool: { + default: false + examples: [true] + } + } + } + } + writer_version: { + description: """ + Parquet format writer version. + + Controls which Parquet format version to write: + - **v1** (PARQUET_1_0): Original format, maximum compatibility (default) + - **v2** (PARQUET_2_0): Modern format with improved encoding and statistics + + Version 2 benefits: + - More efficient encoding for certain data types (10-20% smaller files) + - Better statistics for query optimization + - Improved data page format + - Required for some advanced features + + Use v1 for maximum compatibility with older readers (pre-2018 tools). + Use v2 for better performance with modern query engines (Athena, Spark, Presto). + """ + required: false + type: string: { + default: "v2" + enum: { + v1: "Parquet format version 1.0 (maximum compatibility)" + v2: "Parquet format version 2.0 (modern format with better encoding)" + } + examples: ["v1", "v2"] + } + } + } + } protobuf: { description: "Options for the Protobuf serializer." 
relevant_when: "codec = \"protobuf\"" diff --git a/website/cue/reference/urls.cue b/website/cue/reference/urls.cue index 079d8378e2696..2c96b6937cec3 100644 --- a/website/cue/reference/urls.cue +++ b/website/cue/reference/urls.cue @@ -19,6 +19,7 @@ urls: { apache_extended_status: "\(apache)/docs/current/mod/core.html#extendedstatus" apache_install: "\(apache)/docs/current/install.html" apache_mod_status: "http://httpd.apache.org/docs/current/mod/mod_status.html" + apache_parquet: "https://parquet.apache.org/" apt: "\(wikipedia)/wiki/APT_(software)" arm: "\(wikipedia)/wiki/ARM_architecture" aws_access_keys: "\(aws_docs)/IAM/latest/UserGuide/id_credentials_access-keys.html"