Skip to content

Conversation

@PhongChuong
Copy link
Collaborator

Pause the Publisher when we encounter an error when there is a send error.

When an error is encountered for a pending batch, we:

  1. In BatchWorker, pause publishing and send out errors for pending_msgs.
  2. In the pending batch, send out error for its messages.
  3. New messages in the rx receiver are handled as they are received by the BatchWorker.

A resume operation will be added in a later PR.

This PR also introduce PublishError. Further work is needed to handle error propagation more fully

@product-auto-label product-auto-label bot added the api: pubsub Issues related to the Pub/Sub API. label Jan 15, 2026
@codecov
Copy link

codecov bot commented Jan 15, 2026

Codecov Report

❌ Patch coverage is 88.23529% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 94.82%. Comparing base (13bef44) to head (7030904).
⚠️ Report is 13 commits behind head on main.

Files with missing lines Patch % Lines
src/pubsub/src/publisher/worker.rs 66.66% 2 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4286      +/-   ##
==========================================
- Coverage   94.85%   94.82%   -0.04%     
==========================================
  Files         187      187              
  Lines        7194     7207      +13     
==========================================
+ Hits         6824     6834      +10     
- Misses        370      373       +3     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@PhongChuong PhongChuong marked this pull request as ready for review January 15, 2026 16:35
@PhongChuong PhongChuong requested a review from a team as a code owner January 15, 2026 16:35
@PhongChuong PhongChuong requested a review from suzmue January 15, 2026 16:35
Copy link
Member

@dbolduc dbolduc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't get to the changes in worker.rs

Consider doing the error type refactor first, then the unit tests look more like what we want.


/// Publish is paused for the ordering key.
#[error("the ordering key was paused")]
OrderingKeyPaused(()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usability question, which is not trivial, so feel free to think about it later:

Publish consumes the application's PubsubMessage. If the operation fails, what should the application do to resend the message? Would they need to hold onto a clone of the message until the operation is complete?

It would be super convenient if we could give them their message back. The plumbing on our end could be brutal, but the application would appreciate it. 🤷

Copy link
Member

@dbolduc dbolduc Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

our GAPICs all have this problem too.... idk if publishing is special. Something seems wrong about just dropping their message without trying to send it. But maybe I am worrying too much about the unhappy case. 🤷

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.
Ideally, in the error case we should give the user the message back. IMO, we can do this in 2 ways:

  1. Keep a clone of it internally and pass it back to the user if there is a failure.
  2. Augment generated code/error to pass the message back if there is an error during Send.
    I think it's a bigger discussion that should be left out of this PR.

}
}

fn convert_error(e: crate::error::PublishError) -> gax::error::Error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment: ah, I think you are hesitating to change Output to a Result<String, PublishError> because then we have to update all the code downstream of this.

Ok.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really yucky though. It might have been nicer to change the error type first.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should discuss as a group with @suzmue if returning gax::error::Error or Publish error directly is ideal. We decided to with gax::error::Error for now as it is consistent with the other clients.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels so wrong to me, but we do have a precedence for just throwing something in an Error::io

// TODO(#3626) - reconsider the error kind.
result.map_err(crate::Error::io)

Although in GCS, I think there was a more compelling reason. (We wanted to reuse the ReadObjectResponse type which was in terms of gax::Error.)

Copy link
Collaborator Author

@PhongChuong PhongChuong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review.


/// Publish is paused for the ordering key.
#[error("the ordering key was paused")]
OrderingKeyPaused(()),
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.
Ideally, in the error case we should give the user the message back. IMO, we can do this in 2 ways:

  1. Keep a clone of it internally and pass it back to the user if there is a failure.
  2. Augment generated code/error to pass the message back if there is an error during Send.
    I think it's a bigger discussion that should be left out of this PR.

}
}

fn convert_error(e: crate::error::PublishError) -> gax::error::Error {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should discuss as a group with @suzmue if returning gax::error::Error or Publish error directly is ideal. We decided to with gax::error::Error for now as it is consistent with the other clients.

Copy link
Member

@dbolduc dbolduc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, this is my first time looking at the publisher code, so I did my best.

What happens if there is an ordering key error, the application publishes messages with that ordering key, then calls flush() ? It should be paused right?

(Also, I am too lazy to read the code, do we ever clean up BatchWorkers? like when all batches are flushed and it is empty? If so we would not want to clean up ones that are paused. I didn't see this anywhere, so probably not.)

@@ -0,0 +1,36 @@
// Copyright 2025 Google LLC
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: 2026

];

// Assert the first error is caused by the Publish send operation.
let mut got_err = publisher.publish(messages[0].clone()).await.unwrap_err();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: here and below, no need to clone the messages.

let mut inflight = JoinSet::new();
let mut inflight: JoinSet<Result<(), gax::error::Error>> = JoinSet::new();
loop {
tokio::select! {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style nit: Consider factoring as much logic out of the select! statement as you can. Like

  join = inflight.join_next(), if !inflight.is_empty() => self.on_join(join, &mut inflight),
  msg = self.rx.recv() => {
    match msg {
      None => break,
      Some(msg) => self.on_message(msg, &mut inflight),
    }
  }

Doesn't have to be in this PR.


let client = GapicPublisher::from_stub(mock);
let publisher = PublisherBuilder::new(client, "my-topic".to_string())
.set_message_count_threshold(1_u32)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also want a test where there are subsequent messages in this batch.

(Those codecov comments are actually useful!)

msg = self.rx.recv() => {
match msg {
Some(ToBatchWorker::Publish(msg)) => {
if self.paused {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't Flush need this treatment too?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

api: pubsub Issues related to the Pub/Sub API.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants