impl(pubsub): add exactly once processing to MessageStream#5010
impl(pubsub): add exactly once processing to MessageStream#5010PhongChuong wants to merge 1 commit intogoogleapis:mainfrom
Conversation
|
This PR is largely based off @dbolduc implementation in main...dbolduc:google-cloud-rust:impl-pubsub-exactly-once-lease-state-the-rest Sorry for the large PR but it's mostly adding and updating tests. |
There was a problem hiding this comment.
Thanks, but I was expecting something else. I think it will be better to break this down into separate PRs:
- processing exactly once ack/nack actions
- adding the confirmed ack results to the lease loop.
- classifying messages as exactly-once in the message stream.
- This should be a
feat: - It should include the integration test, so we know it works.
- This should be a
... and focus the tests. We do not need the test suite to be symmetric on the delivery type.
| let exactly_once = resp | ||
| .subscription_properties | ||
| .is_some_and(|m| m.exactly_once_delivery_enabled); |
There was a problem hiding this comment.
Please revert this piece of the PR and add it back in a follow up. I am not going to look at the message_stream tests.
| Some(Action::Ack(ack_id)) => state.process(Action::Ack(ack_id)), | ||
| Some(Action::Nack(ack_id)) => state.process(Action::Nack(ack_id)), | ||
| // TODO(#3964) - process exactly-once acks/nacks in the lease state | ||
| _ => unreachable!("we do not return exactly-once handlers yet."), | ||
| Some(Action::ExactlyOnceAck(ack_id)) => state.process(Action::ExactlyOnceAck(ack_id)), | ||
| Some(Action::ExactlyOnceNack(ack_id)) => state.process(Action::ExactlyOnceNack(ack_id)), |
There was a problem hiding this comment.
| Some(Action::Ack(ack_id)) => state.process(Action::Ack(ack_id)), | |
| Some(Action::Nack(ack_id)) => state.process(Action::Nack(ack_id)), | |
| // TODO(#3964) - process exactly-once acks/nacks in the lease state | |
| _ => unreachable!("we do not return exactly-once handlers yet."), | |
| Some(Action::ExactlyOnceAck(ack_id)) => state.process(Action::ExactlyOnceAck(ack_id)), | |
| Some(Action::ExactlyOnceNack(ack_id)) => state.process(Action::ExactlyOnceNack(ack_id)), | |
| Some(action) => state.process(action), |
Then we don't need to duplicate all of the tests that distinguish between exactly-once and at-least-once.
|
|
||
| #[tokio_test_no_panics(start_paused = true)] | ||
| async fn deadline_interval() -> anyhow::Result<()> { | ||
| async fn flush_acks_nacks_on_interval_exactly_once() -> anyhow::Result<()> { |
There was a problem hiding this comment.
Duplicating these tests is not helpful IMO.
I think you should just have a basic_exactly_once test that gets sent Action::ExactlyOnceAck and Action::ExactlyOnceNack, and we verify that the leaser.confirmed_ack() gets called and the leaser.nack() gets called.
There was a problem hiding this comment.
As far as testing the lease loop as an interface, we added two things.
- forwarding actions from the application
- processing results of confirmed acks
(Doing two things at once is already too many. I would send one PR for part 1 and another for part 2)
To test 1, we do the basic_exactly_once test I described above.
Then to test 2, we can just add an exactly once message, report a result on the confirmed ack channel and see that it shows up in the message's receiver.
| fn test_at_least_once_ack(id: i32) -> Action { | ||
| Action::Ack(test_id(id)) | ||
| } | ||
|
|
||
| fn test_at_least_once_nack(id: i32) -> Action { | ||
| Action::Nack(test_id(id)) | ||
| } | ||
|
|
||
| fn test_exactly_once_ack(id: i32) -> Action { | ||
| Action::ExactlyOnceAck(test_id(id)) | ||
| } | ||
|
|
||
| fn test_exactly_once_nack(id: i32) -> Action { | ||
| Action::ExactlyOnceNack(test_id(id)) | ||
| } |
There was a problem hiding this comment.
unhelpful, just spell these things out. And if you want to save characters you can
use Action::{Ack, Nack, ExactlyOnceAck, ExactlyOnceNack};|
Will break this into smaller PRs with the suggestions. |
Process exactly once subscription in MessageStream.
A followup PR will update subsriber/client.rs documentation and add integration test.
Towards #3964