Skip to content

impl(pubsub): add exactly once processing to MessageStream#5010

Closed
PhongChuong wants to merge 1 commit intogoogleapis:mainfrom
PhongChuong:subEOLoop
Closed

impl(pubsub): add exactly once processing to MessageStream#5010
PhongChuong wants to merge 1 commit intogoogleapis:mainfrom
PhongChuong:subEOLoop

Conversation

@PhongChuong
Copy link
Copy Markdown
Contributor

@PhongChuong PhongChuong commented Mar 13, 2026

Process exactly once subscription in MessageStream.

A followup PR will update subsriber/client.rs documentation and add integration test.

Towards #3964

@product-auto-label product-auto-label bot added the api: pubsub Issues related to the Pub/Sub API. label Mar 13, 2026
@PhongChuong
Copy link
Copy Markdown
Contributor Author

PhongChuong commented Mar 13, 2026

This PR is largely based off @dbolduc implementation in main...dbolduc:google-cloud-rust:impl-pubsub-exactly-once-lease-state-the-rest

Sorry for the large PR but it's mostly adding and updating tests.

@PhongChuong PhongChuong marked this pull request as ready for review March 13, 2026 20:04
@PhongChuong PhongChuong requested a review from a team as a code owner March 13, 2026 20:04
Copy link
Copy Markdown
Member

@dbolduc dbolduc left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, but I was expecting something else. I think it will be better to break this down into separate PRs:

  1. processing exactly once ack/nack actions
  2. adding the confirmed ack results to the lease loop.
  3. classifying messages as exactly-once in the message stream.
    • This should be a feat:
    • It should include the integration test, so we know it works.

... and focus the tests. We do not need the test suite to be symmetric on the delivery type.

Comment on lines +273 to +275
let exactly_once = resp
.subscription_properties
.is_some_and(|m| m.exactly_once_delivery_enabled);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert this piece of the PR and add it back in a follow up. I am not going to look at the message_stream tests.

Comment on lines 63 to +66
Some(Action::Ack(ack_id)) => state.process(Action::Ack(ack_id)),
Some(Action::Nack(ack_id)) => state.process(Action::Nack(ack_id)),
// TODO(#3964) - process exactly-once acks/nacks in the lease state
_ => unreachable!("we do not return exactly-once handlers yet."),
Some(Action::ExactlyOnceAck(ack_id)) => state.process(Action::ExactlyOnceAck(ack_id)),
Some(Action::ExactlyOnceNack(ack_id)) => state.process(Action::ExactlyOnceNack(ack_id)),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Some(Action::Ack(ack_id)) => state.process(Action::Ack(ack_id)),
Some(Action::Nack(ack_id)) => state.process(Action::Nack(ack_id)),
// TODO(#3964) - process exactly-once acks/nacks in the lease state
_ => unreachable!("we do not return exactly-once handlers yet."),
Some(Action::ExactlyOnceAck(ack_id)) => state.process(Action::ExactlyOnceAck(ack_id)),
Some(Action::ExactlyOnceNack(ack_id)) => state.process(Action::ExactlyOnceNack(ack_id)),
Some(action) => state.process(action),

Then we don't need to duplicate all of the tests that distinguish between exactly-once and at-least-once.


#[tokio_test_no_panics(start_paused = true)]
async fn deadline_interval() -> anyhow::Result<()> {
async fn flush_acks_nacks_on_interval_exactly_once() -> anyhow::Result<()> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicating these tests is not helpful IMO.

I think you should just have a basic_exactly_once test that gets sent Action::ExactlyOnceAck and Action::ExactlyOnceNack, and we verify that the leaser.confirmed_ack() gets called and the leaser.nack() gets called.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as testing the lease loop as an interface, we added two things.

  1. forwarding actions from the application
  2. processing results of confirmed acks

(Doing two things at once is already too many. I would send one PR for part 1 and another for part 2)

To test 1, we do the basic_exactly_once test I described above.

Then to test 2, we can just add an exactly once message, report a result on the confirmed ack channel and see that it shows up in the message's receiver.

Comment on lines +133 to +147
fn test_at_least_once_ack(id: i32) -> Action {
Action::Ack(test_id(id))
}

fn test_at_least_once_nack(id: i32) -> Action {
Action::Nack(test_id(id))
}

fn test_exactly_once_ack(id: i32) -> Action {
Action::ExactlyOnceAck(test_id(id))
}

fn test_exactly_once_nack(id: i32) -> Action {
Action::ExactlyOnceNack(test_id(id))
}
Copy link
Copy Markdown
Member

@dbolduc dbolduc Mar 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unhelpful, just spell these things out. And if you want to save characters you can

use Action::{Ack, Nack, ExactlyOnceAck, ExactlyOnceNack};

@PhongChuong
Copy link
Copy Markdown
Contributor Author

Will break this into smaller PRs with the suggestions.

@PhongChuong PhongChuong deleted the subEOLoop branch March 16, 2026 15:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

api: pubsub Issues related to the Pub/Sub API.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants