
feat(aws_s3 sink): add Parquet encoder with schema_file and auto infer schema support#25156

Open

petere-datadog wants to merge 24 commits into master from peter.ehik/aws-s3-parquet-encoding

Conversation

@petere-datadog

@petere-datadog petere-datadog commented Apr 9, 2026

Summary

  • Added support for parquet encoding for the aws s3 sink
  • Options supported:
    • schema_mode: auto_infer, relaxed, strict
    • schema_file: Only needed if schema mode is not auto_infer
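
For completeness, the third mode could be sketched as follows. This is a hypothetical fragment based on the options above (a `relaxed` mode, like `strict`, takes a `schema_file`); the path is a placeholder, not a real file from this PR:

```yaml
# Hypothetical fragment: `relaxed` mode also requires a schema_file,
# per the options above. The path below is a placeholder.
batch_encoding:
  codec: parquet
  schema_mode: relaxed
  schema_file: /path/to/events.schema
```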

This PR was initially started by @szibis: #24706

Vector configuration

auto-infer.yaml

sources:
  demo:
    type: demo_logs
    format: apache_common
    interval: 0.1

sinks:
  s3_parquet:
    type: aws_s3
    inputs:
      - demo
    bucket: obs-pipelines-e2e-tests
    region: us-east-1
    key_prefix: "peter-test/demo_logs/dt=%Y%m%d/hour=%H/"
    filename_time_format: "%s"
    filename_append_uuid: true
    compression: none  # Parquet handles its own compression internally

    # Standard per-event encoding is still required by the field even when
    # batch_encoding takes over. Set it to text as a no-op placeholder.
    encoding:
      codec: text

    batch_encoding:
      codec: parquet
      schema_mode: auto_infer
      compression:
        algorithm: gzip
        level: 9

    batch:
      max_events: 10000
      timeout_secs: 5

schema-file.yaml

sources:
  demo:
    type: demo_logs
    format: apache_common
    interval: 0.1

sinks:
  s3_parquet:
    type: aws_s3
    inputs:
      - demo
    bucket: obs-pipelines-e2e-tests
    region: us-east-1
    key_prefix: "peter-test/demo_logs/dt=%Y%m%d/hour=%H/"
    filename_time_format: "%s"
    filename_append_uuid: true
    compression: none  # Parquet handles its own compression internally

    # Standard per-event encoding is still required by the field even when
    # batch_encoding takes over. Set it to text as a no-op placeholder.
    encoding:
      codec: text

    batch_encoding:
      codec: parquet
      schema_mode: strict
      schema_file: /Users/peter.ehikhuemen/go/src/github.com/DataDog/vectordotdev/vector/local/apache-common.schema
      compression:
        algorithm: snappy

    batch:
      max_events: 10000
      timeout_secs: 5

apache-common.schema

message arrow_schema {
  optional binary host (STRING);
  optional binary message (STRING);
  optional binary service (STRING);
  optional binary source_type (STRING);
  optional int64 timestamp (TIMESTAMP(MICROS,true));
}
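
For intuition, the strict-mode validation discussed in the review comments below boils down to checking each event's top-level fields against the fields declared in a schema file like the one above. A minimal, std-only Rust sketch; the name `validate_strict` is illustrative, not the PR's actual API:

```rust
use std::collections::HashSet;

// Illustrative sketch (not the PR's actual code): strict mode rejects any
// event that carries a top-level field not declared in the Parquet schema.
fn validate_strict(schema_fields: &HashSet<&str>, event_fields: &[&str]) -> Result<(), String> {
    for field in event_fields {
        if !schema_fields.contains(field) {
            return Err(format!("field `{field}` is not present in the schema"));
        }
    }
    Ok(())
}

fn main() {
    let schema: HashSet<&str> =
        ["host", "message", "service", "source_type", "timestamp"].into();
    // These fields all appear in apache-common.schema, so validation passes.
    assert!(validate_strict(&schema, &["host", "message", "timestamp"]).is_ok());
    // One undeclared field fails the whole batch in strict mode.
    assert!(validate_strict(&schema, &["host", "extra_field"]).is_err());
    println!("strict-mode sketch ok");
}
```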

How did you test this PR?

cargo run --features "codecs-parquet" --  --config local/configs/aws-s3-sink-parquet-encoding.yaml

Change Type

  • Bug fix
  • New feature
  • Dependencies
  • Non-functional (chore, refactoring, docs)
  • Performance

Is this a breaking change?

  • Yes
  • No

All of the new functionality is gated behind the codec-parquet feature.

Does this PR include user facing changes?

  • Yes. Please add a changelog fragment based on our guidelines.
  • No. A maintainer will apply the no-changelog label to this PR.

Notes

  • Please read our Vector contributor resources.
  • Do not hesitate to use @vectordotdev/vector to reach out to us regarding this PR.
  • Some CI checks run only after we manually approve them.
    • We recommend adding a pre-push hook, please see this template.
    • Alternatively, we recommend running the following locally before pushing to the remote branch:
      • make fmt
      • make check-clippy (if there are failures it's possible some of them can be fixed with make clippy-fix)
      • make test
  • After a review is requested, please avoid force pushes to help us review incrementally.
    • Feel free to push as many commits as you want. They will be squashed into one before merging.
    • For example, you can run git merge origin master and git push.
  • If this PR introduces changes to Vector's dependencies (modifies Cargo.lock), please
    run make build-licenses to regenerate the license inventory and commit the changes (if any). More details on the dd-rust-license-tool.

Keep JSON-based build_record_batch/find_null_non_nullable_fields for
Parquet compatibility. Drop unused serde_arrow dep. Regenerate Cargo.lock.

Made-with: Cursor
@petere-datadog petere-datadog requested review from a team as code owners April 9, 2026 19:32
@github-actions
Contributor

github-actions bot commented Apr 9, 2026

All contributors have signed the CLA ✍️ ✅
Posted by the CLA Assistant Lite bot.

@github-actions github-actions bot added labels domain: sinks, domain: ci, and domain: external docs on Apr 9, 2026

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 1eaa921f96

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review: reviewed commit 03a4544905.

@pront
Member

pront commented Apr 10, 2026

Hey @petere-datadog, while I take a look at this PR, please see this #25156 (comment) and the Codex review comments.

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review: reviewed commit ca201c5078.

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review: reviewed commit 6dcc854103.

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review: reviewed commit 08b9b547f4.

@datadog-vectordotdev

datadog-vectordotdev bot commented Apr 10, 2026

✅ Tests

🎉 All green!

❄️ No new flaky tests detected
🧪 All tests passed

This comment will be updated automatically if new data arrives.
🔗 Commit SHA: 589f483

@petere-datadog petere-datadog changed the title from "feat(codecs): add Parquet encoder with schema_file and schema_mode: strict, relaxed or auto_infer" to "feat(aws_s3_sink): add Parquet encoder with schema_file and auto infer schema support" on Apr 10, 2026
@petere-datadog petere-datadog changed the title from "feat(aws_s3_sink): add Parquet encoder with schema_file and auto infer schema support" to "feat(aws_s3 sink): add Parquet encoder with schema_file and auto infer schema support" on Apr 10, 2026
@petere-datadog
Author

recheck

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review: reviewed commit 2c3a74d8d3.

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review: reviewed commit 8d6c9c2ea9.

Comment on lines +297 to +299

if json_values.is_empty() {
    return Ok(());
}

P1 Badge Reject Parquet batches that serialize to zero rows

When a non-empty batch contains only events that fail serde_json::to_value (for example logs with non-finite floats), json_values becomes empty and this branch returns success. The request-builder path then proceeds with an empty payload and finalizes the whole batch as delivered, which silently drops all events and can create empty .parquet objects. This should return an error (or otherwise skip request creation) whenever input events were present but no rows were encodable.

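
The guard this review suggests can be sketched as follows. This is a minimal illustration, not the PR's code; `check_encodable` is a hypothetical helper that fails the batch when events arrived but none produced a row:

```rust
// Illustrative sketch of the suggested guard (check_encodable is hypothetical):
// succeed on a genuinely empty batch, but error when events were received
// and none of them serialized to a Parquet row.
fn check_encodable(input_events: usize, encoded_rows: usize) -> Result<(), String> {
    if input_events > 0 && encoded_rows == 0 {
        return Err(format!(
            "{input_events} events received but none were encodable to Parquet rows"
        ));
    }
    Ok(())
}

fn main() {
    assert!(check_encodable(0, 0).is_ok()); // empty batch: nothing to encode
    assert!(check_encodable(10, 10).is_ok()); // all rows encoded
    assert!(check_encodable(10, 0).is_err()); // would otherwise silently drop events
    println!("guard sketch ok");
}
```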

Author

It's not really an error if there are no encodable events; if anything, this should be handled upstream.

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review: reviewed commit 0f8dde57d0.

@petere-datadog
Author

recheck

@petere-datadog
Author

I have read the CLA Document and I hereby sign the CLA

Member

@pront pront left a comment


Some things that stood out. I will take another look.

@Himan10

Himan10 commented Apr 14, 2026

Hey, we are currently deploying Vector on our UAT instances to test a few workflows. One of our use cases is having logs in Parquet format with gzip compression. I checked some documentation but couldn't find any good resources on it. Could you let me know when this PR is going to be merged, or whether there's any resource I can read on how to set it up?

Contributor

@tessneau tessneau left a comment


Nice! Overall this seems good to me, just some non-blockers. Thanks for all the tests.

Comment on lines +308 to +309

if !self.schema_field_names.contains(top_level.as_str()) {
    return Err(Box::new(ArrowEncodingError::SchemaFetchError {
Contributor

Maybe we should emit the events-dropped metric here. It feels a bit ambiguous, since the user is choosing strict mode it may be expected that not all events match the schema, but since we'd be dropping the whole batch I think it makes sense.

Suggested change

if !self.schema_field_names.contains(top_level.as_str()) {
    self.events_dropped_handle.emit(Count(events.len()));
    return Err(Box::new(ArrowEncodingError::SchemaFetchError {
Author

So I looked, and we don't emit the dropped-events metric when encode returns an Err, and that's a problem, I think; we should be emitting that higher up, and we should fix it there. If I update that specific metric count here, then I have to do it everywhere we use a "?", and that's not clean at all. So for now I'd rather not emit the dropped-events metric after schema validation fails, but we definitely need to include that upstream.

Author

cc: @pront

Member

All points are correct here 😅 Yes, ideally we would have this emitted upstream, but that is a project on its own. Practically, the strict-mode path knows exactly how many events are being dropped and should emit the metric. We are already emitting dropped-events elsewhere in this function, so it would be inconsistent not to emit here.

I understand there are many cases that can fail here, so we can wrap this whole logic like so:

fn encode(&mut self, events: Vec<Event>, buffer: &mut BytesMut) -> Result<(), Self::Error> {
    if events.is_empty() {
        return Ok(());
    }

    let count = events.len();
    let result = self.try_encode(events, buffer);
    if result.is_err() {
        self.events_dropped_handle.emit(Count(count));
    }
    result
}

This way, we will emit the metric no matter what error happens.

Member

Hmm, the AutoInfer and build_record_batch paths already handle it themselves; maybe the simplest solution is then to handle it manually here in all paths.

Author

Yeah, I think we can revisit this in another PR. Do we know anyone from vector/documentation who can approve this?

Member

Hi @petere-datadog, since dropped events are actually an important aspect of Vector's observability contract, I'd like to see this addressed before we merge this PR. It's not a big lift; we just need to cover the error paths in this function that drop events, starting with the one @tessneau pointed out.

@petere-datadog
Author

Hey, we are currently deploying vector on your UAT instances to test few workflows. One of our use-case consists of having logs in the parquet format with gzip compression. I did check some documentation but couldn't find any good resources on it. Could you let me know when this PR is going to be merged or if there's any resource that I can read on how to set-up.

It should be merged later today, and yes, this will support encoding batched events with Parquet and gzip compression.

@maycmlee maycmlee self-assigned this Apr 14, 2026
futures.workspace = true
influxdb-line-protocol = { version = "2", default-features = false }
lookup = { package = "vector-lookup", path = "../vector-lookup", default-features = false, features = ["test"] }
lookup = { package = "vector-lookup", path = "../vector-lookup", default-features = false, features = [
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: please revert unrelated formatting changes


Labels

domain: ci, domain: external docs, domain: sinks, under_review

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants