diff --git a/gocd/templates/bash/run-migrations.sh b/gocd/templates/bash/run-migrations.sh new file mode 100644 index 00000000..650702ff --- /dev/null +++ b/gocd/templates/bash/run-migrations.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +eval $(regions-project-env-vars --region="${SENTRY_REGION}") +/devinfra/scripts/get-cluster-credentials + +k8s-spawn-job \ + --label-selector="${LABEL_SELECTOR}" \ + --image="us-central1-docker.pkg.dev/sentryio/taskbroker/image:${GO_REVISION_TASKBROKER_REPO}" \ + --container-name="taskbroker" \ + --name="taskbroker-migrations" \ + -- \ + /opt/taskbroker \ + --run migrations diff --git a/gocd/templates/pipelines/taskbroker.libsonnet b/gocd/templates/pipelines/taskbroker.libsonnet index a90c31be..3a0b08f5 100644 --- a/gocd/templates/pipelines/taskbroker.libsonnet +++ b/gocd/templates/pipelines/taskbroker.libsonnet @@ -18,6 +18,24 @@ local checks_stage = { }, }; +local run_migrations_stage = { + 'run-migrations': { + fetch_materials: true, + jobs: { + 'run-migrations': { + timeout: 60, + elastic_profile_id: 'taskbroker', + environment_variables: { + LABEL_SELECTOR: 'service=taskbroker', + }, + tasks: [ + gocdtasks.script(importstr '../bash/run-migrations.sh'), + ], + }, + }, + }, +}; + local deploy_canary_stage(region) = if region == 'us' || region == 'de' then [ @@ -73,5 +91,7 @@ function(region) { }, }, lock_behavior: 'unlockWhenFinished', - stages: [checks_stage] + deploy_canary_stage(region) + [deployPrimaryStage], + stages: [checks_stage, run_migrations_stage] + + deploy_canary_stage(region) + + [deployPrimaryStage], } diff --git a/src/config.rs b/src/config.rs index b1d46ca1..79b8e14f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -13,6 +13,7 @@ use validator::{Validate, ValidationError}; use crate::Args; use crate::fetch::MAX_FETCH_THREADS; use crate::logging::LogFormat; +use crate::store::adapters::postgres; /// Configuration for a single Kafka topic in multi-topic mode. #[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] @@ -130,6 +131,18 @@ pub enum DatabaseAdapter { Postgres, } +impl DatabaseAdapter { + pub async fn migrate(&self, config: &Config) -> Result<()> { + match self { + Self::Postgres => postgres::migrate(config).await, + Self::Sqlite => { + warn!("Standalone migration not supported for SQLite"); + Ok(()) + } + } + } +} + /// How the taskbroker delivers tasks to workers. #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Deserialize, Serialize)] #[serde(rename_all = "lowercase")] @@ -303,12 +316,18 @@ pub struct Config { /// The port of the postgres database to use for the activation store. pub pg_port: u16, + // User permitted to run DDL operations. + pub pg_ddl_username: String, + /// The username of the postgres database to use for the activation store. pub pg_username: String, /// The password of the postgres database to use for the activation store. pub pg_password: String, + /// Password for the user permitted to run DDL operations. + pub pg_ddl_password: String, + /// The name of the postgres database to use for the activation store. pub pg_database_name: String, @@ -552,6 +571,8 @@ impl Default for Config { run_migrations: false, pg_host: "sentry-postgres-1".to_owned(), pg_port: 5432, + pg_ddl_username: "postgres".to_owned(), + pg_ddl_password: "password".to_owned(), pg_username: "postgres".to_owned(), pg_password: "password".to_owned(), pg_database_name: "default".to_owned(), @@ -1141,8 +1162,8 @@ mod tests { use figment::Jail; use validator::Validate; - use crate::Args; use crate::logging::LogFormat; + use crate::{Args, Run}; use super::{Config, DatabaseAdapter, DeliveryMode}; @@ -1283,6 +1304,7 @@ mod tests { jail.set_env("TASKBROKER_LOG_FILTER", "error"); let args = Args { + run: Run::Broker, config: Some("config.yaml".to_owned()), }; let config = Config::from_args(&args).unwrap(); @@ -1338,7 +1360,10 @@ mod tests { jail.set_env("TASKBROKER_DATABASE_ADAPTER", "postgres"); jail.set_env("TASKBROKER_MAX_PROCESSING_ATTEMPTS", "5"); - let args = Args { config: None }; + let args = Args { + run: Run::Broker, + config: None, + }; let config = Config::from_args(&args).unwrap(); assert_eq!(config.log_filter, "error"); assert_eq!(config.database_adapter, DatabaseAdapter::Postgres); @@ -1355,7 +1380,10 @@ mod tests { jail.set_env("TASKBROKER_MAX_PROCESSING_ATTEMPTS", "5"); jail.set_env("TASKBROKER_DEFAULT_METRICS_TAGS", "{key=value}"); - let args = Args { config: None }; + let args = Args { + run: Run::Broker, + config: None, + }; let config = Config::from_args(&args).unwrap(); assert_eq!(config.sentry_dsn, None); assert_eq!(config.sentry_env, None); @@ -1398,7 +1426,10 @@ mod tests { "{sentry=http://127.0.0.1:60052,launchpad=http://127.0.0.1:60053}", ); - let args = Args { config: None }; + let args = Args { + run: Run::Broker, + config: None, + }; let config = Config::from_args(&args).unwrap(); assert_eq!( config.worker_map, @@ -1414,7 +1445,10 @@ mod tests { #[test] fn test_kafka_consumer_config() { - let args = Args { config: None }; + let args = Args { + run: Run::Broker, + config: None, + }; let config = Config::from_args(&args).unwrap(); let consumer_config = config.kafka_consumer_config_for("taskworker"); @@ -1434,7 +1468,10 @@ mod tests { jail.set_env("TASKBROKER_KAFKA_SASL_USERNAME", "taskbroker"); jail.set_env("TASKBROKER_KAFKA_SASL_PASSWORD", "secret-tech"); - let args = Args { config: None }; + let args = Args { + run: Run::Broker, + config: None, + }; let config = Config::from_args(&args).unwrap(); let consumer_config = config.kafka_consumer_config_for("taskworker"); @@ -1469,7 +1506,10 @@ mod tests { "/etc/ssl/taskbroker/private.key", ); - let args = Args { config: None }; + let args = Args { + run: Run::Broker, + config: None, + }; let config = Config::from_args(&args).unwrap(); let consumer_config = config.kafka_consumer_config_for("taskworker"); @@ -1492,7 +1532,10 @@ mod tests { #[test] fn test_kafka_producer_config() { - let args = Args { config: None }; + let args = Args { + run: Run::Broker, + config: None, + }; let config = Config::from_args(&args).unwrap(); let producer_config = config.kafka_producer_config(); @@ -1518,7 +1561,10 @@ mod tests { jail.set_env("TASKBROKER_KAFKA_DEADLETTER_SASL_USERNAME", "taskbroker"); jail.set_env("TASKBROKER_KAFKA_DEADLETTER_SASL_PASSWORD", "secret-tech"); - let args = Args { config: None }; + let args = Args { + run: Run::Broker, + config: None, + }; let config = Config::from_args(&args).unwrap(); let producer_config = config.kafka_producer_config(); @@ -1553,7 +1599,10 @@ mod tests { "/etc/ssl/taskbroker/private.key", ); - let args = Args { config: None }; + let args = Args { + run: Run::Broker, + config: None, + }; let config = Config::from_args(&args).unwrap(); let producer_config = config.kafka_producer_config(); @@ -1585,7 +1634,10 @@ mod tests { Jail::expect_with(|jail| { jail.set_env("TASKBROKER_DELIVERY_MODE", "push"); - let args = Args { config: None }; + let args = Args { + run: Run::Broker, + config: None, + }; let config = Config::from_args(&args).unwrap(); assert_eq!(config.delivery_mode, DeliveryMode::Push); @@ -1599,6 +1651,7 @@ mod tests { jail.create_file("config.yaml", "delivery_mode: push")?; let args = Args { + run: Run::Broker, config: Some("config.yaml".to_owned()), }; let config = Config::from_args(&args).unwrap(); @@ -1647,6 +1700,7 @@ kafka_clusters: )?; let args = Args { + run: Run::Broker, config: Some("config.yaml".to_owned()), }; let config = Config::from_args(&args).unwrap(); @@ -1741,6 +1795,7 @@ kafka_clusters: )?; let args = Args { + run: Run::Broker, config: Some("config.yaml".to_owned()), }; // A clean error, not a panic. @@ -1786,7 +1841,10 @@ kafka_clusters: "10.0.0.2:9092", ); - let args = Args { config: None }; + let args = Args { + run: Run::Broker, + config: None, + }; let config = Config::from_args(&args).unwrap(); let topics = &config.kafka_topics; @@ -1831,6 +1889,7 @@ kafka_clusters: )?; let args = Args { + run: Run::Broker, config: Some("config.yaml".to_owned()), }; let err = Config::from_args(&args).unwrap_err(); @@ -1859,6 +1918,7 @@ kafka_topics: )?; let args = Args { + run: Run::Broker, config: Some("config.yaml".to_owned()), }; let err = Config::from_args(&args).unwrap_err(); @@ -1890,6 +1950,7 @@ kafka_clusters: )?; let args = Args { + run: Run::Broker, config: Some("config.yaml".to_owned()), }; let err = Config::from_args(&args).unwrap_err(); @@ -1939,6 +2000,7 @@ kafka_clusters: )?; let args = Args { + run: Run::Broker, config: Some("config.yaml".to_owned()), }; let err = Config::from_args(&args).unwrap_err(); @@ -1982,6 +2044,7 @@ kafka_clusters: )?; let args = Args { + run: Run::Broker, config: Some("config.yaml".to_owned()), }; let config = Config::from_args(&args).unwrap(); @@ -2022,6 +2085,7 @@ kafka_clusters: )?; let args = Args { + run: Run::Broker, config: Some("config.yaml".to_owned()), }; let err = Config::from_args(&args).unwrap_err(); @@ -2054,6 +2118,7 @@ kafka_clusters: )?; let args = Args { + run: Run::Broker, config: Some("config.yaml".to_owned()), }; let err = Config::from_args(&args).unwrap_err(); @@ -2078,7 +2143,10 @@ kafka_clusters: jail.set_env("TASKBROKER_KAFKA_TOPIC", "taskworker"); jail.set_env("TASKBROKER_KAFKA_DEADLETTER_TOPIC", "taskworker"); - let args = Args { config: None }; + let args = Args { + run: Run::Broker, + config: None, + }; let err = Config::from_args(&args).unwrap_err(); assert!( err.to_string().contains( @@ -2102,7 +2170,10 @@ kafka_clusters: jail.set_env("TASKBROKER_KAFKA_RETRY_TOPIC", "taskworker-dlq"); jail.set_env("TASKBROKER_KAFKA_DEADLETTER_TOPIC", "taskworker-dlq"); - let args = Args { config: None }; + let args = Args { + run: Run::Broker, + config: None, + }; let err = Config::from_args(&args).unwrap_err(); assert!( err.to_string().contains( @@ -2146,6 +2217,7 @@ kafka_clusters: )?; let args = Args { + run: Run::Broker, config: Some("config.yaml".to_owned()), }; let config = Config::from_args(&args).unwrap(); @@ -2193,6 +2265,7 @@ kafka_clusters: )?; let args = Args { + run: Run::Broker, config: Some("config.yaml".to_owned()), }; let config = Config::from_args(&args).unwrap(); @@ -2241,6 +2314,7 @@ kafka_clusters: )?; let args = Args { + run: Run::Broker, config: Some("config.yaml".to_owned()), }; let err = Config::from_args(&args).unwrap_err(); @@ -2285,6 +2359,7 @@ kafka_clusters: )?; let args = Args { + run: Run::Broker, config: Some("config.yaml".to_owned()), }; let err = Config::from_args(&args).unwrap_err(); @@ -2314,7 +2389,10 @@ kafka_clusters: jail.set_env("TASKBROKER_KAFKA_DEADLETTER_TOPIC", "taskworker-ingest-dlq"); jail.set_env("TASKBROKER_KAFKA_DEADLETTER_CLUSTER", "kafka-small:9092"); - let args = Args { config: None }; + let args = Args { + run: Run::Broker, + config: None, + }; let config = Config::from_args(&args).expect("legacy retry config should validate"); // The retry topic resolves to the deadletter cluster (where the diff --git a/src/lib.rs b/src/lib.rs index a0f8bd99..23b562ce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,4 @@ -use clap::Parser; +use clap::{Parser, ValueEnum}; use std::fs; pub mod config; @@ -25,10 +25,21 @@ pub fn get_version() -> &'static str { Box::leak(release_name.into_boxed_str()) } +/// What are we running? +#[derive(Debug, Clone, Copy, ValueEnum, PartialEq)] +pub enum Run { + Migrations, + Broker, +} + #[derive(Parser, Debug)] pub struct Args { - /// Path to the configuration file - #[arg(short, long, help = "The path to a config file")] + /// What are we running? + #[arg(short, long, default_value = "broker")] + pub run: Run, + + /// Path to the configuration file. + #[arg(short, long)] pub config: Option, } diff --git a/src/main.rs b/src/main.rs index 93f971d1..8ad579c9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,16 +25,16 @@ use taskbroker::kafka::admin::create_missing_topics; use taskbroker::kafka::consumer::start_consumer; use taskbroker::kafka::deserialize::{self, DeserializeConfig}; use taskbroker::kafka::os_stream_writer::{OsStream, OsStreamWriter}; -use taskbroker::logging; use taskbroker::metrics; use taskbroker::processing_strategy; use taskbroker::push::PushPool; use taskbroker::runtime_config::RuntimeConfigManager; -use taskbroker::store::adapters::postgres::{PostgresStore, PostgresStoreConfig}; +use taskbroker::store::adapters::postgres::{self, PostgresStore, PostgresStoreConfig}; use taskbroker::store::adapters::sqlite::{SqliteStore, SqliteStoreConfig}; use taskbroker::store::traits::ActivationStore; use taskbroker::upkeep::upkeep; use taskbroker::{Args, get_version}; +use taskbroker::{Run, logging}; use taskbroker::{SERVICE_NAME, flusher}; async fn log_task_completion>(name: T, task: JoinHandle>) { @@ -64,11 +64,19 @@ async fn main() -> Result<(), Error> { logging::init(logging::LoggingConfig::from_config(&config)); metrics::init(metrics::MetricsConfig::from_config(&config)); + if args.run == Run::Migrations { + return config.database_adapter.migrate(&config).await; + } + let store: Arc = match config.database_adapter { DatabaseAdapter::Sqlite => Arc::new( SqliteStore::new(&config.db_path, SqliteStoreConfig::from_config(&config)).await?, ), DatabaseAdapter::Postgres => { + if config.run_migrations { + postgres::migrate(&config).await?; + } + Arc::new(PostgresStore::new(PostgresStoreConfig::from_config(&config)).await?) } }; diff --git a/src/store/adapters/postgres.rs b/src/store/adapters/postgres.rs index 4cfdac14..fd5d1dfb 100644 --- a/src/store/adapters/postgres.rs +++ b/src/store/adapters/postgres.rs @@ -8,7 +8,7 @@ use sqlx::pool::PoolConnection; use sqlx::postgres::{PgConnectOptions, PgPool, PgPoolOptions}; use sqlx::{FromRow, Pool, Postgres, QueryBuilder, Transaction}; -use anyhow::{Error, anyhow}; +use anyhow::{Error, Result, anyhow}; use async_backtrace::framed; use async_trait::async_trait; use chrono::{DateTime, Utc}; @@ -23,6 +23,55 @@ use crate::store::retry::{RetryConfig, retry_query}; use crate::store::traits::ActivationStore; use crate::store::types::{BucketRange, DepthCounts, FailedTasksForwarder}; +/// Run migrations. +pub async fn migrate(config: &Config) -> Result<()> { + let mut conn_opts = PgConnectOptions::new() + .username(&config.pg_ddl_username) + .password(&config.pg_ddl_password) + .host(&config.pg_host) + .port(config.pg_port); + + if let Some(extra_query_params) = config.pg_extra_query_params.as_ref() { + let url = conn_opts.to_url_lossy(); + let new_url = + url.as_ref().split('?').next().unwrap().to_string() + "?" + extra_query_params; + conn_opts = PgConnectOptions::from_str(&new_url).unwrap(); + } + + let default_pool = + create_default_postgres_pool(&conn_opts, &config.pg_default_database_name).await?; + + // Create the database if it doesn't exist + let row: (bool,) = + sqlx::query_as("SELECT EXISTS ( SELECT 1 FROM pg_catalog.pg_database WHERE datname = $1 )") + .bind(&config.pg_database_name) + .fetch_one(&default_pool) + .await?; + + if !row.0 { + println!("Creating database {}", &config.pg_database_name); + sqlx::query(format!("CREATE DATABASE {}", &config.pg_database_name).as_str()) + .execute(&default_pool) + .await?; + } + + default_pool.close().await; + + let migration_pool = PgPoolOptions::new() + .max_connections(1) + .connect_with(conn_opts.database(&config.pg_database_name)) + .await?; + + println!("Running migrations on database"); + sqlx::migrate!("./migrations/postgres") + .run(&migration_pool) + .await?; + + migration_pool.close().await; + + Ok(()) +} + #[derive(Debug, FromRow)] struct TableRow { pub id: String, @@ -152,12 +201,14 @@ impl PostgresStoreConfig { .password(&config.pg_password) .host(&config.pg_host) .port(config.pg_port); + if let Some(extra_query_params) = config.pg_extra_query_params.as_ref() { let url = conn_opts.to_url_lossy(); let new_url = url.as_ref().split('?').next().unwrap().to_string() + "?" + extra_query_params; conn_opts = PgConnectOptions::from_str(&new_url).unwrap(); } + Self { pg_connection: conn_opts, pg_database_name: config.pg_database_name.clone(), @@ -207,42 +258,9 @@ impl PostgresStore { #[framed] pub async fn new(config: PostgresStoreConfig) -> Result { - if config.run_migrations { - let default_pool = create_default_postgres_pool( - &config.pg_connection, - &config.pg_default_database_name, - ) - .await?; - - // Create the database if it doesn't exist - let row: (bool,) = sqlx::query_as( - "SELECT EXISTS ( SELECT 1 FROM pg_catalog.pg_database WHERE datname = $1 )", - ) - .bind(&config.pg_database_name) - .fetch_one(&default_pool) - .await?; - - if !row.0 { - println!("Creating database {}", &config.pg_database_name); - sqlx::query(format!("CREATE DATABASE {}", &config.pg_database_name).as_str()) - .bind(&config.pg_database_name) - .execute(&default_pool) - .await?; - } - // Close the default pool - default_pool.close().await; - } - let (read_pool, write_pool) = create_postgres_pool(&config.pg_connection, &config.pg_database_name).await?; - if config.run_migrations { - println!("Running migrations on database"); - sqlx::migrate!("./migrations/postgres") - .run(&write_pool) - .await?; - } - Ok(Self { read_pool, write_pool, diff --git a/src/test_utils.rs b/src/test_utils.rs index 0d13c4c8..524b8177 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -17,7 +17,7 @@ use uuid::Uuid; use crate::config::Config; use crate::store::activation::{Activation, ActivationBuilder, ActivationStatus}; -use crate::store::adapters::postgres::{PostgresStore, PostgresStoreConfig}; +use crate::store::adapters::postgres::{self, PostgresStore, PostgresStoreConfig}; use crate::store::adapters::sqlite::{SqliteStore, SqliteStoreConfig}; use crate::store::traits::ActivationStore; @@ -284,13 +284,15 @@ pub async fn create_test_store(adapter: &str) -> Arc { .unwrap(), ) as Arc, "postgres" => { + let config = create_integration_config(); + postgres::migrate(&config).await.unwrap(); + let store = Arc::new( - PostgresStore::new(PostgresStoreConfig::from_config( - &create_integration_config(), - )) - .await - .unwrap(), + PostgresStore::new(PostgresStoreConfig::from_config(&config)) + .await + .unwrap(), ) as Arc; + store.assign_partitions(vec![0]).unwrap(); store }