From 358edc150c2730c4a3286ebc753768c871c6c5be Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Thu, 2 Apr 2026 14:52:07 -0700 Subject: [PATCH 01/11] Add Sent Flag to Prevent Dropping Tasks on Push Failure --- benches/store_bench.rs | 11 +- migrations/0006_add_sent.sql | 1 + .../0001_create_inflight_activations.sql | 8 +- pg_migrations/0002_add_bucket.sql | 3 - src/fetch/mod.rs | 2 +- src/fetch/tests.rs | 18 +- src/grpc/server.rs | 44 +---- src/grpc/server_tests.rs | 26 +-- src/kafka/deserialize_activation.rs | 1 + src/main.rs | 2 +- src/push/mod.rs | 34 +++- src/push/tests.rs | 8 +- src/store/inflight_activation.rs | 125 +++++++++--- src/store/inflight_activation_tests.rs | 186 ++++++++++-------- src/store/postgres_activation_store.rs | 64 ++++-- src/upkeep.rs | 8 +- 16 files changed, 323 insertions(+), 218 deletions(-) create mode 100644 migrations/0006_add_sent.sql delete mode 100644 pg_migrations/0002_add_bucket.sql diff --git a/benches/store_bench.rs b/benches/store_bench.rs index 2b29f38a..f2ca2f03 100644 --- a/benches/store_bench.rs +++ b/benches/store_bench.rs @@ -58,16 +58,11 @@ async fn get_pending_activations(num_activations: u32, num_workers: u32) { join_set.spawn(async move { let mut num_activations_processed = 0; - while !store - .get_pending_activations( - Some("sentry"), - Some(std::slice::from_ref(&ns)), - Some(1), - None, - ) + while store + .claim_activation_for_pull(Some("sentry"), Some(&ns)) .await .unwrap() - .is_empty() + .is_none() { num_activations_processed += 1; } diff --git a/migrations/0006_add_sent.sql b/migrations/0006_add_sent.sql new file mode 100644 index 00000000..f0acabc0 --- /dev/null +++ b/migrations/0006_add_sent.sql @@ -0,0 +1 @@ +ALTER TABLE inflight_taskactivations ADD COLUMN sent INTEGER NOT NULL DEFAULT 0; diff --git a/pg_migrations/0001_create_inflight_activations.sql b/pg_migrations/0001_create_inflight_activations.sql index ee8b26a4..7b31baa1 100644 --- a/pg_migrations/0001_create_inflight_activations.sql +++ b/pg_migrations/0001_create_inflight_activations.sql @@ -16,5 +16,11 @@ CREATE TABLE IF NOT EXISTS inflight_taskactivations ( application TEXT NOT NULL, namespace TEXT NOT NULL, taskname TEXT NOT NULL, - on_attempts_exceeded INTEGER NOT NULL DEFAULT 1 + on_attempts_exceeded INTEGER NOT NULL DEFAULT 1, + bucket SMALLINT NOT NULL DEFAULT 0, + sent BOOLEAN NOT NULL DEFAULT FALSE ); + +-- Supports pending claim queries (status, filters, ordering) including sent +CREATE INDEX IF NOT EXISTS idx_inflight_taskactivations_claim +ON inflight_taskactivations (status, bucket, sent); diff --git a/pg_migrations/0002_add_bucket.sql b/pg_migrations/0002_add_bucket.sql deleted file mode 100644 index 6283c667..00000000 --- a/pg_migrations/0002_add_bucket.sql +++ /dev/null @@ -1,3 +0,0 @@ -ALTER TABLE inflight_taskactivations ADD COLUMN bucket SMALLINT NOT NULL DEFAULT 0; - -CREATE INDEX idx_activation_claim ON inflight_taskactivations (status, bucket); diff --git a/src/fetch/mod.rs b/src/fetch/mod.rs index e104fefc..d53a5519 100644 --- a/src/fetch/mod.rs +++ b/src/fetch/mod.rs @@ -117,7 +117,7 @@ impl FetchPool { let namespaces = config.namespaces.as_deref(); match store - .get_pending_activations(application, namespaces, limit, bucket) + .claim_activations_for_push(application, namespaces, limit, bucket) .await { Ok(activations) if activations.is_empty() => { diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index aa101fcb..86cdb25f 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -70,31 +70,29 @@ impl InflightActivationStore for MockStore { unimplemented!() } - async fn get_pending_activations( + async fn claim_activations( &self, _application: Option<&str>, _namespaces: Option<&[String]>, _limit: Option, _bucket: Option, + mark_sent: bool, ) -> Result, Error> { if self.fail { return Err(anyhow!("mock store error")); } Ok(match self.pending.lock().await.take() { - Some(a) => vec![a], + Some(mut a) => { + a.sent = mark_sent; + vec![a] + } None => vec![], }) } - async fn get_pending_activations_from_namespaces( - &self, - _application: Option<&str>, - _namespaces: Option<&[String]>, - _limit: Option, - _bucket: Option, - ) -> Result, Error> { - unimplemented!() + async fn mark_activation_sent(&self, _id: &str) -> Result<(), Error> { + Ok(()) } async fn pending_activation_max_lag(&self, _now: &DateTime) -> f64 { diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 5d6fb3e4..bd2db13d 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -35,31 +35,16 @@ impl ConsumerService for TaskbrokerServer { let application = &request.get_ref().application; let namespace = &request.get_ref().namespace; - let namespaces = namespace.as_ref().map(std::slice::from_ref); let inflight = self .store - .get_pending_activations(application.as_deref(), namespaces, Some(1), None) + .claim_activation_for_pull(application.as_deref(), namespace.as_deref()) .await; match inflight { - Ok(activations) if activations.is_empty() => { - Err(Status::not_found("No pending activation")) - } - - Ok(activations) if activations.len() > 1 => { - error!( - count = activations.len(), - application = ?application, - namespace = ?namespace, - "get_pending_activations returned more than one row despite limit of 1", - ); - - Err(Status::internal("Unable to retrieve pending activation")) - } + Ok(None) => Err(Status::not_found("No pending activation")), - Ok(activations) => { - let inflight = &activations[0]; + Ok(Some(inflight)) => { let now = Utc::now(); if inflight.processing_attempts < 1 { @@ -141,10 +126,10 @@ impl ConsumerService for TaskbrokerServer { }; let start_time = Instant::now(); - let namespaces = namespace.as_ref().map(std::slice::from_ref); + let res = match self .store - .get_pending_activations(application.as_deref(), namespaces, Some(1), None) + .claim_activation_for_pull(application.as_deref(), namespace.as_deref()) .await { Err(e) => { @@ -152,24 +137,9 @@ impl ConsumerService for TaskbrokerServer { Err(Status::internal("Unable to fetch next task")) } - Ok(activations) if activations.is_empty() => { - Err(Status::not_found("No pending activation")) - } - - Ok(activations) if activations.len() > 1 => { - error!( - count = activations.len(), - application = ?application, - namespace = ?namespace, - "get_pending_activations returned more than one row despite limit of 1", - ); - - Err(Status::internal("Unable to fetch next task")) - } - - Ok(activations) => { - let inflight = &activations[0]; + Ok(None) => Ok(Response::new(SetTaskStatusResponse { task: None })), + Ok(Some(inflight)) => { if inflight.processing_attempts < 1 { let now = Utc::now(); let received_to_gettask_latency = inflight.received_latency(now); diff --git a/src/grpc/server_tests.rs b/src/grpc/server_tests.rs index f0834a95..4dd6be36 100644 --- a/src/grpc/server_tests.rs +++ b/src/grpc/server_tests.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use crate::config::{Config, DeliveryMode}; use crate::grpc::server::TaskbrokerServer; +use crate::store::inflight_activation::InflightActivationStatus; use prost::Message; use rstest::rstest; use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerService; @@ -111,7 +112,10 @@ async fn test_get_task_success(#[case] adapter: &str) { let activations = make_activations(1); store.store(activations).await.unwrap(); - let service = TaskbrokerServer { store, config }; + let service = TaskbrokerServer { + store: store.clone(), + config, + }; let request = GetTaskRequest { namespace: None, application: None, @@ -122,6 +126,10 @@ async fn test_get_task_success(#[case] adapter: &str) { assert!(resp.get_ref().task.is_some()); let task = resp.get_ref().task.as_ref().unwrap(); assert!(task.id == "id_0"); + + let row = store.get_by_id("id_0").await.unwrap().expect("claimed row"); + assert!(row.sent); + assert_eq!(row.status, InflightActivationStatus::Processing); } #[tokio::test] @@ -291,9 +299,8 @@ async fn test_set_task_status_with_application_no_match(#[case] adapter: &str) { }), }; let response = service.set_task_status(Request::new(request)).await; - assert!(response.is_err()); - let e = response.unwrap_err(); - assert_eq!(e.code(), Code::NotFound); + assert!(response.is_ok()); + assert!(response.unwrap().get_ref().task.is_none()); } #[tokio::test] @@ -320,12 +327,9 @@ async fn test_set_task_status_with_namespace_requires_application(#[case] adapte }), }; let response = service.set_task_status(Request::new(request)).await; - assert!(response.is_err()); - - let resp = response.unwrap_err(); - assert_eq!( - resp.code(), - Code::NotFound, - "No task found as namespace without filter is invalid." + assert!(response.is_ok()); + assert!( + response.unwrap().get_ref().task.is_none(), + "namespace without application yields no next task in response" ); } diff --git a/src/kafka/deserialize_activation.rs b/src/kafka/deserialize_activation.rs index 28485606..1949f573 100644 --- a/src/kafka/deserialize_activation.rs +++ b/src/kafka/deserialize_activation.rs @@ -108,6 +108,7 @@ pub fn new( taskname, on_attempts_exceeded, bucket, + sent: false, }) } } diff --git a/src/main.rs b/src/main.rs index db0ebd84..53e3160f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -241,7 +241,7 @@ async fn main() -> Result<(), Error> { }); // Initialize push and fetch pools - let push_pool = Arc::new(PushPool::new(config.clone())); + let push_pool = Arc::new(PushPool::new(config.clone(), store.clone())); let fetch_pool = FetchPool::new(store.clone(), config.clone(), push_pool.clone()); // Initialize push threads diff --git a/src/push/mod.rs b/src/push/mod.rs index 55bae746..45ba3a51 100644 --- a/src/push/mod.rs +++ b/src/push/mod.rs @@ -16,7 +16,7 @@ use tonic::transport::Channel; use tracing::{debug, error, info}; use crate::config::Config; -use crate::store::inflight_activation::InflightActivation; +use crate::store::inflight_activation::{InflightActivation, InflightActivationStore}; type HmacSha256 = Hmac; @@ -88,26 +88,32 @@ pub struct PushPool { /// Taskbroker configuration. config: Arc, + + /// Activation store, which we need for marking tasks as sent. + store: Arc, } impl PushPool { /// Initialize a new push pool. - pub fn new(config: Arc) -> Self { + pub fn new(config: Arc, store: Arc) -> Self { let (sender, receiver) = flume::bounded(config.push_queue_size); Self { sender, receiver, config, + store, } } /// Spawn `config.push_threads` asynchronous tasks, each of which repeatedly moves pending activations from the channel to the worker service until the shutdown signal is received. pub async fn start(&self) -> Result<()> { + let store = self.store.clone(); let mut push_pool: JoinSet> = crate::tokio::spawn_pool(self.config.push_threads, |_| { let endpoint = self.config.worker_endpoint.clone(); let receiver = self.receiver.clone(); + let store = store.clone(); let guard = get_shutdown_guard().shutdown_on_drop(); @@ -158,7 +164,17 @@ impl PushPool { ) .await { - Ok(_) => debug!(task_id = %id, "Activation sent to worker"), + Ok(_) => { + debug!(task_id = %id, "Activation sent to worker"); + + if let Err(e) = store.mark_activation_sent(&id).await { + error!( + task_id = %id, + error = ?e, + "Failed to mark activation as sent after push" + ); + } + } // Once processing deadline expires, status will be set back to pending Err(e) => error!( @@ -185,7 +201,17 @@ impl PushPool { ) .await { - Ok(_) => debug!(task_id = %id, "Activation sent to worker"), + Ok(_) => { + debug!(task_id = %id, "Activation sent to worker"); + + if let Err(e) = store.mark_activation_sent(&id).await { + error!( + task_id = %id, + error = ?e, + "Failed to mark activation as sent after push" + ); + } + } // Once processing deadline expires, status will be set back to pending Err(e) => error!( diff --git a/src/push/tests.rs b/src/push/tests.rs index 5b79b95b..6ea99c9d 100644 --- a/src/push/tests.rs +++ b/src/push/tests.rs @@ -7,7 +7,7 @@ use tonic::async_trait; use super::*; use crate::config::Config; -use crate::test_utils::make_activations; +use crate::test_utils::{create_test_store, make_activations}; /// Fake worker client for unit testing. struct MockWorkerClient { @@ -117,7 +117,8 @@ async fn push_pool_submit_enqueues_item() { ..Config::default() }); - let pool = PushPool::new(config); + let store = create_test_store("sqlite").await; + let pool = PushPool::new(config, store); let activation = make_activations(1).remove(0); let result = pool.submit(activation).await; @@ -131,7 +132,8 @@ async fn push_pool_submit_backpressures_when_queue_full() { ..Config::default() }); - let pool = PushPool::new(config); + let store = create_test_store("sqlite").await; + let pool = PushPool::new(config, store); let first = make_activations(1).remove(0); let second = make_activations(1).remove(0); diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index d4c8b4a0..12f5a2da 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -180,6 +180,10 @@ pub struct InflightActivation { /// Bucket derived from activation ID (UUID as number % 256). Set once on ingestion. #[builder(setter(skip), default = "0")] pub bucket: i16, + + /// True after successful push. + #[builder(default = false)] + pub sent: bool, } impl InflightActivation { @@ -243,6 +247,7 @@ pub struct TableRow { #[sqlx(try_from = "i32")] pub on_attempts_exceeded: OnAttemptsExceeded, pub bucket: i16, + pub sent: bool, } impl TryFrom for TableRow { @@ -268,6 +273,7 @@ impl TryFrom for TableRow { taskname: value.taskname, on_attempts_exceeded: value.on_attempts_exceeded, bucket: value.bucket, + sent: value.sent, }) } } @@ -293,6 +299,7 @@ impl From for InflightActivation { taskname: value.taskname, on_attempts_exceeded: value.on_attempts_exceeded, bucket: value.bucket, + sent: value.sent, } } } @@ -376,13 +383,24 @@ pub trait InflightActivationStore: Send + Sync { /// Get `limit` pending activations, optionally filtered by namespaces and bucket subrange. /// If no limit is provided, all matching activations will be returned. - async fn get_pending_activations( + async fn claim_activations( + &self, + application: Option<&str>, + namespaces: Option<&[String]>, + limit: Option, + bucket: Option, + sent: bool, + ) -> Result, Error>; + + /// Claims `limit` activations within the `bucket` range. Column `sent` remains false until `mark_activation_sent` is called. + async fn claim_activations_for_push( &self, application: Option<&str>, namespaces: Option<&[String]>, limit: Option, bucket: Option, ) -> Result, Error> { + // If a namespace filter is used, an application must also be used if namespaces.is_some() && application.is_none() { warn!( ?namespaces, @@ -392,21 +410,41 @@ pub trait InflightActivationStore: Send + Sync { return Ok(vec![]); } - let results = self - .get_pending_activations_from_namespaces(application, namespaces, limit, bucket) - .await?; - - Ok(results) + self.claim_activations(application, namespaces, limit, bucket, false) + .await } - /// Claim pending activations (moves them to processing), optionally filtered by application and namespaces. - async fn get_pending_activations_from_namespaces( + /// Claims `limit` activations with application `application` and namespace `namespace`. + async fn claim_activation_for_pull( &self, application: Option<&str>, - namespaces: Option<&[String]>, - limit: Option, - bucket: Option, - ) -> Result, Error>; + namespace: Option<&str>, + ) -> Result, Error> { + // Convert single namespace to vector for internal use + let namespaces = namespace.map(|ns| vec![ns.to_string()]); + + // If a namespace filter is used, an application must also be used + if namespaces.is_some() && application.is_none() { + warn!( + ?namespaces, + "Received request for namespaced task without application" + ); + + return Ok(None); + } + + let mut rows = self + .claim_activations(application, namespaces.as_deref(), Some(1), None, true) + .await?; + + // If we are getting more than one task here, something is broken + assert!(rows.len() <= 1); + + Ok(rows.pop()) + } + + /// Record successful push. + async fn mark_activation_sent(&self, id: &str) -> Result<(), Error>; /// Get the age of the oldest pending activation in seconds async fn pending_activation_max_lag(&self, now: &DateTime) -> f64; @@ -738,7 +776,8 @@ impl InflightActivationStore for SqliteActivationStore { namespace, taskname, on_attempts_exceeded, - bucket + bucket, + sent FROM inflight_taskactivations WHERE id = $1 ", @@ -780,7 +819,8 @@ impl InflightActivationStore for SqliteActivationStore { namespace, taskname, on_attempts_exceeded, - bucket + bucket, + sent ) ", ); @@ -814,6 +854,7 @@ impl InflightActivationStore for SqliteActivationStore { b.push_bind(row.taskname); b.push_bind(row.on_attempts_exceeded as i32); b.push_bind(row.bucket); + b.push_bind(row.sent); }) .push(" ON CONFLICT(id) DO NOTHING") .build(); @@ -842,16 +883,14 @@ impl InflightActivationStore for SqliteActivationStore { meta_result } - /// Claim pending activations from specified namespaces (moves them to processing). - /// If namespaces is `None`, gets from any namespace. - /// If namespaces is `Some(...)`, restricts to those namespaces. #[instrument(skip_all)] - async fn get_pending_activations_from_namespaces( + async fn claim_activations( &self, application: Option<&str>, namespaces: Option<&[String]>, limit: Option, bucket: Option, + sent: bool, ) -> Result, Error> { let now = Utc::now(); @@ -865,6 +904,8 @@ impl InflightActivationStore for SqliteActivationStore { status = " )); query_builder.push_bind(InflightActivationStatus::Processing); + query_builder.push(", sent = "); + query_builder.push_bind(sent); query_builder.push( " WHERE id IN ( @@ -905,9 +946,7 @@ impl InflightActivationStore for SqliteActivationStore { } query_builder.push(") RETURNING *"); - let mut conn = self - .acquire_write_conn_metric("get_pending_activation") - .await?; + let mut conn = self.acquire_write_conn_metric("claim_activations").await?; let rows: Vec = query_builder .build_query_as::() .fetch_all(&mut *conn) @@ -916,6 +955,19 @@ impl InflightActivationStore for SqliteActivationStore { Ok(rows.into_iter().map(|row| row.into()).collect()) } + #[instrument(skip_all)] + async fn mark_activation_sent(&self, id: &str) -> Result<(), Error> { + let mut conn = self + .acquire_write_conn_metric("mark_activation_sent") + .await?; + sqlx::query("UPDATE inflight_taskactivations SET sent = 1 WHERE id = $1 AND status = $2") + .bind(id) + .bind(InflightActivationStatus::Processing) + .execute(&mut *conn) + .await?; + Ok(()) + } + /// Get the age of the oldest pending activation in seconds. /// Only activations with status=pending and processing_attempts=0 are considered /// as we are interested in latency to the *first* attempt. @@ -1039,7 +1091,8 @@ impl InflightActivationStore for SqliteActivationStore { namespace, taskname, on_attempts_exceeded, - bucket + bucket, + sent FROM inflight_taskactivations WHERE status = $1 ", @@ -1060,16 +1113,13 @@ impl InflightActivationStore for SqliteActivationStore { Ok(()) } - /// Update tasks that are in processing and have exceeded their processing deadline - /// Exceeding a processing deadline does not consume a retry as we don't know - /// if a worker took the task and was killed, or failed. + /// Update tasks that are in processing and have exceeded their processing deadline. #[instrument(skip_all)] async fn handle_processing_deadline(&self) -> Result { let now = Utc::now(); let mut atomic = self.write_pool.begin().await?; // Idempotent tasks that fail their processing deadlines go directly to failure - // there are no retries, as the worker will reject the task due to idempotency keys. let most_once_result = sqlx::query( "UPDATE inflight_taskactivations SET processing_deadline = null, status = $1 @@ -1086,12 +1136,27 @@ impl InflightActivationStore for SqliteActivationStore { processing_deadline_modified_rows = query_res.rows_affected(); } - // Update non-idempotent tasks. - // Increment processing_attempts by 1 and reset processing_deadline to null. + // Revert non-AMO activations that weren't delivered back to pending without consuming an attempt + let unsent = sqlx::query( + "UPDATE inflight_taskactivations + SET processing_deadline = null, status = $1, sent = 0 + WHERE processing_deadline < $2 AND sent = 0 AND status = $3", + ) + .bind(InflightActivationStatus::Pending) + .bind(now.timestamp()) + .bind(InflightActivationStatus::Processing) + .execute(&mut *atomic) + .await; + + if let Ok(query_res) = unsent { + processing_deadline_modified_rows += query_res.rows_affected(); + } + + // Revert activations that were delivered back to 'pending' AND consume an attempt let result = sqlx::query( "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1, processing_attempts = processing_attempts + 1 - WHERE processing_deadline < $2 AND status = $3", + SET processing_deadline = null, status = $1, processing_attempts = processing_attempts + 1, sent = 0 + WHERE processing_deadline < $2 AND sent = 1 AND status = $3", ) .bind(InflightActivationStatus::Pending) .bind(now.timestamp()) diff --git a/src/store/inflight_activation_tests.rs b/src/store/inflight_activation_tests.rs index 363e6cef..0b46a96b 100644 --- a/src/store/inflight_activation_tests.rs +++ b/src/store/inflight_activation_tests.rs @@ -249,12 +249,11 @@ async fn test_get_pending_activation(#[case] adapter: &str) { let batch = make_activations(2); assert!(store.store(batch.clone()).await.is_ok()); - let mut got = store - .get_pending_activations(None, None, Some(1), None) + let result = store + .claim_activation_for_pull(None, None) .await - .unwrap(); - assert_eq!(got.len(), 1); - let result = got.pop().unwrap(); + .unwrap() + .expect("expected one activation"); assert_eq!(result.id, "id_0"); assert_eq!(result.status, InflightActivationStatus::Processing); @@ -288,7 +287,7 @@ async fn test_get_pending_activation_bucket_filter(#[case] adapter: &str) { assert!(store.store(batch).await.is_ok()); let mut first = store - .get_pending_activations(None, None, Some(1), Some((15, 25))) + .claim_activations_for_push(None, None, Some(1), Some((15, 25))) .await .unwrap(); assert_eq!(first.len(), 1); @@ -297,7 +296,7 @@ async fn test_get_pending_activation_bucket_filter(#[case] adapter: &str) { assert_eq!(first.bucket, 20); let mut second = store - .get_pending_activations(None, None, Some(1), Some((0, 15))) + .claim_activations_for_push(None, None, Some(1), Some((0, 15))) .await .unwrap(); assert_eq!(second.len(), 1); @@ -307,7 +306,7 @@ async fn test_get_pending_activation_bucket_filter(#[case] adapter: &str) { assert!( store - .get_pending_activations(None, None, Some(1), Some((15, 25))) + .claim_activations_for_push(None, None, Some(1), Some((15, 25))) .await .unwrap() .is_empty() @@ -343,17 +342,12 @@ async fn test_get_pending_activation_with_race(#[case] adapter: &str) { join_set.spawn(async move { rx.recv().await.unwrap(); { - let mut v = store - .get_pending_activations( - Some("sentry"), - Some(std::slice::from_ref(&ns)), - Some(1), - None, - ) + let v = store + .claim_activation_for_pull(Some("sentry"), Some(ns.as_str())) .await .unwrap(); - assert_eq!(v.len(), 1); - v.pop().unwrap() + assert!(v.is_some()); + v.unwrap() } }); } @@ -384,17 +378,11 @@ async fn test_get_pending_activation_with_namespace(#[case] adapter: &str) { let other_namespace = "other_namespace".to_string(); // Get activation from other namespace - let mut got = store - .get_pending_activations( - Some("sentry"), - Some(std::slice::from_ref(&other_namespace)), - Some(1), - None, - ) + let result = store + .claim_activation_for_pull(Some("sentry"), Some(other_namespace.as_str())) .await - .unwrap(); - assert_eq!(got.len(), 1); - let result = got.pop().unwrap(); + .unwrap() + .expect("expected one activation"); assert_eq!(result.id, "id_1"); assert_eq!(result.status, InflightActivationStatus::Processing); assert!(result.processing_deadline.unwrap() > Utc::now()); @@ -416,10 +404,11 @@ async fn test_get_pending_activation_from_multiple_namespaces(#[case] adapter: & batch[3].namespace = "ns4".into(); assert!(store.store(batch.clone()).await.is_ok()); - // Get activation from multiple namespaces (should get oldest) + // Get activation from multiple namespaces (should get oldest). + // Use `claim_activations` so upkeep-style `None` application + namespaces is allowed (not `claim_activations_for_push`). let namespaces = vec!["ns2".to_string(), "ns3".to_string()]; let result = store - .get_pending_activations_from_namespaces(None, Some(&namespaces), None, None) + .claim_activations(None, Some(&namespaces), None, None, false) .await .unwrap(); @@ -444,24 +433,18 @@ async fn test_get_pending_activation_with_namespace_requires_application(#[case] batch[1].namespace = "other_namespace".into(); assert!(store.store(batch.clone()).await.is_ok()); - // This is an invalid query as we don't want to allow clients - // to fetch tasks from any application. + // This is an invalid query as we don't want to allow clients to fetch tasks from any application let other_namespace = "other_namespace".to_string(); let got = store - .get_pending_activations( - None, - Some(std::slice::from_ref(&other_namespace)), - Some(1), - None, - ) + .claim_activation_for_pull(None, Some(other_namespace.as_str())) .await .unwrap(); - assert!(got.is_empty()); + assert!(got.is_none()); // We allow no application in this method because of usage in upkeep let namespaces = vec!["other_namespace".to_string()]; let activations = store - .get_pending_activations_from_namespaces(None, Some(&namespaces), Some(2), None) + .claim_activations(None, Some(&namespaces), Some(2), None, false) .await .unwrap(); assert_eq!( @@ -492,12 +475,9 @@ async fn test_get_pending_activation_skip_expires(#[case] adapter: &str) { batch[0].expires_at = Some(Utc::now() - Duration::from_secs(100)); assert!(store.store(batch.clone()).await.is_ok()); - let result = store - .get_pending_activations(None, None, Some(1), None) - .await; + let result = store.claim_activation_for_pull(None, None).await; assert!(result.is_ok()); - let res_vec = result.unwrap(); - assert!(res_vec.is_empty()); + assert!(result.unwrap().is_none()); assert_counts( StatusCount { @@ -523,12 +503,11 @@ async fn test_get_pending_activation_earliest(#[case] adapter: &str) { let ret = store.store(batch.clone()).await; assert!(ret.is_ok(), "{}", ret.err().unwrap().to_string()); - let mut got = store - .get_pending_activations(None, None, Some(1), None) + let result = store + .claim_activation_for_pull(None, None) .await - .unwrap(); - assert_eq!(got.len(), 1); - let result = got.pop().unwrap(); + .unwrap() + .expect("expected one activation"); assert_eq!( result.added_at, Utc.with_ymd_and_hms(1998, 6, 24, 0, 0, 0).unwrap() @@ -549,12 +528,11 @@ async fn test_get_pending_activation_fetches_application(#[case] adapter: &str) // Getting an activation with no application filter should // include activations with application set. - let mut got = store - .get_pending_activations(None, None, Some(1), None) + let result = store + .claim_activation_for_pull(None, None) .await - .unwrap(); - assert_eq!(got.len(), 1); - let result = got.pop().unwrap(); + .unwrap() + .expect("expected one activation"); assert_eq!(result.id, "id_0"); assert_eq!(result.status, InflightActivationStatus::Processing); assert!(result.processing_deadline.unwrap() > Utc::now()); @@ -574,31 +552,27 @@ async fn test_get_pending_activation_with_application(#[case] adapter: &str) { assert!(store.store(batch.clone()).await.is_ok()); // Get activation from a named application - let mut got = store - .get_pending_activations(Some("hammers"), None, Some(1), None) + let result = store + .claim_activation_for_pull(Some("hammers"), None) .await - .unwrap(); - assert_eq!(got.len(), 1); - let result = got.pop().unwrap(); + .unwrap() + .expect("expected one activation"); assert_eq!(result.id, "id_1"); assert_eq!(result.status, InflightActivationStatus::Processing); assert!(result.processing_deadline.unwrap() > Utc::now()); assert_eq!(result.application, "hammers"); let result_opt = store - .get_pending_activations(Some("hammers"), None, Some(1), None) + .claim_activation_for_pull(Some("hammers"), None) .await .unwrap(); assert!( - result_opt.is_empty(), + result_opt.is_none(), "no pending activations in hammers left" ); - let remaining = store - .get_pending_activations(None, None, Some(1), None) - .await - .unwrap(); - assert_eq!(remaining.len(), 1, "one pending activation in '' left"); + let remaining = store.claim_activation_for_pull(None, None).await.unwrap(); + assert!(remaining.is_some(), "one pending activation in '' left"); store.remove_db().await.unwrap(); } @@ -621,29 +595,22 @@ async fn test_get_pending_activation_with_application_and_namespace(#[case] adap let target_ns = "target".to_string(); // Get activation from a named application - let mut got = store - .get_pending_activations( - Some("hammers"), - Some(std::slice::from_ref(&target_ns)), - Some(1), - None, - ) + let result = store + .claim_activation_for_pull(Some("hammers"), Some(target_ns.as_str())) .await - .unwrap(); - assert_eq!(got.len(), 1); - let result = got.pop().unwrap(); + .unwrap() + .expect("expected one activation"); assert_eq!(result.id, "id_1"); assert_eq!(result.status, InflightActivationStatus::Processing); assert!(result.processing_deadline.unwrap() > Utc::now()); assert_eq!(result.application, "hammers"); assert_eq!(result.namespace, "target"); - let mut got = store - .get_pending_activations(Some("hammers"), None, Some(1), None) + let result = store + .claim_activation_for_pull(Some("hammers"), None) .await - .unwrap(); - assert_eq!(got.len(), 1); - let result = got.pop().unwrap(); + .unwrap() + .expect("expected one activation"); assert_eq!(result.id, "id_2"); assert_eq!(result.application, "hammers"); assert_eq!(result.namespace, "not-target"); @@ -662,7 +629,7 @@ async fn test_get_pending_activations_no_limit(#[case] adapter: &str) { assert!(store.store(batch).await.is_ok()); let got = store - .get_pending_activations(None, None, None, None) + .claim_activations_for_push(None, None, None, None) .await .unwrap(); assert_eq!(got.len(), N); @@ -696,7 +663,7 @@ async fn test_get_pending_activations_limit_below_pending(#[case] adapter: &str) assert!(store.store(batch).await.is_ok()); let got = store - .get_pending_activations(None, None, Some(X), None) + .claim_activations_for_push(None, None, Some(X), None) .await .unwrap(); assert_eq!(got.len(), X as usize); @@ -733,7 +700,7 @@ async fn test_get_pending_activations_limit_above_pending(#[case] adapter: &str) assert!(store.store(batch).await.is_ok()); let got = store - .get_pending_activations(None, None, Some(X), None) + .claim_activations_for_push(None, None, Some(X), None) .await .unwrap(); assert_eq!(got.len(), Y); @@ -849,10 +816,10 @@ async fn test_set_activation_status(#[case] adapter: &str) { .await; assert!( store - .get_pending_activations(None, None, Some(1), None) + .claim_activation_for_pull(None, None) .await .unwrap() - .is_empty() + .is_none() ); let result = store @@ -983,6 +950,7 @@ async fn test_handle_processing_deadline(#[case] adapter: &str) { let mut batch = make_activations(2); batch[1].status = InflightActivationStatus::Processing; + batch[1].sent = true; batch[1].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); assert!(store.store(batch.clone()).await.is_ok()); @@ -1027,6 +995,7 @@ async fn test_handle_processing_deadline_multiple_tasks(#[case] adapter: &str) { let mut batch = make_activations(2); batch[0].status = InflightActivationStatus::Processing; + batch[0].sent = true; batch[0].processing_deadline = Some(Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap()); batch[1].status = InflightActivationStatus::Processing; batch[1].processing_deadline = Some(Utc::now() + chrono::Duration::days(30)); @@ -1065,9 +1034,11 @@ async fn test_handle_processing_at_most_once(#[case] adapter: &str) { // Both records are past processing deadlines let mut batch = make_activations(2); batch[0].status = InflightActivationStatus::Processing; + batch[0].sent = true; batch[0].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); batch[1].status = InflightActivationStatus::Processing; + batch[1].sent = true; replace_retry_state( &mut batch[1], Some(RetryState { @@ -1118,6 +1089,7 @@ async fn test_handle_processing_deadline_discard_after(#[case] adapter: &str) { let mut batch = make_activations(2); batch[1].status = InflightActivationStatus::Processing; + batch[1].sent = true; batch[1].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); replace_retry_state( &mut batch[1], @@ -1164,6 +1136,7 @@ async fn test_handle_processing_deadline_deadletter_after(#[case] adapter: &str) let mut batch = make_activations(2); batch[1].status = InflightActivationStatus::Processing; + batch[1].sent = true; batch[1].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); replace_retry_state( &mut batch[1], @@ -1210,6 +1183,7 @@ async fn test_handle_processing_deadline_no_retries_remaining(#[case] adapter: & let mut batch = make_activations(2); batch[1].status = InflightActivationStatus::Processing; + batch[1].sent = true; batch[1].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); replace_retry_state( &mut batch[1], @@ -1247,6 +1221,46 @@ async fn test_handle_processing_deadline_no_retries_remaining(#[case] adapter: & store.remove_db().await.unwrap(); } +#[tokio::test] +#[rstest] +#[case::sqlite("sqlite")] +#[case::postgres("postgres")] +async fn test_handle_processing_deadline_unsent_no_attempt_increment(#[case] adapter: &str) { + let store = create_test_store(adapter).await; + let mut batch = make_activations(1); + batch[0].status = InflightActivationStatus::Processing; + batch[0].sent = false; + batch[0].processing_deadline = Some(Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap()); + assert!(store.store(batch.clone()).await.is_ok()); + let count = store.handle_processing_deadline().await.unwrap(); + assert_eq!(count, 1); + let task = store.get_by_id(&batch[0].id).await.unwrap().unwrap(); + assert_eq!(task.status, InflightActivationStatus::Pending); + assert_eq!(task.processing_attempts, 0); + assert!(!task.sent); + store.remove_db().await.unwrap(); +} + +#[tokio::test] +#[rstest] +#[case::sqlite("sqlite")] +#[case::postgres("postgres")] +async fn test_handle_processing_deadline_at_most_once_unsent_failure(#[case] adapter: &str) { + let store = create_test_store(adapter).await; + let mut batch = make_activations(1); + batch[0].status = InflightActivationStatus::Processing; + batch[0].sent = false; + batch[0].at_most_once = true; + batch[0].processing_deadline = Some(Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap()); + assert!(store.store(batch.clone()).await.is_ok()); + let count = store.handle_processing_deadline().await.unwrap(); + assert_eq!(count, 1); + let task = store.get_by_id(&batch[0].id).await.unwrap().unwrap(); + assert_eq!(task.status, InflightActivationStatus::Failure); + assert!(!task.sent); + store.remove_db().await.unwrap(); +} + #[tokio::test] #[rstest] #[case::sqlite("sqlite")] diff --git a/src/store/postgres_activation_store.rs b/src/store/postgres_activation_store.rs index 5b7e0077..e10e67c1 100644 --- a/src/store/postgres_activation_store.rs +++ b/src/store/postgres_activation_store.rs @@ -176,7 +176,8 @@ impl InflightActivationStore for PostgresActivationStore { namespace, taskname, on_attempts_exceeded, - bucket + bucket, + sent FROM inflight_taskactivations WHERE id = $1 ", @@ -218,7 +219,8 @@ impl InflightActivationStore for PostgresActivationStore { namespace, taskname, on_attempts_exceeded, - bucket + bucket, + sent ) ", ); @@ -251,6 +253,7 @@ impl InflightActivationStore for PostgresActivationStore { b.push_bind(row.taskname); b.push_bind(row.on_attempts_exceeded as i32); b.push_bind(row.bucket); + b.push_bind(row.sent); }) .push(" ON CONFLICT(id) DO NOTHING") .build(); @@ -258,16 +261,14 @@ impl InflightActivationStore for PostgresActivationStore { Ok(query.execute(&mut *conn).await?.into()) } - /// Claim pending activations from specified namespaces (moves them to processing). - /// If namespaces is `None`, gets from any namespace. - /// If namespaces is `Some(...)`, restricts to those namespaces. #[instrument(skip_all)] - async fn get_pending_activations_from_namespaces( + async fn claim_activations( &self, application: Option<&str>, namespaces: Option<&[String]>, limit: Option, bucket: Option, + sent: bool, ) -> Result, Error> { let now = Utc::now(); @@ -320,13 +321,13 @@ impl InflightActivationStore for PostgresActivationStore { status = " )); query_builder.push_bind(InflightActivationStatus::Processing.to_string()); + query_builder.push(", sent = "); + query_builder.push_bind(sent); query_builder.push(" FROM selected_activations "); query_builder.push(" WHERE inflight_taskactivations.id = selected_activations.id"); query_builder.push(" RETURNING *, kafka_offset AS offset"); - let mut conn = self - .acquire_write_conn_metric("get_pending_activation") - .await?; + let mut conn = self.acquire_write_conn_metric("claim_activations").await?; let rows: Vec = query_builder .build_query_as::() .fetch_all(&mut *conn) @@ -335,6 +336,21 @@ impl InflightActivationStore for PostgresActivationStore { Ok(rows.into_iter().map(|row| row.into()).collect()) } + #[instrument(skip_all)] + async fn mark_activation_sent(&self, id: &str) -> Result<(), Error> { + let mut conn = self + .acquire_write_conn_metric("mark_activation_sent") + .await?; + sqlx::query( + "UPDATE inflight_taskactivations SET sent = TRUE WHERE id = $1 AND status = $2", + ) + .bind(id) + .bind(InflightActivationStatus::Processing.to_string()) + .execute(&mut *conn) + .await?; + Ok(()) + } + /// Get the age of the oldest pending activation in seconds. /// Only activations with status=pending and processing_attempts=0 are considered /// as we are interested in latency to the *first* attempt. @@ -479,7 +495,8 @@ impl InflightActivationStore for PostgresActivationStore { namespace, taskname, on_attempts_exceeded, - bucket + bucket, + sent FROM inflight_taskactivations WHERE status = $1 ", @@ -502,16 +519,12 @@ impl InflightActivationStore for PostgresActivationStore { Ok(()) } - /// Update tasks that are in processing and have exceeded their processing deadline - /// Exceeding a processing deadline does not consume a retry as we don't know - /// if a worker took the task and was killed, or failed. + /// Update tasks that are in processing and have exceeded their processing deadline. #[instrument(skip_all)] async fn handle_processing_deadline(&self) -> Result { let now = Utc::now(); let mut atomic = self.write_pool.begin().await?; - // At-most-once tasks that fail their processing deadlines go directly to failure - // there are no retries, as the worker will reject the task due to at_most_once keys. let most_once_result = sqlx::query( "UPDATE inflight_taskactivations SET processing_deadline = null, status = $1 @@ -528,12 +541,25 @@ impl InflightActivationStore for PostgresActivationStore { processing_deadline_modified_rows = query_res.rows_affected(); } - // Update regular tasks. - // Increment processing_attempts by 1 and reset processing_deadline to null. + let unsent = sqlx::query( + "UPDATE inflight_taskactivations + SET processing_deadline = null, status = $1, sent = FALSE + WHERE processing_deadline < $2 AND sent = FALSE AND status = $3", + ) + .bind(InflightActivationStatus::Pending.to_string()) + .bind(now) + .bind(InflightActivationStatus::Processing.to_string()) + .execute(&mut *atomic) + .await; + + if let Ok(query_res) = unsent { + processing_deadline_modified_rows += query_res.rows_affected(); + } + let result = sqlx::query( "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1, processing_attempts = processing_attempts + 1 - WHERE processing_deadline < $2 AND status = $3", + SET processing_deadline = null, status = $1, processing_attempts = processing_attempts + 1, sent = FALSE + WHERE processing_deadline < $2 AND sent = TRUE AND status = $3", ) .bind(InflightActivationStatus::Pending.to_string()) .bind(now) diff --git a/src/upkeep.rs b/src/upkeep.rs index 82f0da52..cdd19b29 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -299,7 +299,7 @@ pub async fn do_upkeep( .expect("Could not create kafka producer in upkeep"), ); if let Ok(tasks) = store - .get_pending_activations_from_namespaces(None, Some(&demoted_namespaces), None, None) + .claim_activations(None, Some(&demoted_namespaces), None, None, true) .await { // Produce tasks to Kafka with updated namespace @@ -1163,14 +1163,14 @@ mod tests { 1 ); let pending = store - .get_pending_activations(None, None, Some(1), None) + .claim_activations(None, None, Some(1), None, true) .await .unwrap(); assert_eq!(pending.len(), 1); assert_eq!(pending[0].id, "id_0"); assert!( store - .get_pending_activations(None, None, Some(1), None) + .claim_activations(None, None, Some(1), None, true) .await .unwrap() .is_empty() @@ -1195,7 +1195,7 @@ mod tests { 1 ); let pending = store - .get_pending_activations(None, None, Some(1), None) + .claim_activations(None, None, Some(1), None, true) .await .unwrap(); assert_eq!(pending.len(), 1); From 7084a24646b0fa0eb20604a715d531bebdfa9c81 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Fri, 3 Apr 2026 13:08:33 -0700 Subject: [PATCH 02/11] Add Metrics for Processing Deadline Resets, Fix AI Reviewer Bugs --- benches/store_bench.rs | 2 +- src/fetch/tests.rs | 4 +- src/store/inflight_activation.rs | 82 +++++++++++++++++--------- src/store/inflight_activation_tests.rs | 77 ++++++++++++++++++++---- src/store/postgres_activation_store.rs | 63 +++++++++++--------- src/upkeep.rs | 50 ++++++++++++---- 6 files changed, 197 insertions(+), 81 deletions(-) diff --git a/benches/store_bench.rs b/benches/store_bench.rs index f2ca2f03..2463776c 100644 --- a/benches/store_bench.rs +++ b/benches/store_bench.rs @@ -62,7 +62,7 @@ async fn get_pending_activations(num_activations: u32, num_workers: u32) { .claim_activation_for_pull(Some("sentry"), Some(&ns)) .await .unwrap() - .is_none() + .is_some() { num_activations_processed += 1; } diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index 86cdb25f..2fb34f4c 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -9,11 +9,11 @@ use tonic::async_trait; use super::*; use crate::config::Config; use crate::push::PushError; -use crate::store::inflight_activation::InflightActivationStore; use crate::store::inflight_activation::{BucketRange, InflightActivation}; use crate::store::inflight_activation::{ FailedTasksForwarder, InflightActivationStatus, QueryResult, }; +use crate::store::inflight_activation::{InflightActivationStore, ProcessingDeadlineCounts}; use crate::test_utils::make_activations; /// Store stub that returns one activation once OR is always empty OR always fails. @@ -135,7 +135,7 @@ impl InflightActivationStore for MockStore { unimplemented!() } - async fn handle_processing_deadline(&self) -> Result { + async fn handle_processing_deadline(&self) -> Result { unimplemented!() } diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 12f5a2da..7600c3ad 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -186,6 +186,26 @@ pub struct InflightActivation { pub sent: bool, } +/// Counts how many tasks were changed from 'processing' to 'pending' by `handle_processing_deadline`. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub struct ProcessingDeadlineCounts { + /// The number of AMO tasks reverted. + pub at_most_once: u64, + + /// The number of regular, unsent tasks reverted. + pub non_amo_unsent: u64, + + /// The number of regular, sent tasks reverted. + pub non_amo_sent: u64, +} + +impl ProcessingDeadlineCounts { + /// Count the total number of tasks that reached their processing deadline and were reverted to pending. + pub fn total(&self) -> u64 { + self.at_most_once + self.non_amo_unsent + self.non_amo_sent + } +} + impl InflightActivation { /// The number of milliseconds between an activation's received timestamp /// and the provided datetime @@ -501,7 +521,7 @@ pub trait InflightActivationStore: Send + Sync { async fn clear(&self) -> Result<(), Error>; /// Update tasks that exceeded their processing deadline - async fn handle_processing_deadline(&self) -> Result; + async fn handle_processing_deadline(&self) -> Result; /// Update tasks that exceeded max processing attempts async fn handle_processing_attempts(&self) -> Result; @@ -1115,63 +1135,67 @@ impl InflightActivationStore for SqliteActivationStore { /// Update tasks that are in processing and have exceeded their processing deadline. #[instrument(skip_all)] - async fn handle_processing_deadline(&self) -> Result { + async fn handle_processing_deadline(&self) -> Result { let now = Utc::now(); let mut atomic = self.write_pool.begin().await?; // Idempotent tasks that fail their processing deadlines go directly to failure - let most_once_result = sqlx::query( + let amo = sqlx::query( "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1 - WHERE processing_deadline < $2 AND at_most_once = TRUE AND status = $3", + SET processing_deadline = null, + status = $1 + WHERE processing_deadline < $2 + AND at_most_once = TRUE + AND status = $3", ) .bind(InflightActivationStatus::Failure) .bind(now.timestamp()) .bind(InflightActivationStatus::Processing) .execute(&mut *atomic) - .await; - - let mut processing_deadline_modified_rows = 0; - if let Ok(query_res) = most_once_result { - processing_deadline_modified_rows = query_res.rows_affected(); - } + .await?; - // Revert non-AMO activations that weren't delivered back to pending without consuming an attempt + // Revert activations that weren't delivered back to 'pending' without consuming an attempt let unsent = sqlx::query( "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1, sent = 0 - WHERE processing_deadline < $2 AND sent = 0 AND status = $3", + SET processing_deadline = null, + status = $1, + sent = FALSE + WHERE processing_deadline < $2 + AND sent = FALSE + AND at_most_once = FALSE + AND status = $3", ) .bind(InflightActivationStatus::Pending) .bind(now.timestamp()) .bind(InflightActivationStatus::Processing) .execute(&mut *atomic) - .await; - - if let Ok(query_res) = unsent { - processing_deadline_modified_rows += query_res.rows_affected(); - } + .await?; // Revert activations that were delivered back to 'pending' AND consume an attempt - let result = sqlx::query( + let sent = sqlx::query( "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1, processing_attempts = processing_attempts + 1, sent = 0 - WHERE processing_deadline < $2 AND sent = 1 AND status = $3", + SET processing_deadline = null, + status = $1, + processing_attempts = processing_attempts + 1, + sent = FALSE + WHERE processing_deadline < $2 + AND sent = TRUE + AND at_most_once = FALSE + AND status = $3", ) .bind(InflightActivationStatus::Pending) .bind(now.timestamp()) .bind(InflightActivationStatus::Processing) .execute(&mut *atomic) - .await; + .await?; atomic.commit().await?; - if let Ok(query_res) = result { - processing_deadline_modified_rows += query_res.rows_affected(); - return Ok(processing_deadline_modified_rows); - } - - Err(anyhow!("Could not update tasks past processing_deadline")) + Ok(ProcessingDeadlineCounts { + at_most_once: amo.rows_affected(), + non_amo_unsent: unsent.rows_affected(), + non_amo_sent: sent.rows_affected(), + }) } /// Update tasks that have exceeded their max processing attempts. diff --git a/src/store/inflight_activation_tests.rs b/src/store/inflight_activation_tests.rs index 0b46a96b..02248ff9 100644 --- a/src/store/inflight_activation_tests.rs +++ b/src/store/inflight_activation_tests.rs @@ -10,7 +10,8 @@ use crate::{ config::Config, store::inflight_activation::{ InflightActivationBuilder, InflightActivationStatus, InflightActivationStore, - InflightActivationStoreConfig, QueryResult, SqliteActivationStore, create_sqlite_pool, + InflightActivationStoreConfig, ProcessingDeadlineCounts, QueryResult, + SqliteActivationStore, create_sqlite_pool, }, store::postgres_activation_store::build_pg_connect_options, test_utils::{ @@ -966,7 +967,14 @@ async fn test_handle_processing_deadline(#[case] adapter: &str) { let count = store.handle_processing_deadline().await; assert!(count.is_ok()); - assert_eq!(count.unwrap(), 1); + assert_eq!( + count.unwrap(), + ProcessingDeadlineCounts { + at_most_once: 0, + non_amo_unsent: 0, + non_amo_sent: 1, + } + ); assert_counts( StatusCount { pending: 2, @@ -982,7 +990,7 @@ async fn test_handle_processing_deadline(#[case] adapter: &str) { // Run again to check early return let count = store.handle_processing_deadline().await; assert!(count.is_ok()); - assert_eq!(count.unwrap(), 0); + assert_eq!(count.unwrap(), ProcessingDeadlineCounts::default()); store.remove_db().await.unwrap(); } @@ -1011,7 +1019,14 @@ async fn test_handle_processing_deadline_multiple_tasks(#[case] adapter: &str) { let count = store.handle_processing_deadline().await; assert!(count.is_ok()); - assert_eq!(count.unwrap(), 1); + assert_eq!( + count.unwrap(), + ProcessingDeadlineCounts { + at_most_once: 0, + non_amo_unsent: 0, + non_amo_sent: 1, + } + ); assert_counts( StatusCount { pending: 1, @@ -1064,7 +1079,14 @@ async fn test_handle_processing_at_most_once(#[case] adapter: &str) { let count = store.handle_processing_deadline().await; assert!(count.is_ok()); - assert_eq!(count.unwrap(), 2); + assert_eq!( + count.unwrap(), + ProcessingDeadlineCounts { + at_most_once: 1, + non_amo_unsent: 0, + non_amo_sent: 1, + } + ); assert_counts( StatusCount { pending: 1, @@ -1115,7 +1137,14 @@ async fn test_handle_processing_deadline_discard_after(#[case] adapter: &str) { let count = store.handle_processing_deadline().await; assert!(count.is_ok()); - assert_eq!(count.unwrap(), 1); + assert_eq!( + count.unwrap(), + ProcessingDeadlineCounts { + at_most_once: 0, + non_amo_unsent: 0, + non_amo_sent: 1, + } + ); assert_counts( StatusCount { pending: 2, @@ -1162,7 +1191,14 @@ async fn test_handle_processing_deadline_deadletter_after(#[case] adapter: &str) let count = store.handle_processing_deadline().await; assert!(count.is_ok()); - assert_eq!(count.unwrap(), 1); + assert_eq!( + count.unwrap(), + ProcessingDeadlineCounts { + at_most_once: 0, + non_amo_unsent: 0, + non_amo_sent: 1, + } + ); assert_counts( StatusCount { pending: 2, @@ -1208,7 +1244,14 @@ async fn test_handle_processing_deadline_no_retries_remaining(#[case] adapter: & let count = store.handle_processing_deadline().await; assert!(count.is_ok()); - assert_eq!(count.unwrap(), 1); + assert_eq!( + count.unwrap(), + ProcessingDeadlineCounts { + at_most_once: 0, + non_amo_unsent: 0, + non_amo_sent: 1, + } + ); assert_counts( StatusCount { processing: 0, @@ -1233,7 +1276,14 @@ async fn test_handle_processing_deadline_unsent_no_attempt_increment(#[case] ada batch[0].processing_deadline = Some(Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap()); assert!(store.store(batch.clone()).await.is_ok()); let count = store.handle_processing_deadline().await.unwrap(); - assert_eq!(count, 1); + assert_eq!( + count, + ProcessingDeadlineCounts { + at_most_once: 0, + non_amo_unsent: 1, + non_amo_sent: 0, + } + ); let task = store.get_by_id(&batch[0].id).await.unwrap().unwrap(); assert_eq!(task.status, InflightActivationStatus::Pending); assert_eq!(task.processing_attempts, 0); @@ -1254,7 +1304,14 @@ async fn test_handle_processing_deadline_at_most_once_unsent_failure(#[case] ada batch[0].processing_deadline = Some(Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap()); assert!(store.store(batch.clone()).await.is_ok()); let count = store.handle_processing_deadline().await.unwrap(); - assert_eq!(count, 1); + assert_eq!( + count, + ProcessingDeadlineCounts { + at_most_once: 1, + non_amo_unsent: 0, + non_amo_sent: 0, + } + ); let task = store.get_by_id(&batch[0].id).await.unwrap().unwrap(); assert_eq!(task.status, InflightActivationStatus::Failure); assert!(!task.sent); diff --git a/src/store/postgres_activation_store.rs b/src/store/postgres_activation_store.rs index e10e67c1..c8625f16 100644 --- a/src/store/postgres_activation_store.rs +++ b/src/store/postgres_activation_store.rs @@ -1,6 +1,6 @@ use crate::store::inflight_activation::{ BucketRange, DepthCounts, FailedTasksForwarder, InflightActivation, InflightActivationStatus, - InflightActivationStore, QueryResult, TableRow, + InflightActivationStore, ProcessingDeadlineCounts, QueryResult, TableRow, }; use anyhow::{Error, anyhow}; use async_trait::async_trait; @@ -521,60 +521,67 @@ impl InflightActivationStore for PostgresActivationStore { /// Update tasks that are in processing and have exceeded their processing deadline. #[instrument(skip_all)] - async fn handle_processing_deadline(&self) -> Result { + async fn handle_processing_deadline(&self) -> Result { let now = Utc::now(); let mut atomic = self.write_pool.begin().await?; - let most_once_result = sqlx::query( + // Idempotent tasks that fail their processing deadlines go directly to failure + let amo = sqlx::query( "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1 - WHERE processing_deadline < $2 AND at_most_once = TRUE AND status = $3", + SET processing_deadline = null, + status = $1 + WHERE processing_deadline < $2 + AND at_most_once = TRUE + AND status = $3", ) .bind(InflightActivationStatus::Failure.to_string()) .bind(now) .bind(InflightActivationStatus::Processing.to_string()) .execute(&mut *atomic) - .await; - - let mut processing_deadline_modified_rows = 0; - if let Ok(query_res) = most_once_result { - processing_deadline_modified_rows = query_res.rows_affected(); - } + .await?; + // Revert activations that weren't delivered back to 'pending' without consuming an attempt let unsent = sqlx::query( "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1, sent = FALSE - WHERE processing_deadline < $2 AND sent = FALSE AND status = $3", + SET processing_deadline = null, + status = $1, + sent = FALSE + WHERE processing_deadline < $2 + AND sent = FALSE + AND at_most_once = FALSE + AND status = $3", ) .bind(InflightActivationStatus::Pending.to_string()) .bind(now) .bind(InflightActivationStatus::Processing.to_string()) .execute(&mut *atomic) - .await; - - if let Ok(query_res) = unsent { - processing_deadline_modified_rows += query_res.rows_affected(); - } + .await?; - let result = sqlx::query( + // Revert activations that were delivered back to 'pending' AND consume an attempt + let sent = sqlx::query( "UPDATE inflight_taskactivations - SET processing_deadline = null, status = $1, processing_attempts = processing_attempts + 1, sent = FALSE - WHERE processing_deadline < $2 AND sent = TRUE AND status = $3", + SET processing_deadline = null, + status = $1, + processing_attempts = processing_attempts + 1, + sent = FALSE + WHERE processing_deadline < $2 + AND sent = TRUE + AND at_most_once = FALSE + AND status = $3", ) .bind(InflightActivationStatus::Pending.to_string()) .bind(now) .bind(InflightActivationStatus::Processing.to_string()) .execute(&mut *atomic) - .await; + .await?; atomic.commit().await?; - if let Ok(query_res) = result { - processing_deadline_modified_rows += query_res.rows_affected(); - return Ok(processing_deadline_modified_rows); - } - - Err(anyhow!("Could not update tasks past processing_deadline")) + Ok(ProcessingDeadlineCounts { + at_most_once: amo.rows_affected(), + non_amo_unsent: unsent.rows_affected(), + non_amo_sent: sent.rows_affected(), + }) } /// Update tasks that have exceeded their max processing attempts. diff --git a/src/upkeep.rs b/src/upkeep.rs index cdd19b29..abc87515 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -18,7 +18,7 @@ use uuid::Uuid; use crate::SERVICE_NAME; use crate::config::Config; use crate::runtime_config::RuntimeConfigManager; -use crate::store::inflight_activation::InflightActivationStore; +use crate::store::inflight_activation::{InflightActivationStore, ProcessingDeadlineCounts}; /// The upkeep task that periodically performs upkeep /// on the inflight store @@ -67,7 +67,7 @@ pub async fn upkeep( #[derive(Debug)] pub struct UpkeepResults { retried: u64, - processing_deadline_reset: u64, + processing_deadline_reset: ProcessingDeadlineCounts, processing_attempts_exceeded: u64, delay_elapsed: u64, expired: u64, @@ -85,7 +85,7 @@ pub struct UpkeepResults { impl UpkeepResults { fn empty(&self) -> bool { self.retried == 0 - && self.processing_deadline_reset == 0 + && self.processing_deadline_reset.total() == 0 && self.processing_attempts_exceeded == 0 && self.expired == 0 && self.completed == 0 @@ -115,7 +115,7 @@ pub async fn do_upkeep( let upkeep_start = Instant::now(); let mut result_context = UpkeepResults { retried: 0, - processing_deadline_reset: 0, + processing_deadline_reset: ProcessingDeadlineCounts::default(), processing_attempts_exceeded: 0, delay_elapsed: 0, expired: 0, @@ -299,7 +299,7 @@ pub async fn do_upkeep( .expect("Could not create kafka producer in upkeep"), ); if let Ok(tasks) = store - .claim_activations(None, Some(&demoted_namespaces), None, None, true) + .claim_activations(None, Some(&demoted_namespaces), None, None, false) .await { // Produce tasks to Kafka with updated namespace @@ -379,11 +379,15 @@ pub async fn do_upkeep( } if !result_context.empty() { + let processing_deadline_reset = result_context.processing_deadline_reset; + debug!( result_context.completed, result_context.deadlettered, result_context.discarded, - result_context.processing_deadline_reset, + processing_deadline_amo = processing_deadline_reset.at_most_once, + processing_deadline_non_amo_unsent = processing_deadline_reset.non_amo_unsent, + processing_deadline_non_amo_sent = processing_deadline_reset.non_amo_sent, result_context.processing_attempts_exceeded, result_context.expired, result_context.retried, @@ -413,11 +417,28 @@ pub async fn do_upkeep( .increment(result_context.discarded); metrics::counter!("upkeep.cleanup_action", "kind" => "mark_processing_attempts_exceeded_as_failure") .increment(result_context.processing_attempts_exceeded); - metrics::counter!("upkeep.cleanup_action", "kind" => "mark_processing_deadline_exceeded_as_failure") - .increment(result_context.processing_deadline_reset); metrics::counter!("upkeep.cleanup_action", "kind" => "mark_delay_elapsed_as_pending") .increment(result_context.delay_elapsed); + // Processing deadlines + metrics::counter!( + "upkeep.cleanup_action", + "kind" => "mark_processing_deadline_amo_failure" + ) + .increment(result_context.processing_deadline_reset.at_most_once); + + metrics::counter!( + "upkeep.cleanup_action", + "kind" => "mark_processing_deadline_non_amo_unsent" + ) + .increment(result_context.processing_deadline_reset.non_amo_unsent); + + metrics::counter!( + "upkeep.cleanup_action", + "kind" => "mark_processing_deadline_non_amo_sent" + ) + .increment(result_context.processing_deadline_reset.non_amo_sent); + // Forwarded tasks metrics::counter!("upkeep.forwarded_tasks").increment(result_context.forwarded); @@ -510,7 +531,7 @@ mod tests { use crate::{ config::Config, runtime_config::RuntimeConfigManager, - store::inflight_activation::InflightActivationStatus, + store::inflight_activation::{InflightActivationStatus, ProcessingDeadlineCounts}, test_utils::{ StatusCount, assert_counts, consume_topic, create_config, create_integration_config, create_integration_config_with_topic, create_producer, create_test_store, @@ -773,7 +794,7 @@ mod tests { store.as_ref(), ) .await; - assert_eq!(result_context.processing_deadline_reset, 0); + assert_eq!(result_context.processing_deadline_reset.total(), 0); } #[tokio::test] @@ -823,7 +844,14 @@ mod tests { .await; // 0 processing, 2 pending now - assert_eq!(result_context.processing_deadline_reset, 1); + assert_eq!( + result_context.processing_deadline_reset, + ProcessingDeadlineCounts { + at_most_once: 0, + non_amo_unsent: 1, + non_amo_sent: 0, + } + ); assert_counts( StatusCount { processing: 0, From 1d248a1b08d5a0608f20c63fdbf0ba920131e224 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Fri, 3 Apr 2026 13:23:48 -0700 Subject: [PATCH 03/11] Split Postgres Changes into Migrations --- pg_migrations/0001_create_inflight_activations.sql | 8 +------- pg_migrations/0002_add_bucket.sql | 3 +++ pg_migrations/0003_add_sent.sql | 4 ++++ 3 files changed, 8 insertions(+), 7 deletions(-) create mode 100644 pg_migrations/0002_add_bucket.sql create mode 100644 pg_migrations/0003_add_sent.sql diff --git a/pg_migrations/0001_create_inflight_activations.sql b/pg_migrations/0001_create_inflight_activations.sql index 7b31baa1..ee8b26a4 100644 --- a/pg_migrations/0001_create_inflight_activations.sql +++ b/pg_migrations/0001_create_inflight_activations.sql @@ -16,11 +16,5 @@ CREATE TABLE IF NOT EXISTS inflight_taskactivations ( application TEXT NOT NULL, namespace TEXT NOT NULL, taskname TEXT NOT NULL, - on_attempts_exceeded INTEGER NOT NULL DEFAULT 1, - bucket SMALLINT NOT NULL DEFAULT 0, - sent BOOLEAN NOT NULL DEFAULT FALSE + on_attempts_exceeded INTEGER NOT NULL DEFAULT 1 ); - --- Supports pending claim queries (status, filters, ordering) including sent -CREATE INDEX IF NOT EXISTS idx_inflight_taskactivations_claim -ON inflight_taskactivations (status, bucket, sent); diff --git a/pg_migrations/0002_add_bucket.sql b/pg_migrations/0002_add_bucket.sql new file mode 100644 index 00000000..6283c667 --- /dev/null +++ b/pg_migrations/0002_add_bucket.sql @@ -0,0 +1,3 @@ +ALTER TABLE inflight_taskactivations ADD COLUMN bucket SMALLINT NOT NULL DEFAULT 0; + +CREATE INDEX idx_activation_claim ON inflight_taskactivations (status, bucket); diff --git a/pg_migrations/0003_add_sent.sql b/pg_migrations/0003_add_sent.sql new file mode 100644 index 00000000..05fa6384 --- /dev/null +++ b/pg_migrations/0003_add_sent.sql @@ -0,0 +1,4 @@ +ALTER TABLE inflight_taskactivations ADD COLUMN sent BOOLEAN NOT NULL DEFAULT FALSE; + +DROP INDEX IF EXISTS idx_activation_claim; +CREATE INDEX idx_activation_claim ON inflight_taskactivations (status, bucket, sent); From 688dc04dc86dbafb3f39ad080e3f72305464c814 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Fri, 3 Apr 2026 13:38:51 -0700 Subject: [PATCH 04/11] Handle Claim One Invariant Gracefully --- src/grpc/server.rs | 1 + src/store/inflight_activation.rs | 8 +++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/grpc/server.rs b/src/grpc/server.rs index bd2db13d..727d8cfb 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -137,6 +137,7 @@ impl ConsumerService for TaskbrokerServer { Err(Status::internal("Unable to fetch next task")) } + // If we return an error, the worker will place the result back in its internal queue and send the update again in the future, which is not desired Ok(None) => Ok(Response::new(SetTaskStatusResponse { task: None })), Ok(Some(inflight)) => { diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 7600c3ad..27df2100 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -458,9 +458,11 @@ pub trait InflightActivationStore: Send + Sync { .await?; // If we are getting more than one task here, something is broken - assert!(rows.len() <= 1); - - Ok(rows.pop()) + if rows.len() > 1 { + Err(anyhow!("Found more than one row despite limit of one")) + } else { + Ok(rows.pop()) + } } /// Record successful push. From 56d2efbf2324e25782053f1bcee50a79d83ab270 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Tue, 7 Apr 2026 11:14:18 -0700 Subject: [PATCH 05/11] Replace Sent Flag w/Sending Status --- migrations/0006_add_sent.sql | 1 - pg_migrations/0003_add_sent.sql | 4 -- src/fetch/tests.rs | 4 +- src/grpc/server_tests.rs | 1 - src/kafka/deserialize_activation.rs | 1 - src/store/inflight_activation.rs | 76 +++++++++++++------------- src/store/inflight_activation_tests.rs | 55 ++++++++++--------- src/store/postgres_activation_store.rs | 34 +++++------- src/test_utils.rs | 9 +++ src/upkeep.rs | 40 +++++++++++--- 10 files changed, 125 insertions(+), 100 deletions(-) delete mode 100644 migrations/0006_add_sent.sql delete mode 100644 pg_migrations/0003_add_sent.sql diff --git a/migrations/0006_add_sent.sql b/migrations/0006_add_sent.sql deleted file mode 100644 index f0acabc0..00000000 --- a/migrations/0006_add_sent.sql +++ /dev/null @@ -1 +0,0 @@ -ALTER TABLE inflight_taskactivations ADD COLUMN sent INTEGER NOT NULL DEFAULT 0; diff --git a/pg_migrations/0003_add_sent.sql b/pg_migrations/0003_add_sent.sql deleted file mode 100644 index 05fa6384..00000000 --- a/pg_migrations/0003_add_sent.sql +++ /dev/null @@ -1,4 +0,0 @@ -ALTER TABLE inflight_taskactivations ADD COLUMN sent BOOLEAN NOT NULL DEFAULT FALSE; - -DROP INDEX IF EXISTS idx_activation_claim; -CREATE INDEX idx_activation_claim ON inflight_taskactivations (status, bucket, sent); diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index 2fb34f4c..558a7ba1 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -76,7 +76,7 @@ impl InflightActivationStore for MockStore { _namespaces: Option<&[String]>, _limit: Option, _bucket: Option, - mark_sent: bool, + status: InflightActivationStatus, ) -> Result, Error> { if self.fail { return Err(anyhow!("mock store error")); @@ -84,7 +84,7 @@ impl InflightActivationStore for MockStore { Ok(match self.pending.lock().await.take() { Some(mut a) => { - a.sent = mark_sent; + a.status = status; vec![a] } None => vec![], diff --git a/src/grpc/server_tests.rs b/src/grpc/server_tests.rs index 4dd6be36..2bd1f768 100644 --- a/src/grpc/server_tests.rs +++ b/src/grpc/server_tests.rs @@ -128,7 +128,6 @@ async fn test_get_task_success(#[case] adapter: &str) { assert!(task.id == "id_0"); let row = store.get_by_id("id_0").await.unwrap().expect("claimed row"); - assert!(row.sent); assert_eq!(row.status, InflightActivationStatus::Processing); } diff --git a/src/kafka/deserialize_activation.rs b/src/kafka/deserialize_activation.rs index 1949f573..28485606 100644 --- a/src/kafka/deserialize_activation.rs +++ b/src/kafka/deserialize_activation.rs @@ -108,7 +108,6 @@ pub fn new( taskname, on_attempts_exceeded, bucket, - sent: false, }) } } diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 27df2100..c6252b58 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -37,6 +37,7 @@ pub enum InflightActivationStatus { /// Unused but necessary to align with sentry-protos Unspecified, Pending, + Sending, Processing, Failure, Retry, @@ -58,6 +59,8 @@ impl FromStr for InflightActivationStatus { Ok(InflightActivationStatus::Unspecified) } else if s == "Pending" { Ok(InflightActivationStatus::Pending) + } else if s == "Sending" { + Ok(InflightActivationStatus::Sending) } else if s == "Processing" { Ok(InflightActivationStatus::Processing) } else if s == "Failure" { @@ -180,10 +183,6 @@ pub struct InflightActivation { /// Bucket derived from activation ID (UUID as number % 256). Set once on ingestion. #[builder(setter(skip), default = "0")] pub bucket: i16, - - /// True after successful push. - #[builder(default = false)] - pub sent: bool, } /// Counts how many tasks were changed from 'processing' to 'pending' by `handle_processing_deadline`. @@ -267,7 +266,6 @@ pub struct TableRow { #[sqlx(try_from = "i32")] pub on_attempts_exceeded: OnAttemptsExceeded, pub bucket: i16, - pub sent: bool, } impl TryFrom for TableRow { @@ -293,7 +291,6 @@ impl TryFrom for TableRow { taskname: value.taskname, on_attempts_exceeded: value.on_attempts_exceeded, bucket: value.bucket, - sent: value.sent, }) } } @@ -319,7 +316,6 @@ impl From for InflightActivation { taskname: value.taskname, on_attempts_exceeded: value.on_attempts_exceeded, bucket: value.bucket, - sent: value.sent, } } } @@ -401,7 +397,7 @@ pub trait InflightActivationStore: Send + Sync { /// Store a batch of activations async fn store(&self, batch: Vec) -> Result; - /// Get `limit` pending activations, optionally filtered by namespaces and bucket subrange. + /// Get `limit` pending activations, optionally filtered by namespaces and bucket subrange, and update them to have status `status`. /// If no limit is provided, all matching activations will be returned. async fn claim_activations( &self, @@ -409,10 +405,10 @@ pub trait InflightActivationStore: Send + Sync { namespaces: Option<&[String]>, limit: Option, bucket: Option, - sent: bool, + status: InflightActivationStatus, ) -> Result, Error>; - /// Claims `limit` activations within the `bucket` range. Column `sent` remains false until `mark_activation_sent` is called. + /// Claims `limit` activations within the `bucket` range. Push mode uses status `Sending` until `mark_activation_sent` moves to `Processing`. async fn claim_activations_for_push( &self, application: Option<&str>, @@ -430,8 +426,14 @@ pub trait InflightActivationStore: Send + Sync { return Ok(vec![]); } - self.claim_activations(application, namespaces, limit, bucket, false) - .await + self.claim_activations( + application, + namespaces, + limit, + bucket, + InflightActivationStatus::Sending, + ) + .await } /// Claims `limit` activations with application `application` and namespace `namespace`. @@ -454,7 +456,13 @@ pub trait InflightActivationStore: Send + Sync { } let mut rows = self - .claim_activations(application, namespaces.as_deref(), Some(1), None, true) + .claim_activations( + application, + namespaces.as_deref(), + Some(1), + None, + InflightActivationStatus::Processing, + ) .await?; // If we are getting more than one task here, something is broken @@ -798,8 +806,7 @@ impl InflightActivationStore for SqliteActivationStore { namespace, taskname, on_attempts_exceeded, - bucket, - sent + bucket FROM inflight_taskactivations WHERE id = $1 ", @@ -841,8 +848,7 @@ impl InflightActivationStore for SqliteActivationStore { namespace, taskname, on_attempts_exceeded, - bucket, - sent + bucket ) ", ); @@ -876,7 +882,6 @@ impl InflightActivationStore for SqliteActivationStore { b.push_bind(row.taskname); b.push_bind(row.on_attempts_exceeded as i32); b.push_bind(row.bucket); - b.push_bind(row.sent); }) .push(" ON CONFLICT(id) DO NOTHING") .build(); @@ -912,7 +917,7 @@ impl InflightActivationStore for SqliteActivationStore { namespaces: Option<&[String]>, limit: Option, bucket: Option, - sent: bool, + status: InflightActivationStatus, ) -> Result, Error> { let now = Utc::now(); @@ -925,9 +930,7 @@ impl InflightActivationStore for SqliteActivationStore { ), status = " )); - query_builder.push_bind(InflightActivationStatus::Processing); - query_builder.push(", sent = "); - query_builder.push_bind(sent); + query_builder.push_bind(status); query_builder.push( " WHERE id IN ( @@ -982,11 +985,14 @@ impl InflightActivationStore for SqliteActivationStore { let mut conn = self .acquire_write_conn_metric("mark_activation_sent") .await?; - sqlx::query("UPDATE inflight_taskactivations SET sent = 1 WHERE id = $1 AND status = $2") - .bind(id) - .bind(InflightActivationStatus::Processing) - .execute(&mut *conn) - .await?; + sqlx::query( + "UPDATE inflight_taskactivations SET status = $1 WHERE id = $2 AND status = $3", + ) + .bind(InflightActivationStatus::Processing) + .bind(id) + .bind(InflightActivationStatus::Sending) + .execute(&mut *conn) + .await?; Ok(()) } @@ -1113,8 +1119,7 @@ impl InflightActivationStore for SqliteActivationStore { namespace, taskname, on_attempts_exceeded, - bucket, - sent + bucket FROM inflight_taskactivations WHERE status = $1 ", @@ -1148,11 +1153,12 @@ impl InflightActivationStore for SqliteActivationStore { status = $1 WHERE processing_deadline < $2 AND at_most_once = TRUE - AND status = $3", + AND (status = $3 OR status = $4)", ) .bind(InflightActivationStatus::Failure) .bind(now.timestamp()) .bind(InflightActivationStatus::Processing) + .bind(InflightActivationStatus::Sending) .execute(&mut *atomic) .await?; @@ -1160,16 +1166,14 @@ impl InflightActivationStore for SqliteActivationStore { let unsent = sqlx::query( "UPDATE inflight_taskactivations SET processing_deadline = null, - status = $1, - sent = FALSE + status = $1 WHERE processing_deadline < $2 - AND sent = FALSE AND at_most_once = FALSE AND status = $3", ) .bind(InflightActivationStatus::Pending) .bind(now.timestamp()) - .bind(InflightActivationStatus::Processing) + .bind(InflightActivationStatus::Sending) .execute(&mut *atomic) .await?; @@ -1178,10 +1182,8 @@ impl InflightActivationStore for SqliteActivationStore { "UPDATE inflight_taskactivations SET processing_deadline = null, status = $1, - processing_attempts = processing_attempts + 1, - sent = FALSE + processing_attempts = processing_attempts + 1 WHERE processing_deadline < $2 - AND sent = TRUE AND at_most_once = FALSE AND status = $3", ) diff --git a/src/store/inflight_activation_tests.rs b/src/store/inflight_activation_tests.rs index 02248ff9..41f7b3f6 100644 --- a/src/store/inflight_activation_tests.rs +++ b/src/store/inflight_activation_tests.rs @@ -409,17 +409,23 @@ async fn test_get_pending_activation_from_multiple_namespaces(#[case] adapter: & // Use `claim_activations` so upkeep-style `None` application + namespaces is allowed (not `claim_activations_for_push`). let namespaces = vec!["ns2".to_string(), "ns3".to_string()]; let result = store - .claim_activations(None, Some(&namespaces), None, None, false) + .claim_activations( + None, + Some(&namespaces), + None, + None, + InflightActivationStatus::Sending, + ) .await .unwrap(); assert_eq!(result.len(), 2); assert_eq!(result[1].id, "id_2"); assert_eq!(result[1].namespace, "ns3"); - assert_eq!(result[1].status, InflightActivationStatus::Processing); + assert_eq!(result[1].status, InflightActivationStatus::Sending); assert_eq!(result[0].id, "id_1"); assert_eq!(result[0].namespace, "ns2"); - assert_eq!(result[0].status, InflightActivationStatus::Processing); + assert_eq!(result[0].status, InflightActivationStatus::Sending); store.remove_db().await.unwrap(); } @@ -445,7 +451,13 @@ async fn test_get_pending_activation_with_namespace_requires_application(#[case] // We allow no application in this method because of usage in upkeep let namespaces = vec!["other_namespace".to_string()]; let activations = store - .claim_activations(None, Some(&namespaces), Some(2), None, false) + .claim_activations( + None, + Some(&namespaces), + Some(2), + None, + InflightActivationStatus::Sending, + ) .await .unwrap(); assert_eq!( @@ -636,13 +648,13 @@ async fn test_get_pending_activations_no_limit(#[case] adapter: &str) { assert_eq!(got.len(), N); assert!( got.iter() - .all(|a| a.status == InflightActivationStatus::Processing) + .all(|a| a.status == InflightActivationStatus::Sending) ); assert_eq!(store.count_pending_activations().await.unwrap(), 0); assert_counts( StatusCount { pending: 0, - processing: N, + sending: N, ..StatusCount::default() }, store.as_ref(), @@ -670,7 +682,7 @@ async fn test_get_pending_activations_limit_below_pending(#[case] adapter: &str) assert_eq!(got.len(), X as usize); assert!( got.iter() - .all(|a| a.status == InflightActivationStatus::Processing) + .all(|a| a.status == InflightActivationStatus::Sending) ); assert_eq!( store.count_pending_activations().await.unwrap(), @@ -679,7 +691,7 @@ async fn test_get_pending_activations_limit_below_pending(#[case] adapter: &str) assert_counts( StatusCount { pending: N - X as usize, - processing: X as usize, + sending: X as usize, ..StatusCount::default() }, store.as_ref(), @@ -707,13 +719,13 @@ async fn test_get_pending_activations_limit_above_pending(#[case] adapter: &str) assert_eq!(got.len(), Y); assert!( got.iter() - .all(|a| a.status == InflightActivationStatus::Processing) + .all(|a| a.status == InflightActivationStatus::Sending) ); assert_eq!(store.count_pending_activations().await.unwrap(), 0); assert_counts( StatusCount { pending: 0, - processing: Y, + sending: Y, ..StatusCount::default() }, store.as_ref(), @@ -951,7 +963,6 @@ async fn test_handle_processing_deadline(#[case] adapter: &str) { let mut batch = make_activations(2); batch[1].status = InflightActivationStatus::Processing; - batch[1].sent = true; batch[1].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); assert!(store.store(batch.clone()).await.is_ok()); @@ -1003,14 +1014,14 @@ async fn test_handle_processing_deadline_multiple_tasks(#[case] adapter: &str) { let mut batch = make_activations(2); batch[0].status = InflightActivationStatus::Processing; - batch[0].sent = true; batch[0].processing_deadline = Some(Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap()); - batch[1].status = InflightActivationStatus::Processing; + batch[1].status = InflightActivationStatus::Sending; batch[1].processing_deadline = Some(Utc::now() + chrono::Duration::days(30)); assert!(store.store(batch).await.is_ok()); assert_counts( StatusCount { - processing: 2, + processing: 1, + sending: 1, ..StatusCount::default() }, store.as_ref(), @@ -1030,7 +1041,7 @@ async fn test_handle_processing_deadline_multiple_tasks(#[case] adapter: &str) { assert_counts( StatusCount { pending: 1, - processing: 1, + sending: 1, ..StatusCount::default() }, store.as_ref(), @@ -1049,11 +1060,10 @@ async fn test_handle_processing_at_most_once(#[case] adapter: &str) { // Both records are past processing deadlines let mut batch = make_activations(2); batch[0].status = InflightActivationStatus::Processing; - batch[0].sent = true; batch[0].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); batch[1].status = InflightActivationStatus::Processing; - batch[1].sent = true; + replace_retry_state( &mut batch[1], Some(RetryState { @@ -1111,7 +1121,6 @@ async fn test_handle_processing_deadline_discard_after(#[case] adapter: &str) { let mut batch = make_activations(2); batch[1].status = InflightActivationStatus::Processing; - batch[1].sent = true; batch[1].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); replace_retry_state( &mut batch[1], @@ -1165,7 +1174,6 @@ async fn test_handle_processing_deadline_deadletter_after(#[case] adapter: &str) let mut batch = make_activations(2); batch[1].status = InflightActivationStatus::Processing; - batch[1].sent = true; batch[1].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); replace_retry_state( &mut batch[1], @@ -1219,7 +1227,6 @@ async fn test_handle_processing_deadline_no_retries_remaining(#[case] adapter: & let mut batch = make_activations(2); batch[1].status = InflightActivationStatus::Processing; - batch[1].sent = true; batch[1].processing_deadline = Some(Utc.with_ymd_and_hms(2024, 11, 14, 21, 22, 23).unwrap()); replace_retry_state( &mut batch[1], @@ -1271,8 +1278,7 @@ async fn test_handle_processing_deadline_no_retries_remaining(#[case] adapter: & async fn test_handle_processing_deadline_unsent_no_attempt_increment(#[case] adapter: &str) { let store = create_test_store(adapter).await; let mut batch = make_activations(1); - batch[0].status = InflightActivationStatus::Processing; - batch[0].sent = false; + batch[0].status = InflightActivationStatus::Sending; batch[0].processing_deadline = Some(Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap()); assert!(store.store(batch.clone()).await.is_ok()); let count = store.handle_processing_deadline().await.unwrap(); @@ -1287,7 +1293,6 @@ async fn test_handle_processing_deadline_unsent_no_attempt_increment(#[case] ada let task = store.get_by_id(&batch[0].id).await.unwrap().unwrap(); assert_eq!(task.status, InflightActivationStatus::Pending); assert_eq!(task.processing_attempts, 0); - assert!(!task.sent); store.remove_db().await.unwrap(); } @@ -1298,8 +1303,7 @@ async fn test_handle_processing_deadline_unsent_no_attempt_increment(#[case] ada async fn test_handle_processing_deadline_at_most_once_unsent_failure(#[case] adapter: &str) { let store = create_test_store(adapter).await; let mut batch = make_activations(1); - batch[0].status = InflightActivationStatus::Processing; - batch[0].sent = false; + batch[0].status = InflightActivationStatus::Sending; batch[0].at_most_once = true; batch[0].processing_deadline = Some(Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap()); assert!(store.store(batch.clone()).await.is_ok()); @@ -1314,7 +1318,6 @@ async fn test_handle_processing_deadline_at_most_once_unsent_failure(#[case] ada ); let task = store.get_by_id(&batch[0].id).await.unwrap().unwrap(); assert_eq!(task.status, InflightActivationStatus::Failure); - assert!(!task.sent); store.remove_db().await.unwrap(); } diff --git a/src/store/postgres_activation_store.rs b/src/store/postgres_activation_store.rs index c8625f16..33e34cae 100644 --- a/src/store/postgres_activation_store.rs +++ b/src/store/postgres_activation_store.rs @@ -176,8 +176,7 @@ impl InflightActivationStore for PostgresActivationStore { namespace, taskname, on_attempts_exceeded, - bucket, - sent + bucket FROM inflight_taskactivations WHERE id = $1 ", @@ -219,8 +218,7 @@ impl InflightActivationStore for PostgresActivationStore { namespace, taskname, on_attempts_exceeded, - bucket, - sent + bucket ) ", ); @@ -253,7 +251,6 @@ impl InflightActivationStore for PostgresActivationStore { b.push_bind(row.taskname); b.push_bind(row.on_attempts_exceeded as i32); b.push_bind(row.bucket); - b.push_bind(row.sent); }) .push(" ON CONFLICT(id) DO NOTHING") .build(); @@ -268,7 +265,7 @@ impl InflightActivationStore for PostgresActivationStore { namespaces: Option<&[String]>, limit: Option, bucket: Option, - sent: bool, + status: InflightActivationStatus, ) -> Result, Error> { let now = Utc::now(); @@ -320,9 +317,7 @@ impl InflightActivationStore for PostgresActivationStore { processing_deadline = now() + (processing_deadline_duration * interval '1 second') + (interval '{grace_period} seconds'), status = " )); - query_builder.push_bind(InflightActivationStatus::Processing.to_string()); - query_builder.push(", sent = "); - query_builder.push_bind(sent); + query_builder.push_bind(status.to_string()); query_builder.push(" FROM selected_activations "); query_builder.push(" WHERE inflight_taskactivations.id = selected_activations.id"); query_builder.push(" RETURNING *, kafka_offset AS offset"); @@ -342,10 +337,11 @@ impl InflightActivationStore for PostgresActivationStore { .acquire_write_conn_metric("mark_activation_sent") .await?; sqlx::query( - "UPDATE inflight_taskactivations SET sent = TRUE WHERE id = $1 AND status = $2", + "UPDATE inflight_taskactivations SET status = $1 WHERE id = $2 AND status = $3", ) - .bind(id) .bind(InflightActivationStatus::Processing.to_string()) + .bind(id) + .bind(InflightActivationStatus::Sending.to_string()) .execute(&mut *conn) .await?; Ok(()) @@ -495,8 +491,7 @@ impl InflightActivationStore for PostgresActivationStore { namespace, taskname, on_attempts_exceeded, - bucket, - sent + bucket FROM inflight_taskactivations WHERE status = $1 ", @@ -532,11 +527,12 @@ impl InflightActivationStore for PostgresActivationStore { status = $1 WHERE processing_deadline < $2 AND at_most_once = TRUE - AND status = $3", + AND (status = $3 OR status = $4)", ) .bind(InflightActivationStatus::Failure.to_string()) .bind(now) .bind(InflightActivationStatus::Processing.to_string()) + .bind(InflightActivationStatus::Sending.to_string()) .execute(&mut *atomic) .await?; @@ -544,16 +540,14 @@ impl InflightActivationStore for PostgresActivationStore { let unsent = sqlx::query( "UPDATE inflight_taskactivations SET processing_deadline = null, - status = $1, - sent = FALSE + status = $1 WHERE processing_deadline < $2 - AND sent = FALSE AND at_most_once = FALSE AND status = $3", ) .bind(InflightActivationStatus::Pending.to_string()) .bind(now) - .bind(InflightActivationStatus::Processing.to_string()) + .bind(InflightActivationStatus::Sending.to_string()) .execute(&mut *atomic) .await?; @@ -562,10 +556,8 @@ impl InflightActivationStore for PostgresActivationStore { "UPDATE inflight_taskactivations SET processing_deadline = null, status = $1, - processing_attempts = processing_attempts + 1, - sent = FALSE + processing_attempts = processing_attempts + 1 WHERE processing_deadline < $2 - AND sent = TRUE AND at_most_once = FALSE AND status = $3", ) diff --git a/src/test_utils.rs b/src/test_utils.rs index 392b48c1..22931a1f 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -396,6 +396,7 @@ pub fn replace_retry_state(inflight: &mut InflightActivation, retry: Option Date: Thu, 9 Apr 2026 13:12:54 -0700 Subject: [PATCH 06/11] Mark Demoted Namespace Tasks as Sending, Log Error on No Pending Activations --- src/grpc/server.rs | 12 ++++++++---- src/kafka/inflight_activation_writer.rs | 4 +++- src/store/inflight_activation.rs | 7 ++++++- src/store/postgres_activation_store.rs | 9 ++++++--- src/upkeep.rs | 2 +- 5 files changed, 24 insertions(+), 10 deletions(-) diff --git a/src/grpc/server.rs b/src/grpc/server.rs index 727d8cfb..a9e13557 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -11,7 +11,7 @@ use tonic::{Request, Response, Status}; use crate::config::{Config, DeliveryMode}; use crate::store::inflight_activation::{InflightActivationStatus, InflightActivationStore}; -use tracing::{error, instrument}; +use tracing::{error, instrument, warn}; pub struct TaskbrokerServer { pub store: Arc, @@ -42,7 +42,7 @@ impl ConsumerService for TaskbrokerServer { .await; match inflight { - Ok(None) => Err(Status::not_found("No pending activation")), + Ok(None) => Err(Status::not_found("No pending activations")), Ok(Some(inflight)) => { let now = Utc::now(); @@ -137,8 +137,12 @@ impl ConsumerService for TaskbrokerServer { Err(Status::internal("Unable to fetch next task")) } - // If we return an error, the worker will place the result back in its internal queue and send the update again in the future, which is not desired - Ok(None) => Ok(Response::new(SetTaskStatusResponse { task: None })), + Ok(None) => { + warn!("No pending activations"); + + // If we return an error, the worker will place the result back in its internal queue and send the update again in the future, which is not desired + Ok(Response::new(SetTaskStatusResponse { task: None })) + } Ok(Some(inflight)) => { if inflight.processing_attempts < 1 { diff --git a/src/kafka/inflight_activation_writer.rs b/src/kafka/inflight_activation_writer.rs index 9bea3c1c..95ed59ec 100644 --- a/src/kafka/inflight_activation_writer.rs +++ b/src/kafka/inflight_activation_writer.rs @@ -85,6 +85,7 @@ impl Reducer for InflightActivationWriter { let DepthCounts { pending, delay, + sending, processing, } = self .store @@ -94,7 +95,8 @@ impl Reducer for InflightActivationWriter { let exceeded_pending_limit = pending + batch.len() > self.config.max_pending_activations; let exceeded_delay_limit = delay + batch.len() > self.config.max_delay_activations; - let exceeded_processing_limit = processing >= self.config.max_processing_activations; + let exceeded_processing_limit = + processing + sending >= self.config.max_processing_activations; let exceeded_db_size = if let Some(db_max_size) = self.config.db_max_size { self.store .db_size() diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index c6252b58..1a9c70e3 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -376,6 +376,9 @@ pub struct DepthCounts { /// Number of delayed tasks in the store. pub delay: usize, + /// The number of tasks being sent in the store. + pub sending: usize, + /// The number of processing tasks in the store. pub processing: usize, } @@ -494,15 +497,17 @@ pub trait InflightActivationStore: Send + Sync { /// Queue depths for pending, delay, and processing (writer backpressure and upkeep gauges). /// Default implementation uses separate calls, but stores may override with a single query. async fn count_depths(&self) -> Result { - let (pending, delay, processing) = join!( + let (pending, delay, sending, processing) = join!( self.count_by_status(InflightActivationStatus::Pending), self.count_by_status(InflightActivationStatus::Delay), + self.count_by_status(InflightActivationStatus::Sending), self.count_by_status(InflightActivationStatus::Processing), ); Ok(DepthCounts { pending: pending?, delay: delay?, + sending: sending?, processing: processing?, }) } diff --git a/src/store/postgres_activation_store.rs b/src/store/postgres_activation_store.rs index 33e34cae..d2709cc9 100644 --- a/src/store/postgres_activation_store.rs +++ b/src/store/postgres_activation_store.rs @@ -403,12 +403,14 @@ impl InflightActivationStore for PostgresActivationStore { let sql = " SELECT COUNT(*) FILTER (WHERE status = $1) AS pending, COUNT(*) FILTER (WHERE status = $2) AS delay, - COUNT(*) FILTER (WHERE status = $3) AS processing + COUNT(*) FILTER (WHERE status = $3) AS sending, + COUNT(*) FILTER (WHERE status = $4) AS processing FROM inflight_taskactivations"; - let row: (i64, i64, i64) = sqlx::query_as(sql) + let row: (i64, i64, i64, i64) = sqlx::query_as(sql) .bind(InflightActivationStatus::Pending.to_string()) .bind(InflightActivationStatus::Delay.to_string()) + .bind(InflightActivationStatus::Sending.to_string()) .bind(InflightActivationStatus::Processing.to_string()) .fetch_one(&self.read_pool) .await?; @@ -416,7 +418,8 @@ impl InflightActivationStore for PostgresActivationStore { Ok(DepthCounts { pending: row.0 as usize, delay: row.1 as usize, - processing: row.2 as usize, + sending: row.2 as usize, + processing: row.3 as usize, }) } diff --git a/src/upkeep.rs b/src/upkeep.rs index 27871f69..0f299695 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -306,7 +306,7 @@ pub async fn do_upkeep( Some(&demoted_namespaces), None, None, - InflightActivationStatus::Processing, + InflightActivationStatus::Sending, ) .await { From 991c4ee1ffb5ddbca96c861ebfd3ce1ed02c8986 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Thu, 9 Apr 2026 13:19:36 -0700 Subject: [PATCH 07/11] Emit Metrics for Sending Tasks --- src/upkeep.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/upkeep.rs b/src/upkeep.rs index 0f299695..7c296b09 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -76,6 +76,7 @@ pub struct UpkeepResults { completed: u64, failed: u64, pending: u32, + sending: u32, processing: u32, delay: u32, deadlettered: u64, @@ -124,6 +125,7 @@ pub async fn do_upkeep( completed: 0, failed: 0, pending: 0, + sending: 0, processing: 0, delay: 0, deadlettered: 0, @@ -383,6 +385,7 @@ pub async fn do_upkeep( if let Ok(depths) = depth_counts { result_context.pending = depths.pending as u32; result_context.delay = depths.delay as u32; + result_context.sending = depths.sending as u32; result_context.processing = depths.processing as u32; } @@ -400,6 +403,7 @@ pub async fn do_upkeep( result_context.expired, result_context.retried, result_context.pending, + result_context.sending, result_context.processing, result_context.delay, result_context.delay_elapsed, @@ -452,6 +456,7 @@ pub async fn do_upkeep( // State of inflight tasks metrics::gauge!("upkeep.current_pending_tasks").set(result_context.pending); + metrics::gauge!("upkeep.current_sending_tasks").set(result_context.sending); metrics::gauge!("upkeep.current_processing_tasks").set(result_context.processing); metrics::gauge!("upkeep.current_delayed_tasks").set(result_context.delay); metrics::gauge!("upkeep.pending_activation.max_lag.sec").set(max_lag); From 454e0c76adf9df01289f4b549fa14685bb6f0b18 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Thu, 9 Apr 2026 13:32:14 -0700 Subject: [PATCH 08/11] Add Sending Count to `UpkeepResults` Empty Calculation, Warn on No Activations Marked Sent --- src/store/inflight_activation.rs | 11 ++++++++++- src/upkeep.rs | 1 + 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 1a9c70e3..0fcf283b 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -990,7 +990,8 @@ impl InflightActivationStore for SqliteActivationStore { let mut conn = self .acquire_write_conn_metric("mark_activation_sent") .await?; - sqlx::query( + + let result = sqlx::query( "UPDATE inflight_taskactivations SET status = $1 WHERE id = $2 AND status = $3", ) .bind(InflightActivationStatus::Processing) @@ -998,6 +999,14 @@ impl InflightActivationStore for SqliteActivationStore { .bind(InflightActivationStatus::Sending) .execute(&mut *conn) .await?; + + if result.rows_affected() == 0 { + warn!( + task_id = %id, + "Activation could not be marked as sent, it may be missing or its status may have already changed" + ); + } + Ok(()) } diff --git a/src/upkeep.rs b/src/upkeep.rs index 7c296b09..8616bf29 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -94,6 +94,7 @@ impl UpkeepResults { && self.completed == 0 && self.failed == 0 && self.pending == 0 + && self.sending == 0 && self.processing == 0 && self.delay == 0 && self.discarded == 0 From 646242f8103634c260e6139457d50bae60ddaf84 Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Thu, 9 Apr 2026 13:51:49 -0700 Subject: [PATCH 09/11] Add Rows Affected Check to PSQL Mark Sent, Fix Unit Tests --- src/grpc/server_tests.rs | 2 +- src/store/postgres_activation_store.rs | 13 +++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/grpc/server_tests.rs b/src/grpc/server_tests.rs index 2bd1f768..4dea90b3 100644 --- a/src/grpc/server_tests.rs +++ b/src/grpc/server_tests.rs @@ -51,7 +51,7 @@ async fn test_get_task(#[case] adapter: &str) { assert!(response.is_err()); let e = response.unwrap_err(); assert_eq!(e.code(), Code::NotFound); - assert_eq!(e.message(), "No pending activation"); + assert_eq!(e.message(), "No pending activations"); } #[tokio::test] diff --git a/src/store/postgres_activation_store.rs b/src/store/postgres_activation_store.rs index d2709cc9..a68c9a98 100644 --- a/src/store/postgres_activation_store.rs +++ b/src/store/postgres_activation_store.rs @@ -12,7 +12,7 @@ use sqlx::{ postgres::{PgConnectOptions, PgPool, PgPoolOptions, PgRow}, }; use std::{str::FromStr, time::Instant}; -use tracing::instrument; +use tracing::{instrument, warn}; use crate::config::Config; @@ -336,7 +336,8 @@ impl InflightActivationStore for PostgresActivationStore { let mut conn = self .acquire_write_conn_metric("mark_activation_sent") .await?; - sqlx::query( + + let result = sqlx::query( "UPDATE inflight_taskactivations SET status = $1 WHERE id = $2 AND status = $3", ) .bind(InflightActivationStatus::Processing.to_string()) @@ -344,6 +345,14 @@ impl InflightActivationStore for PostgresActivationStore { .bind(InflightActivationStatus::Sending.to_string()) .execute(&mut *conn) .await?; + + if result.rows_affected() == 0 { + warn!( + task_id = %id, + "Activation could not be marked as sent, it may be missing or its status may have already changed" + ); + } + Ok(()) } From d573743cad495245e1f99ee57fe41d495616b9fd Mon Sep 17 00:00:00 2001 From: james-mcnulty Date: Fri, 10 Apr 2026 11:40:52 -0700 Subject: [PATCH 10/11] Rename `Sending` to `Claimed` --- src/fetch/tests.rs | 2 +- src/kafka/inflight_activation_writer.rs | 4 +-- src/push/mod.rs | 12 ++++---- src/store/inflight_activation.rs | 38 ++++++++++++------------- src/store/inflight_activation_tests.rs | 30 +++++++++---------- src/store/postgres_activation_store.rs | 18 ++++++------ src/test_utils.rs | 8 +++--- src/upkeep.rs | 14 ++++----- 8 files changed, 64 insertions(+), 62 deletions(-) diff --git a/src/fetch/tests.rs b/src/fetch/tests.rs index 558a7ba1..68ff1fda 100644 --- a/src/fetch/tests.rs +++ b/src/fetch/tests.rs @@ -91,7 +91,7 @@ impl InflightActivationStore for MockStore { }) } - async fn mark_activation_sent(&self, _id: &str) -> Result<(), Error> { + async fn mark_activation_processing(&self, _id: &str) -> Result<(), Error> { Ok(()) } diff --git a/src/kafka/inflight_activation_writer.rs b/src/kafka/inflight_activation_writer.rs index 95ed59ec..f84c3ff2 100644 --- a/src/kafka/inflight_activation_writer.rs +++ b/src/kafka/inflight_activation_writer.rs @@ -85,7 +85,7 @@ impl Reducer for InflightActivationWriter { let DepthCounts { pending, delay, - sending, + claimed, processing, } = self .store @@ -96,7 +96,7 @@ impl Reducer for InflightActivationWriter { let exceeded_pending_limit = pending + batch.len() > self.config.max_pending_activations; let exceeded_delay_limit = delay + batch.len() > self.config.max_delay_activations; let exceeded_processing_limit = - processing + sending >= self.config.max_processing_activations; + processing + claimed >= self.config.max_processing_activations; let exceeded_db_size = if let Some(db_max_size) = self.config.db_max_size { self.store .db_size() diff --git a/src/push/mod.rs b/src/push/mod.rs index 45ba3a51..50089044 100644 --- a/src/push/mod.rs +++ b/src/push/mod.rs @@ -109,8 +109,9 @@ impl PushPool { /// Spawn `config.push_threads` asynchronous tasks, each of which repeatedly moves pending activations from the channel to the worker service until the shutdown signal is received. pub async fn start(&self) -> Result<()> { let store = self.store.clone(); - let mut push_pool: JoinSet> = - crate::tokio::spawn_pool(self.config.push_threads, |_| { + let mut push_pool: JoinSet> = crate::tokio::spawn_pool( + self.config.push_threads, + |_| { let endpoint = self.config.worker_endpoint.clone(); let receiver = self.receiver.clone(); let store = store.clone(); @@ -167,7 +168,7 @@ impl PushPool { Ok(_) => { debug!(task_id = %id, "Activation sent to worker"); - if let Err(e) = store.mark_activation_sent(&id).await { + if let Err(e) = store.mark_activation_processing(&id).await { error!( task_id = %id, error = ?e, @@ -204,7 +205,7 @@ impl PushPool { Ok(_) => { debug!(task_id = %id, "Activation sent to worker"); - if let Err(e) = store.mark_activation_sent(&id).await { + if let Err(e) = store.mark_activation_processing(&id).await { error!( task_id = %id, error = ?e, @@ -224,7 +225,8 @@ impl PushPool { Ok(()) } - }); + }, + ); while let Some(result) = push_pool.join_next().await { match result { diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 0fcf283b..a77aa551 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -30,14 +30,14 @@ use crate::config::Config; pub type BucketRange = (i16, i16); -/// The members of this enum should be synced with the members -/// of InflightActivationStatus in sentry_protos +/// The members of this enum should be a superset of the members +/// of `InflightActivationStatus` in `sentry_protos`. #[derive(Clone, Copy, Debug, PartialEq, Eq, Type)] pub enum InflightActivationStatus { /// Unused but necessary to align with sentry-protos Unspecified, Pending, - Sending, + Claimed, Processing, Failure, Retry, @@ -59,8 +59,8 @@ impl FromStr for InflightActivationStatus { Ok(InflightActivationStatus::Unspecified) } else if s == "Pending" { Ok(InflightActivationStatus::Pending) - } else if s == "Sending" { - Ok(InflightActivationStatus::Sending) + } else if s == "Claimed" { + Ok(InflightActivationStatus::Claimed) } else if s == "Processing" { Ok(InflightActivationStatus::Processing) } else if s == "Failure" { @@ -376,8 +376,8 @@ pub struct DepthCounts { /// Number of delayed tasks in the store. pub delay: usize, - /// The number of tasks being sent in the store. - pub sending: usize, + /// Activations claimed for push delivery but not yet marked processing. + pub claimed: usize, /// The number of processing tasks in the store. pub processing: usize, @@ -411,7 +411,7 @@ pub trait InflightActivationStore: Send + Sync { status: InflightActivationStatus, ) -> Result, Error>; - /// Claims `limit` activations within the `bucket` range. Push mode uses status `Sending` until `mark_activation_sent` moves to `Processing`. + /// Claims `limit` activations within the `bucket` range. Push mode uses status `Claimed` until `mark_activation_processing` moves to `Processing`. async fn claim_activations_for_push( &self, application: Option<&str>, @@ -434,7 +434,7 @@ pub trait InflightActivationStore: Send + Sync { namespaces, limit, bucket, - InflightActivationStatus::Sending, + InflightActivationStatus::Claimed, ) .await } @@ -477,7 +477,7 @@ pub trait InflightActivationStore: Send + Sync { } /// Record successful push. - async fn mark_activation_sent(&self, id: &str) -> Result<(), Error>; + async fn mark_activation_processing(&self, id: &str) -> Result<(), Error>; /// Get the age of the oldest pending activation in seconds async fn pending_activation_max_lag(&self, now: &DateTime) -> f64; @@ -497,17 +497,17 @@ pub trait InflightActivationStore: Send + Sync { /// Queue depths for pending, delay, and processing (writer backpressure and upkeep gauges). /// Default implementation uses separate calls, but stores may override with a single query. async fn count_depths(&self) -> Result { - let (pending, delay, sending, processing) = join!( + let (pending, delay, claimed, processing) = join!( self.count_by_status(InflightActivationStatus::Pending), self.count_by_status(InflightActivationStatus::Delay), - self.count_by_status(InflightActivationStatus::Sending), + self.count_by_status(InflightActivationStatus::Claimed), self.count_by_status(InflightActivationStatus::Processing), ); Ok(DepthCounts { pending: pending?, delay: delay?, - sending: sending?, + claimed: claimed?, processing: processing?, }) } @@ -986,9 +986,9 @@ impl InflightActivationStore for SqliteActivationStore { } #[instrument(skip_all)] - async fn mark_activation_sent(&self, id: &str) -> Result<(), Error> { + async fn mark_activation_processing(&self, id: &str) -> Result<(), Error> { let mut conn = self - .acquire_write_conn_metric("mark_activation_sent") + .acquire_write_conn_metric("mark_activation_processing") .await?; let result = sqlx::query( @@ -996,7 +996,7 @@ impl InflightActivationStore for SqliteActivationStore { ) .bind(InflightActivationStatus::Processing) .bind(id) - .bind(InflightActivationStatus::Sending) + .bind(InflightActivationStatus::Claimed) .execute(&mut *conn) .await?; @@ -1172,11 +1172,11 @@ impl InflightActivationStore for SqliteActivationStore { .bind(InflightActivationStatus::Failure) .bind(now.timestamp()) .bind(InflightActivationStatus::Processing) - .bind(InflightActivationStatus::Sending) + .bind(InflightActivationStatus::Claimed) .execute(&mut *atomic) .await?; - // Revert activations that weren't delivered back to 'pending' without consuming an attempt + // Revert activations that weren't delivered back to 'pending' without consuming an attempt (release claims) let unsent = sqlx::query( "UPDATE inflight_taskactivations SET processing_deadline = null, @@ -1187,7 +1187,7 @@ impl InflightActivationStore for SqliteActivationStore { ) .bind(InflightActivationStatus::Pending) .bind(now.timestamp()) - .bind(InflightActivationStatus::Sending) + .bind(InflightActivationStatus::Claimed) .execute(&mut *atomic) .await?; diff --git a/src/store/inflight_activation_tests.rs b/src/store/inflight_activation_tests.rs index 41f7b3f6..caec31d3 100644 --- a/src/store/inflight_activation_tests.rs +++ b/src/store/inflight_activation_tests.rs @@ -414,7 +414,7 @@ async fn test_get_pending_activation_from_multiple_namespaces(#[case] adapter: & Some(&namespaces), None, None, - InflightActivationStatus::Sending, + InflightActivationStatus::Claimed, ) .await .unwrap(); @@ -422,10 +422,10 @@ async fn test_get_pending_activation_from_multiple_namespaces(#[case] adapter: & assert_eq!(result.len(), 2); assert_eq!(result[1].id, "id_2"); assert_eq!(result[1].namespace, "ns3"); - assert_eq!(result[1].status, InflightActivationStatus::Sending); + assert_eq!(result[1].status, InflightActivationStatus::Claimed); assert_eq!(result[0].id, "id_1"); assert_eq!(result[0].namespace, "ns2"); - assert_eq!(result[0].status, InflightActivationStatus::Sending); + assert_eq!(result[0].status, InflightActivationStatus::Claimed); store.remove_db().await.unwrap(); } @@ -456,7 +456,7 @@ async fn test_get_pending_activation_with_namespace_requires_application(#[case] Some(&namespaces), Some(2), None, - InflightActivationStatus::Sending, + InflightActivationStatus::Claimed, ) .await .unwrap(); @@ -648,13 +648,13 @@ async fn test_get_pending_activations_no_limit(#[case] adapter: &str) { assert_eq!(got.len(), N); assert!( got.iter() - .all(|a| a.status == InflightActivationStatus::Sending) + .all(|a| a.status == InflightActivationStatus::Claimed) ); assert_eq!(store.count_pending_activations().await.unwrap(), 0); assert_counts( StatusCount { pending: 0, - sending: N, + claimed: N, ..StatusCount::default() }, store.as_ref(), @@ -682,7 +682,7 @@ async fn test_get_pending_activations_limit_below_pending(#[case] adapter: &str) assert_eq!(got.len(), X as usize); assert!( got.iter() - .all(|a| a.status == InflightActivationStatus::Sending) + .all(|a| a.status == InflightActivationStatus::Claimed) ); assert_eq!( store.count_pending_activations().await.unwrap(), @@ -691,7 +691,7 @@ async fn test_get_pending_activations_limit_below_pending(#[case] adapter: &str) assert_counts( StatusCount { pending: N - X as usize, - sending: X as usize, + claimed: X as usize, ..StatusCount::default() }, store.as_ref(), @@ -719,13 +719,13 @@ async fn test_get_pending_activations_limit_above_pending(#[case] adapter: &str) assert_eq!(got.len(), Y); assert!( got.iter() - .all(|a| a.status == InflightActivationStatus::Sending) + .all(|a| a.status == InflightActivationStatus::Claimed) ); assert_eq!(store.count_pending_activations().await.unwrap(), 0); assert_counts( StatusCount { pending: 0, - sending: Y, + claimed: Y, ..StatusCount::default() }, store.as_ref(), @@ -1015,13 +1015,13 @@ async fn test_handle_processing_deadline_multiple_tasks(#[case] adapter: &str) { let mut batch = make_activations(2); batch[0].status = InflightActivationStatus::Processing; batch[0].processing_deadline = Some(Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap()); - batch[1].status = InflightActivationStatus::Sending; + batch[1].status = InflightActivationStatus::Claimed; batch[1].processing_deadline = Some(Utc::now() + chrono::Duration::days(30)); assert!(store.store(batch).await.is_ok()); assert_counts( StatusCount { processing: 1, - sending: 1, + claimed: 1, ..StatusCount::default() }, store.as_ref(), @@ -1041,7 +1041,7 @@ async fn test_handle_processing_deadline_multiple_tasks(#[case] adapter: &str) { assert_counts( StatusCount { pending: 1, - sending: 1, + claimed: 1, ..StatusCount::default() }, store.as_ref(), @@ -1278,7 +1278,7 @@ async fn test_handle_processing_deadline_no_retries_remaining(#[case] adapter: & async fn test_handle_processing_deadline_unsent_no_attempt_increment(#[case] adapter: &str) { let store = create_test_store(adapter).await; let mut batch = make_activations(1); - batch[0].status = InflightActivationStatus::Sending; + batch[0].status = InflightActivationStatus::Claimed; batch[0].processing_deadline = Some(Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap()); assert!(store.store(batch.clone()).await.is_ok()); let count = store.handle_processing_deadline().await.unwrap(); @@ -1303,7 +1303,7 @@ async fn test_handle_processing_deadline_unsent_no_attempt_increment(#[case] ada async fn test_handle_processing_deadline_at_most_once_unsent_failure(#[case] adapter: &str) { let store = create_test_store(adapter).await; let mut batch = make_activations(1); - batch[0].status = InflightActivationStatus::Sending; + batch[0].status = InflightActivationStatus::Claimed; batch[0].at_most_once = true; batch[0].processing_deadline = Some(Utc.with_ymd_and_hms(2020, 1, 1, 1, 1, 1).unwrap()); assert!(store.store(batch.clone()).await.is_ok()); diff --git a/src/store/postgres_activation_store.rs b/src/store/postgres_activation_store.rs index a68c9a98..d5bc91fd 100644 --- a/src/store/postgres_activation_store.rs +++ b/src/store/postgres_activation_store.rs @@ -332,9 +332,9 @@ impl InflightActivationStore for PostgresActivationStore { } #[instrument(skip_all)] - async fn mark_activation_sent(&self, id: &str) -> Result<(), Error> { + async fn mark_activation_processing(&self, id: &str) -> Result<(), Error> { let mut conn = self - .acquire_write_conn_metric("mark_activation_sent") + .acquire_write_conn_metric("mark_activation_processing") .await?; let result = sqlx::query( @@ -342,7 +342,7 @@ impl InflightActivationStore for PostgresActivationStore { ) .bind(InflightActivationStatus::Processing.to_string()) .bind(id) - .bind(InflightActivationStatus::Sending.to_string()) + .bind(InflightActivationStatus::Claimed.to_string()) .execute(&mut *conn) .await?; @@ -412,14 +412,14 @@ impl InflightActivationStore for PostgresActivationStore { let sql = " SELECT COUNT(*) FILTER (WHERE status = $1) AS pending, COUNT(*) FILTER (WHERE status = $2) AS delay, - COUNT(*) FILTER (WHERE status = $3) AS sending, + COUNT(*) FILTER (WHERE status = $3) AS claimed, COUNT(*) FILTER (WHERE status = $4) AS processing FROM inflight_taskactivations"; let row: (i64, i64, i64, i64) = sqlx::query_as(sql) .bind(InflightActivationStatus::Pending.to_string()) .bind(InflightActivationStatus::Delay.to_string()) - .bind(InflightActivationStatus::Sending.to_string()) + .bind(InflightActivationStatus::Claimed.to_string()) .bind(InflightActivationStatus::Processing.to_string()) .fetch_one(&self.read_pool) .await?; @@ -427,7 +427,7 @@ impl InflightActivationStore for PostgresActivationStore { Ok(DepthCounts { pending: row.0 as usize, delay: row.1 as usize, - sending: row.2 as usize, + claimed: row.2 as usize, processing: row.3 as usize, }) } @@ -544,11 +544,11 @@ impl InflightActivationStore for PostgresActivationStore { .bind(InflightActivationStatus::Failure.to_string()) .bind(now) .bind(InflightActivationStatus::Processing.to_string()) - .bind(InflightActivationStatus::Sending.to_string()) + .bind(InflightActivationStatus::Claimed.to_string()) .execute(&mut *atomic) .await?; - // Revert activations that weren't delivered back to 'pending' without consuming an attempt + // Revert activations that weren't delivered back to 'pending' without consuming an attempt (release claims) let unsent = sqlx::query( "UPDATE inflight_taskactivations SET processing_deadline = null, @@ -559,7 +559,7 @@ impl InflightActivationStore for PostgresActivationStore { ) .bind(InflightActivationStatus::Pending.to_string()) .bind(now) - .bind(InflightActivationStatus::Sending.to_string()) + .bind(InflightActivationStatus::Claimed.to_string()) .execute(&mut *atomic) .await?; diff --git a/src/test_utils.rs b/src/test_utils.rs index 22931a1f..dfc6f46f 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -396,7 +396,7 @@ pub fn replace_retry_state(inflight: &mut InflightActivation, retry: Option Date: Fri, 10 Apr 2026 11:56:01 -0700 Subject: [PATCH 11/11] Tweak Logs --- src/push/mod.rs | 2 +- src/store/postgres_activation_store.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/push/mod.rs b/src/push/mod.rs index 50089044..c552e7b5 100644 --- a/src/push/mod.rs +++ b/src/push/mod.rs @@ -209,7 +209,7 @@ impl PushPool { error!( task_id = %id, error = ?e, - "Failed to mark activation as sent after push" + "Failed to mark activation as processing after push" ); } } diff --git a/src/store/postgres_activation_store.rs b/src/store/postgres_activation_store.rs index ed1f537a..b956e8f3 100644 --- a/src/store/postgres_activation_store.rs +++ b/src/store/postgres_activation_store.rs @@ -360,7 +360,7 @@ impl InflightActivationStore for PostgresActivationStore { if result.rows_affected() == 0 { warn!( task_id = %id, - "Activation could not be marked as sent, it may be missing or its status may have already changed" + "Activation could not be marked as processing, it may be missing or its status may have already changed" ); }