diff --git a/migrations/0005_add_bucket.sql b/migrations/0005_add_bucket.sql new file mode 100644 index 00000000..eb5d5bab --- /dev/null +++ b/migrations/0005_add_bucket.sql @@ -0,0 +1 @@ +ALTER TABLE inflight_taskactivations ADD COLUMN bucket INTEGER NOT NULL DEFAULT 0; diff --git a/pg_migrations/0001_create_inflight_activations.sql b/pg_migrations/0001_create_inflight_activations.sql index ee8b26a4..56a60a5a 100644 --- a/pg_migrations/0001_create_inflight_activations.sql +++ b/pg_migrations/0001_create_inflight_activations.sql @@ -1,4 +1,3 @@ --- PostgreSQL equivalent of the inflight_taskactivations table CREATE TABLE IF NOT EXISTS inflight_taskactivations ( id TEXT NOT NULL PRIMARY KEY, activation BYTEA NOT NULL, @@ -16,5 +15,8 @@ CREATE TABLE IF NOT EXISTS inflight_taskactivations ( application TEXT NOT NULL, namespace TEXT NOT NULL, taskname TEXT NOT NULL, - on_attempts_exceeded INTEGER NOT NULL DEFAULT 1 + on_attempts_exceeded INTEGER NOT NULL DEFAULT 1, + bucket SMALLINT NOT NULL ); + +CREATE INDEX idx_activation_claim ON inflight_taskactivations (status, bucket); diff --git a/src/fetch/mod.rs b/src/fetch/mod.rs new file mode 100644 index 00000000..00fd58e2 --- /dev/null +++ b/src/fetch/mod.rs @@ -0,0 +1 @@ +pub const MAX_FETCH_THREADS: u128 = 256; diff --git a/src/kafka/deserialize_activation.rs b/src/kafka/deserialize_activation.rs index d0ea263e..41692958 100644 --- a/src/kafka/deserialize_activation.rs +++ b/src/kafka/deserialize_activation.rs @@ -1,6 +1,7 @@ use std::{sync::Arc, time::Duration}; use crate::config::Config; +use crate::fetch::MAX_FETCH_THREADS; use crate::store::inflight_activation::{InflightActivation, InflightActivationStatus}; use anyhow::{Error, anyhow}; use chrono::{DateTime, Utc}; @@ -8,6 +9,7 @@ use prost::Message as _; use rdkafka::{Message, message::OwnedMessage}; use sentry_protos::taskbroker::v1::OnAttemptsExceeded; use sentry_protos::taskbroker::v1::TaskActivation; +use uuid::Uuid; pub struct DeserializeActivationConfig { pub max_delayed_allowed: u64, @@ -21,6 +23,13 @@ impl DeserializeActivationConfig { } } +/// Use the UUID of an activation to determine its bucket (a value between 0 and 255, inclusive). +pub fn bucket_from_id(id: &str) -> i16 { + Uuid::parse_str(id) + .map(|u| (u.as_u128() % MAX_FETCH_THREADS) as i16) + .unwrap_or(0) +} + pub fn new( config: DeserializeActivationConfig, ) -> impl Fn(Arc) -> Result { @@ -78,6 +87,8 @@ pub fn new( .try_into() .unwrap(); + let bucket = bucket_from_id(&activation.id); + Ok(InflightActivation { id: activation.id.clone(), activation: payload.to_vec(), @@ -96,6 +107,7 @@ pub fn new( namespace, taskname, on_attempts_exceeded, + bucket, }) } } diff --git a/src/lib.rs b/src/lib.rs index 33567944..fc34ba44 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ use clap::Parser; use std::fs; pub mod config; +pub mod fetch; pub mod grpc; pub mod kafka; pub mod logging; diff --git a/src/store/inflight_activation.rs b/src/store/inflight_activation.rs index 9de6bf2b..6e6a249b 100644 --- a/src/store/inflight_activation.rs +++ b/src/store/inflight_activation.rs @@ -27,6 +27,8 @@ use tracing::{instrument, warn}; use crate::config::Config; +pub type BucketRange = (i16, i16); + /// The members of this enum should be synced with the members /// of InflightActivationStatus in sentry_protos #[derive(Clone, Copy, Debug, PartialEq, Eq, Type)] @@ -173,6 +175,10 @@ pub struct InflightActivation { /// are exceeded. #[builder(default = false)] pub at_most_once: bool, + + /// Bucket derived from activation ID (UUID as number % 256). Set once on ingestion. + #[builder(setter(skip), default = "0")] + pub bucket: i16, } impl InflightActivation { @@ -235,6 +241,7 @@ pub struct TableRow { pub taskname: String, #[sqlx(try_from = "i32")] pub on_attempts_exceeded: OnAttemptsExceeded, + pub bucket: i16, } impl TryFrom for TableRow { @@ -259,6 +266,7 @@ impl TryFrom for TableRow { namespace: value.namespace, taskname: value.taskname, on_attempts_exceeded: value.on_attempts_exceeded, + bucket: value.bucket, }) } } @@ -283,6 +291,7 @@ impl From for InflightActivation { namespace: value.namespace, taskname: value.taskname, on_attempts_exceeded: value.on_attempts_exceeded, + bucket: value.bucket, } } } @@ -369,7 +378,12 @@ pub trait InflightActivationStore: Send + Sync { return Ok(None); } let result = self - .get_pending_activations_from_namespaces(application, namespaces.as_deref(), Some(1)) + .get_pending_activations_from_namespaces( + application, + namespaces.as_deref(), + Some(1), + None, + ) .await?; if result.is_empty() { return Ok(None); @@ -383,6 +397,7 @@ pub trait InflightActivationStore: Send + Sync { application: Option<&str>, namespaces: Option<&[String]>, limit: Option, + bucket: Option, ) -> Result, Error>; /// Get the age of the oldest pending activation in seconds @@ -698,7 +713,8 @@ impl InflightActivationStore for SqliteActivationStore { application, namespace, taskname, - on_attempts_exceeded + on_attempts_exceeded, + bucket FROM inflight_taskactivations WHERE id = $1 ", @@ -739,7 +755,8 @@ impl InflightActivationStore for SqliteActivationStore { application, namespace, taskname, - on_attempts_exceeded + on_attempts_exceeded, + bucket ) ", ); @@ -772,6 +789,7 @@ impl InflightActivationStore for SqliteActivationStore { b.push_bind(row.namespace); b.push_bind(row.taskname); b.push_bind(row.on_attempts_exceeded as i32); + b.push_bind(row.bucket); }) .push(" ON CONFLICT(id) DO NOTHING") .build(); @@ -809,6 +827,7 @@ impl InflightActivationStore for SqliteActivationStore { application: Option<&str>, namespaces: Option<&[String]>, limit: Option, + _bucket: Option, ) -> Result, Error> { let now = Utc::now(); @@ -989,7 +1008,8 @@ impl InflightActivationStore for SqliteActivationStore { application, namespace, taskname, - on_attempts_exceeded + on_attempts_exceeded, + bucket FROM inflight_taskactivations WHERE status = $1 ", diff --git a/src/store/inflight_activation_tests.rs b/src/store/inflight_activation_tests.rs index 9113b26c..d761cea0 100644 --- a/src/store/inflight_activation_tests.rs +++ b/src/store/inflight_activation_tests.rs @@ -293,7 +293,7 @@ async fn test_get_pending_activation_from_multiple_namespaces(#[case] adapter: & // Get activation from multiple namespaces (should get oldest) let namespaces = vec!["ns2".to_string(), "ns3".to_string()]; let result = store - .get_pending_activations_from_namespaces(None, Some(&namespaces), None) + .get_pending_activations_from_namespaces(None, Some(&namespaces), None, None) .await .unwrap(); @@ -329,7 +329,7 @@ async fn test_get_pending_activation_with_namespace_requires_application(#[case] // We allow no application in this method because of usage in upkeep let namespaces = vec!["other_namespace".to_string()]; let activations = store - .get_pending_activations_from_namespaces(None, Some(namespaces).as_deref(), Some(2)) + .get_pending_activations_from_namespaces(None, Some(namespaces).as_deref(), Some(2), None) .await .unwrap(); assert_eq!( diff --git a/src/store/postgres_activation_store.rs b/src/store/postgres_activation_store.rs index bd01acdc..ae0ff445 100644 --- a/src/store/postgres_activation_store.rs +++ b/src/store/postgres_activation_store.rs @@ -1,6 +1,6 @@ use crate::store::inflight_activation::{ - FailedTasksForwarder, InflightActivation, InflightActivationStatus, InflightActivationStore, - QueryResult, TableRow, + BucketRange, FailedTasksForwarder, InflightActivation, InflightActivationStatus, + InflightActivationStore, QueryResult, TableRow, }; use anyhow::{Error, anyhow}; use async_trait::async_trait; @@ -175,7 +175,8 @@ impl InflightActivationStore for PostgresActivationStore { application, namespace, taskname, - on_attempts_exceeded + on_attempts_exceeded, + bucket FROM inflight_taskactivations WHERE id = $1 ", @@ -216,7 +217,8 @@ impl InflightActivationStore for PostgresActivationStore { application, namespace, taskname, - on_attempts_exceeded + on_attempts_exceeded, + bucket ) ", ); @@ -248,6 +250,7 @@ impl InflightActivationStore for PostgresActivationStore { b.push_bind(row.namespace); b.push_bind(row.taskname); b.push_bind(row.on_attempts_exceeded as i32); + b.push_bind(row.bucket); }) .push(" ON CONFLICT(id) DO NOTHING") .build(); @@ -264,11 +267,12 @@ impl InflightActivationStore for PostgresActivationStore { application: Option<&str>, namespaces: Option<&[String]>, limit: Option, + bucket: Option, ) -> Result, Error> { let now = Utc::now(); let grace_period = self.config.processing_deadline_grace_sec; - let mut query_builder = QueryBuilder::new( + let mut query_builder = QueryBuilder::::new( "WITH selected_activations AS ( SELECT id FROM inflight_taskactivations @@ -294,6 +298,15 @@ impl InflightActivationStore for PostgresActivationStore { } query_builder.push(")"); } + + if let Some((min, max)) = bucket { + query_builder.push(" AND bucket >= "); + query_builder.push_bind(min); + + query_builder.push(" AND bucket <= "); + query_builder.push_bind(max); + } + query_builder.push(" ORDER BY added_at"); if let Some(limit) = limit { query_builder.push(" LIMIT "); @@ -443,7 +456,8 @@ impl InflightActivationStore for PostgresActivationStore { application, namespace, taskname, - on_attempts_exceeded + on_attempts_exceeded, + bucket FROM inflight_taskactivations WHERE status = $1 ", diff --git a/src/upkeep.rs b/src/upkeep.rs index 58eaf986..4eedb469 100644 --- a/src/upkeep.rs +++ b/src/upkeep.rs @@ -304,7 +304,7 @@ pub async fn do_upkeep( .expect("Could not create kafka producer in upkeep"), ); if let Ok(tasks) = store - .get_pending_activations_from_namespaces(None, Some(&demoted_namespaces), None) + .get_pending_activations_from_namespaces(None, Some(&demoted_namespaces), None, None) .await { // Produce tasks to Kafka with updated namespace