From b77b2eae988325d5104c876c3c81fd10157f77e0 Mon Sep 17 00:00:00 2001 From: Mitchell Spencer Date: Wed, 25 Mar 2026 12:21:54 -0500 Subject: [PATCH 1/4] feat(admin-api): add manual GC job scheduling endpoint and ampctl command Replace automatic periodic GC scheduling with a manual trigger. Add POST /gc/schedule endpoint to the admin API that accepts a location_id and schedules a GC job for that physical table revision. Add corresponding `ampctl gc schedule <location_id>` command and admin client method. The automatic scheduling loop, config, dedup/throttle logic, and scheduler tests are removed. Periodic scheduling will be designed as a generic scheduler feature in a follow-up. --- crates/bin/ampctl/src/cmd.rs | 1 + crates/bin/ampctl/src/cmd/gc.rs | 18 +++ crates/bin/ampctl/src/cmd/gc/schedule.rs | 79 ++++++++++ crates/bin/ampctl/src/main.rs | 5 + crates/clients/admin/src/gc.rs | 137 ++++++++++++++++++ crates/clients/admin/src/lib.rs | 6 + crates/services/admin-api/src/handlers.rs | 1 + crates/services/admin-api/src/handlers/gc.rs | 1 + .../admin-api/src/handlers/gc/schedule.rs | 118 +++++++++++++++ crates/services/admin-api/src/lib.rs | 3 +- 10 files changed, 368 insertions(+), 1 deletion(-) create mode 100644 crates/bin/ampctl/src/cmd/gc.rs create mode 100644 crates/bin/ampctl/src/cmd/gc/schedule.rs create mode 100644 crates/clients/admin/src/gc.rs create mode 100644 crates/services/admin-api/src/handlers/gc.rs create mode 100644 crates/services/admin-api/src/handlers/gc/schedule.rs diff --git a/crates/bin/ampctl/src/cmd.rs b/crates/bin/ampctl/src/cmd.rs index cd5ed39dc..8c571c2da 100644 --- a/crates/bin/ampctl/src/cmd.rs +++ b/crates/bin/ampctl/src/cmd.rs @@ -1,4 +1,5 @@ pub mod dataset; +pub mod gc; pub mod job; pub mod manifest; pub mod provider; diff --git a/crates/bin/ampctl/src/cmd/gc.rs b/crates/bin/ampctl/src/cmd/gc.rs new file mode 100644 index 000000000..483e3e4f3 --- /dev/null +++ b/crates/bin/ampctl/src/cmd/gc.rs @@ -0,0 +1,18 @@ +//! 
Garbage collection commands + +pub mod schedule; + +/// GC subcommands. +#[derive(Debug, clap::Subcommand)] +pub enum Commands { + /// Schedule a GC job for a physical table revision + Schedule(schedule::Args), +} + +/// Execute the GC command with the given subcommand. +pub async fn run(command: Commands) -> anyhow::Result<()> { + match command { + Commands::Schedule(args) => schedule::run(args).await?, + } + Ok(()) +} diff --git a/crates/bin/ampctl/src/cmd/gc/schedule.rs b/crates/bin/ampctl/src/cmd/gc/schedule.rs new file mode 100644 index 000000000..5b9176f7c --- /dev/null +++ b/crates/bin/ampctl/src/cmd/gc/schedule.rs @@ -0,0 +1,79 @@ +//! GC schedule command. +//! +//! Schedules a garbage collection job for a physical table revision +//! through the admin API. + +use monitoring::logging; + +use crate::args::GlobalArgs; + +/// Command-line arguments for the `gc schedule` command. +#[derive(Debug, clap::Args)] +pub struct Args { + #[command(flatten)] + pub global: GlobalArgs, + + /// The location ID of the physical table revision to garbage collect + pub location_id: i64, +} + +/// Schedule a GC job via the admin API. +#[tracing::instrument(skip_all, fields(admin_url = %global.admin_url, location_id = %location_id))] +pub async fn run( + Args { + global, + location_id, + }: Args, +) -> Result<(), Error> { + let client = global.build_client().map_err(Error::ClientBuildError)?; + + tracing::debug!("Scheduling GC job via admin API"); + + let job_id = client.gc().schedule(location_id).await.map_err(|err| { + tracing::error!(error = %err, error_source = logging::error_source(&err), "Failed to schedule GC job"); + Error::ScheduleError(err) + })?; + + let result = ScheduleResult { + job_id, + location_id, + }; + global.print(&result).map_err(Error::JsonSerialization)?; + + Ok(()) +} + +/// Result of a GC schedule operation. 
+#[derive(serde::Serialize)] +struct ScheduleResult { + job_id: amp_worker_core::jobs::job_id::JobId, + location_id: i64, +} + +impl std::fmt::Display for ScheduleResult { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + writeln!( + f, + "{} GC job {} scheduled for location {}", + console::style("✓").green().bold(), + self.job_id, + self.location_id, + ) + } +} + +/// Errors for GC schedule operations. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Failed to build client + #[error("failed to build admin API client")] + ClientBuildError(#[source] crate::args::BuildClientError), + + /// Error scheduling GC job via admin API + #[error("failed to schedule GC job")] + ScheduleError(#[source] amp_client_admin::gc::ScheduleError), + + /// Failed to serialize result to JSON + #[error("failed to serialize result to JSON")] + JsonSerialization(#[source] serde_json::Error), +} diff --git a/crates/bin/ampctl/src/main.rs b/crates/bin/ampctl/src/main.rs index beaaeeea7..40d1a6698 100644 --- a/crates/bin/ampctl/src/main.rs +++ b/crates/bin/ampctl/src/main.rs @@ -56,6 +56,10 @@ enum Commands { #[command(long_about = include_str!("cmd/dataset__long_about.md"))] Dataset(cmd::dataset::Commands), + /// Manage garbage collection jobs + #[command(subcommand)] + Gc(cmd::gc::Commands), + /// Manage table revisions #[command(subcommand)] #[command(alias = "tables")] @@ -86,6 +90,7 @@ async fn run() -> anyhow::Result<()> { Commands::Provider(command) => cmd::provider::run(command).await?, Commands::Job(command) => cmd::job::run(command).await?, Commands::Dataset(command) => cmd::dataset::run(command).await?, + Commands::Gc(command) => cmd::gc::run(command).await?, Commands::Table(command) => cmd::table::run(command).await?, Commands::Worker(command) => cmd::worker::run(command).await?, Commands::Trace(command) => cmd::trace::run(command).await?, diff --git a/crates/clients/admin/src/gc.rs b/crates/clients/admin/src/gc.rs new file mode 100644 index 
000000000..ef24f2519 --- /dev/null +++ b/crates/clients/admin/src/gc.rs @@ -0,0 +1,137 @@ +//! GC management API client. +//! +//! Provides methods for interacting with the `/gc` endpoints of the admin API. + +use amp_worker_core::jobs::job_id::JobId; + +use super::{ + Client, + error::{ApiError, ErrorResponse}, +}; + +/// Build URL path for scheduling a GC job. +/// +/// POST `/gc/schedule` +fn gc_schedule() -> &'static str { + "gc/schedule" +} + +/// Client for GC-related API operations. +pub struct GcClient<'a> { + client: &'a Client, +} + +impl<'a> GcClient<'a> { + pub fn new(client: &'a Client) -> Self { + Self { client } + } + + /// Schedule a GC job for a physical table revision. + pub async fn schedule(&self, location_id: i64) -> Result { + let url = self + .client + .base_url() + .join(gc_schedule()) + .expect("valid URL"); + + let body = serde_json::json!({ "location_id": location_id }); + + tracing::debug!(%location_id, "scheduling GC job"); + + let response = self + .client + .http() + .post(url.as_str()) + .json(&body) + .send() + .await + .map_err(|err| ScheduleError::Network { + url: url.to_string(), + source: err, + })?; + + let status = response.status(); + tracing::debug!(status = %status, "received API response"); + + match status.as_u16() { + 202 => { + let resp: ScheduleResponse = + response + .json() + .await + .map_err(|err| ScheduleError::ResponseParse { + url: url.to_string(), + source: err, + })?; + Ok(resp.job_id) + } + 400 | 409 | 500 => { + let text = response + .text() + .await + .map_err(|err| ScheduleError::ResponseParse { + url: url.to_string(), + source: err, + })?; + let error_response: ErrorResponse = + serde_json::from_str(&text).map_err(|_| ScheduleError::UnexpectedResponse { + status: status.as_u16(), + body: text.clone(), + })?; + match error_response.error_code.as_str() { + "INVALID_BODY" | "INVALID_LOCATION_ID" => { + Err(ScheduleError::InvalidRequest(error_response.into())) + } + "NO_WORKERS_AVAILABLE" => { + 
Err(ScheduleError::NoWorkersAvailable(error_response.into())) + } + "ACTIVE_JOB_CONFLICT" => { + Err(ScheduleError::ActiveJobConflict(error_response.into())) + } + _ => Err(ScheduleError::Api(error_response.into())), + } + } + _ => Err(ScheduleError::UnexpectedResponse { + status: status.as_u16(), + body: response.text().await.unwrap_or_default(), + }), + } + } +} + +#[derive(Debug, serde::Deserialize)] +struct ScheduleResponse { + job_id: JobId, +} + +#[derive(Debug, thiserror::Error)] +pub enum ScheduleError { + #[error("network error contacting {url}")] + Network { + url: String, + #[source] + source: reqwest::Error, + }, + + #[error("failed to parse response from {url}")] + ResponseParse { + url: String, + #[source] + source: reqwest::Error, + }, + + #[error("invalid request")] + InvalidRequest(#[source] ApiError), + + #[error("no workers available")] + NoWorkersAvailable(#[source] ApiError), + + #[error("active GC job already exists for this location")] + ActiveJobConflict(#[source] ApiError), + + #[error("API error")] + Api(#[source] ApiError), + + #[error("unexpected response: status={status}, body={body}")] + UnexpectedResponse { status: u16, body: String }, +} diff --git a/crates/clients/admin/src/lib.rs b/crates/clients/admin/src/lib.rs index 6e3a016e4..2a1d64e9c 100644 --- a/crates/clients/admin/src/lib.rs +++ b/crates/clients/admin/src/lib.rs @@ -7,6 +7,7 @@ pub mod auth; pub mod datasets; pub mod end_block; pub mod error; +pub mod gc; pub mod jobs; pub mod manifests; pub mod providers; @@ -78,6 +79,11 @@ impl Client { manifests::ManifestsClient::new(self) } + /// Get a GC client for garbage collection operations. + pub fn gc(&self) -> gc::GcClient<'_> { + gc::GcClient::new(self) + } + /// Get a jobs client for jobs-related operations. 
pub fn jobs(&self) -> jobs::JobsClient<'_> { jobs::JobsClient::new(self) diff --git a/crates/services/admin-api/src/handlers.rs b/crates/services/admin-api/src/handlers.rs index 29be15661..126b9c4c6 100644 --- a/crates/services/admin-api/src/handlers.rs +++ b/crates/services/admin-api/src/handlers.rs @@ -3,6 +3,7 @@ pub mod error; pub mod datasets; pub mod files; +pub mod gc; pub mod jobs; pub mod manifests; pub mod providers; diff --git a/crates/services/admin-api/src/handlers/gc.rs b/crates/services/admin-api/src/handlers/gc.rs new file mode 100644 index 000000000..67098a010 --- /dev/null +++ b/crates/services/admin-api/src/handlers/gc.rs @@ -0,0 +1 @@ +pub mod schedule; diff --git a/crates/services/admin-api/src/handlers/gc/schedule.rs b/crates/services/admin-api/src/handlers/gc/schedule.rs new file mode 100644 index 000000000..b20c776c5 --- /dev/null +++ b/crates/services/admin-api/src/handlers/gc/schedule.rs @@ -0,0 +1,118 @@ +//! GC job scheduling handler + +use amp_worker_core::jobs::job_id::JobId; +use axum::{Json, extract::State, http::StatusCode}; +use metadata_db::physical_table_revision::LocationId; +use monitoring::logging; + +use crate::{ + ctx::Ctx, + handlers::error::{ErrorResponse, IntoErrorResponse}, + scheduler, +}; + +/// Request body for scheduling a GC job. +#[derive(Debug, serde::Deserialize)] +pub struct ScheduleGcRequest { + /// The location ID of the physical table revision to garbage collect. + pub location_id: i64, +} + +/// Response body for a scheduled GC job. +#[derive(Debug, serde::Serialize)] +pub struct ScheduleGcResponse { + /// The ID of the scheduled GC job. + pub job_id: JobId, +} + +/// Handler for the `POST /gc/schedule` endpoint +/// +/// Schedules a garbage collection job for a specific physical table revision. +/// The job will be picked up by an available worker for execution. 
+/// +/// ## Request Body +/// - `location_id`: The location ID of the physical table revision to collect +/// +/// ## Response +/// - **202 Accepted**: GC job scheduled successfully +/// - **400 Bad Request**: Invalid request body +/// - **409 Conflict**: An active GC job already exists for this location +/// - **500 Internal Server Error**: Scheduler error +#[tracing::instrument(skip_all, err)] +pub async fn handler( + State(ctx): State, + json: Result, axum::extract::rejection::JsonRejection>, +) -> Result<(StatusCode, Json), ErrorResponse> { + let Json(req) = json.map_err(|err| { + tracing::debug!(error = %err, "invalid request body"); + Error::InvalidBody(err) + })?; + + let location_id = LocationId::try_from(req.location_id).map_err(|_| { + tracing::debug!(location_id = req.location_id, "invalid location ID"); + Error::InvalidLocationId(req.location_id) + })?; + + let idempotency_key = amp_worker_gc::job_key::idempotency_key(location_id); + let descriptor = scheduler::JobDescriptor::from(amp_worker_gc::job_descriptor::JobDescriptor { + location_id, + }); + + let job_id = ctx + .scheduler + .schedule_job(idempotency_key.into(), descriptor, None) + .await + .map_err(|err| { + tracing::error!( + error = %err, + error_source = logging::error_source(&err), + %location_id, + "failed to schedule GC job" + ); + Error::Scheduler(err) + })?; + + tracing::info!(%job_id, %location_id, "GC job scheduled"); + Ok((StatusCode::ACCEPTED, Json(ScheduleGcResponse { job_id }))) +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Invalid request body + #[error("invalid request body: {0}")] + InvalidBody(#[source] axum::extract::rejection::JsonRejection), + + /// Invalid location ID + #[error("invalid location ID: {0}")] + InvalidLocationId(i64), + + /// Scheduler error + #[error("failed to schedule GC job")] + Scheduler(#[source] scheduler::ScheduleJobError), +} + +impl IntoErrorResponse for Error { + fn error_code(&self) -> &'static str { + match self { + 
Error::InvalidBody(_) => "INVALID_BODY", + Error::InvalidLocationId(_) => "INVALID_LOCATION_ID", + Error::Scheduler(err) => match err { + scheduler::ScheduleJobError::NoWorkersAvailable => "NO_WORKERS_AVAILABLE", + scheduler::ScheduleJobError::ActiveJobConflict { .. } => "ACTIVE_JOB_CONFLICT", + _ => "SCHEDULER_ERROR", + }, + } + } + + fn status_code(&self) -> StatusCode { + match self { + Error::InvalidBody(_) => StatusCode::BAD_REQUEST, + Error::InvalidLocationId(_) => StatusCode::BAD_REQUEST, + Error::Scheduler(err) => match err { + scheduler::ScheduleJobError::NoWorkersAvailable => StatusCode::BAD_REQUEST, + scheduler::ScheduleJobError::ActiveJobConflict { .. } => StatusCode::CONFLICT, + _ => StatusCode::INTERNAL_SERVER_ERROR, + }, + } + } +} diff --git a/crates/services/admin-api/src/lib.rs b/crates/services/admin-api/src/lib.rs index f5323fab9..2db3ed60f 100644 --- a/crates/services/admin-api/src/lib.rs +++ b/crates/services/admin-api/src/lib.rs @@ -11,7 +11,7 @@ pub mod handlers; pub mod scheduler; use ctx::Ctx; -use handlers::{datasets, files, jobs, manifests, providers, revisions, schema, workers}; +use handlers::{datasets, files, gc, jobs, manifests, providers, revisions, schema, workers}; /// Create the admin API router with all routes registered /// @@ -114,6 +114,7 @@ pub fn router(ctx: Ctx) -> Router<()> { "/providers/{name}", get(providers::get_by_id::handler).delete(providers::delete_by_id::handler), ) + .route("/gc/schedule", post(gc::schedule::handler)) .route("/schema", post(schema::handler)) .route("/workers", get(workers::get_all::handler)) .route("/workers/{id}", get(workers::get_by_id::handler)) From 86ac0fb64abec0d17c13789db43b69658c5c32fb Mon Sep 17 00:00:00 2001 From: Mitchell Spencer Date: Wed, 25 Mar 2026 13:54:04 -0500 Subject: [PATCH 2/4] refactor: reorder module members to follow member ordering guideline Move main functions and error types before helper types per docs/code/rust-modules-members.md ordering convention. 
--- crates/bin/ampctl/src/cmd/gc/schedule.rs | 52 +++++++++---------- .../admin-api/src/handlers/gc/schedule.rs | 28 +++++----- 2 files changed, 40 insertions(+), 40 deletions(-) diff --git a/crates/bin/ampctl/src/cmd/gc/schedule.rs b/crates/bin/ampctl/src/cmd/gc/schedule.rs index 5b9176f7c..6cd4a2eb9 100644 --- a/crates/bin/ampctl/src/cmd/gc/schedule.rs +++ b/crates/bin/ampctl/src/cmd/gc/schedule.rs @@ -7,16 +7,6 @@ use monitoring::logging; use crate::args::GlobalArgs; -/// Command-line arguments for the `gc schedule` command. -#[derive(Debug, clap::Args)] -pub struct Args { - #[command(flatten)] - pub global: GlobalArgs, - - /// The location ID of the physical table revision to garbage collect - pub location_id: i64, -} - /// Schedule a GC job via the admin API. #[tracing::instrument(skip_all, fields(admin_url = %global.admin_url, location_id = %location_id))] pub async fn run( @@ -43,6 +33,32 @@ pub async fn run( Ok(()) } +/// Errors for GC schedule operations. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Failed to build client + #[error("failed to build admin API client")] + ClientBuildError(#[source] crate::args::BuildClientError), + + /// Error scheduling GC job via admin API + #[error("failed to schedule GC job")] + ScheduleError(#[source] amp_client_admin::gc::ScheduleError), + + /// Failed to serialize result to JSON + #[error("failed to serialize result to JSON")] + JsonSerialization(#[source] serde_json::Error), +} + +/// Command-line arguments for the `gc schedule` command. +#[derive(Debug, clap::Args)] +pub struct Args { + #[command(flatten)] + pub global: GlobalArgs, + + /// The location ID of the physical table revision to garbage collect + pub location_id: i64, +} + /// Result of a GC schedule operation. #[derive(serde::Serialize)] struct ScheduleResult { @@ -61,19 +77,3 @@ impl std::fmt::Display for ScheduleResult { ) } } - -/// Errors for GC schedule operations. 
-#[derive(Debug, thiserror::Error)] -pub enum Error { - /// Failed to build client - #[error("failed to build admin API client")] - ClientBuildError(#[source] crate::args::BuildClientError), - - /// Error scheduling GC job via admin API - #[error("failed to schedule GC job")] - ScheduleError(#[source] amp_client_admin::gc::ScheduleError), - - /// Failed to serialize result to JSON - #[error("failed to serialize result to JSON")] - JsonSerialization(#[source] serde_json::Error), -} diff --git a/crates/services/admin-api/src/handlers/gc/schedule.rs b/crates/services/admin-api/src/handlers/gc/schedule.rs index b20c776c5..3378bf648 100644 --- a/crates/services/admin-api/src/handlers/gc/schedule.rs +++ b/crates/services/admin-api/src/handlers/gc/schedule.rs @@ -11,20 +11,6 @@ use crate::{ scheduler, }; -/// Request body for scheduling a GC job. -#[derive(Debug, serde::Deserialize)] -pub struct ScheduleGcRequest { - /// The location ID of the physical table revision to garbage collect. - pub location_id: i64, -} - -/// Response body for a scheduled GC job. -#[derive(Debug, serde::Serialize)] -pub struct ScheduleGcResponse { - /// The ID of the scheduled GC job. - pub job_id: JobId, -} - /// Handler for the `POST /gc/schedule` endpoint /// /// Schedules a garbage collection job for a specific physical table revision. @@ -116,3 +102,17 @@ impl IntoErrorResponse for Error { } } } + +/// Request body for scheduling a GC job. +#[derive(Debug, serde::Deserialize)] +pub struct ScheduleGcRequest { + /// The location ID of the physical table revision to garbage collect. + pub location_id: i64, +} + +/// Response body for a scheduled GC job. +#[derive(Debug, serde::Serialize)] +pub struct ScheduleGcResponse { + /// The ID of the scheduled GC job. 
+ pub job_id: JobId, +} From d5e1dc6c5ac827d26d50f12d21d08cfb23815e42 Mon Sep 17 00:00:00 2001 From: Mitchell Spencer Date: Wed, 25 Mar 2026 18:06:43 -0500 Subject: [PATCH 3/4] refactor(admin-api): use path parameter for GC schedule endpoint Change POST /gc/schedule (JSON body) to POST /gc/{location_id}/schedule (path parameter) to follow the existing /{resource}/{id}/{action} REST convention used by /jobs/{id}/stop, /revisions/{id}/restore, etc. --- crates/clients/admin/src/gc.rs | 13 +++--- .../admin-api/src/handlers/gc/schedule.rs | 43 +++++++++---------- crates/services/admin-api/src/lib.rs | 2 +- 3 files changed, 26 insertions(+), 32 deletions(-) diff --git a/crates/clients/admin/src/gc.rs b/crates/clients/admin/src/gc.rs index ef24f2519..a3367975e 100644 --- a/crates/clients/admin/src/gc.rs +++ b/crates/clients/admin/src/gc.rs @@ -11,9 +11,9 @@ use super::{ /// Build URL path for scheduling a GC job. /// -/// POST `/gc/schedule` -fn gc_schedule() -> &'static str { - "gc/schedule" +/// POST `/gc/{location_id}/schedule` +fn gc_schedule(location_id: i64) -> String { + format!("gc/{location_id}/schedule") } /// Client for GC-related API operations. 
@@ -31,18 +31,15 @@ impl<'a> GcClient<'a> { let url = self .client .base_url() - .join(gc_schedule()) + .join(&gc_schedule(location_id)) .expect("valid URL"); - let body = serde_json::json!({ "location_id": location_id }); - tracing::debug!(%location_id, "scheduling GC job"); let response = self .client .http() .post(url.as_str()) - .json(&body) .send() .await .map_err(|err| ScheduleError::Network { @@ -79,7 +76,7 @@ impl<'a> GcClient<'a> { body: text.clone(), })?; match error_response.error_code.as_str() { - "INVALID_BODY" | "INVALID_LOCATION_ID" => { + "INVALID_PATH" | "INVALID_LOCATION_ID" => { Err(ScheduleError::InvalidRequest(error_response.into())) } "NO_WORKERS_AVAILABLE" => { diff --git a/crates/services/admin-api/src/handlers/gc/schedule.rs b/crates/services/admin-api/src/handlers/gc/schedule.rs index 3378bf648..0e7b0fbe7 100644 --- a/crates/services/admin-api/src/handlers/gc/schedule.rs +++ b/crates/services/admin-api/src/handlers/gc/schedule.rs @@ -1,7 +1,11 @@ //! GC job scheduling handler use amp_worker_core::jobs::job_id::JobId; -use axum::{Json, extract::State, http::StatusCode}; +use axum::{ + Json, + extract::{Path, State, rejection::PathRejection}, + http::StatusCode, +}; use metadata_db::physical_table_revision::LocationId; use monitoring::logging; @@ -11,32 +15,32 @@ use crate::{ scheduler, }; -/// Handler for the `POST /gc/schedule` endpoint +/// Handler for the `POST /gc/{location_id}/schedule` endpoint /// /// Schedules a garbage collection job for a specific physical table revision. /// The job will be picked up by an available worker for execution. 
/// -/// ## Request Body +/// ## Path Parameters /// - `location_id`: The location ID of the physical table revision to collect /// /// ## Response /// - **202 Accepted**: GC job scheduled successfully -/// - **400 Bad Request**: Invalid request body +/// - **400 Bad Request**: Invalid location ID /// - **409 Conflict**: An active GC job already exists for this location /// - **500 Internal Server Error**: Scheduler error #[tracing::instrument(skip_all, err)] pub async fn handler( State(ctx): State, - json: Result, axum::extract::rejection::JsonRejection>, + path: Result, PathRejection>, ) -> Result<(StatusCode, Json), ErrorResponse> { - let Json(req) = json.map_err(|err| { - tracing::debug!(error = %err, "invalid request body"); - Error::InvalidBody(err) + let Path(raw_location_id) = path.map_err(|err| { + tracing::debug!(error = %err, "invalid path parameters"); + Error::InvalidPath(err) })?; - let location_id = LocationId::try_from(req.location_id).map_err(|_| { - tracing::debug!(location_id = req.location_id, "invalid location ID"); - Error::InvalidLocationId(req.location_id) + let location_id = LocationId::try_from(raw_location_id).map_err(|_| { + tracing::debug!(location_id = raw_location_id, "invalid location ID"); + Error::InvalidLocationId(raw_location_id) })?; let idempotency_key = amp_worker_gc::job_key::idempotency_key(location_id); @@ -64,9 +68,9 @@ pub async fn handler( #[derive(Debug, thiserror::Error)] pub enum Error { - /// Invalid request body - #[error("invalid request body: {0}")] - InvalidBody(#[source] axum::extract::rejection::JsonRejection), + /// Invalid path parameters + #[error("invalid path parameters: {0}")] + InvalidPath(#[source] PathRejection), /// Invalid location ID #[error("invalid location ID: {0}")] @@ -80,7 +84,7 @@ pub enum Error { impl IntoErrorResponse for Error { fn error_code(&self) -> &'static str { match self { - Error::InvalidBody(_) => "INVALID_BODY", + Error::InvalidPath(_) => "INVALID_PATH", 
Error::InvalidLocationId(_) => "INVALID_LOCATION_ID", Error::Scheduler(err) => match err { scheduler::ScheduleJobError::NoWorkersAvailable => "NO_WORKERS_AVAILABLE", @@ -92,7 +96,7 @@ impl IntoErrorResponse for Error { fn status_code(&self) -> StatusCode { match self { - Error::InvalidBody(_) => StatusCode::BAD_REQUEST, + Error::InvalidPath(_) => StatusCode::BAD_REQUEST, Error::InvalidLocationId(_) => StatusCode::BAD_REQUEST, Error::Scheduler(err) => match err { scheduler::ScheduleJobError::NoWorkersAvailable => StatusCode::BAD_REQUEST, @@ -103,13 +107,6 @@ impl IntoErrorResponse for Error { } } -/// Request body for scheduling a GC job. -#[derive(Debug, serde::Deserialize)] -pub struct ScheduleGcRequest { - /// The location ID of the physical table revision to garbage collect. - pub location_id: i64, -} - /// Response body for a scheduled GC job. #[derive(Debug, serde::Serialize)] pub struct ScheduleGcResponse { diff --git a/crates/services/admin-api/src/lib.rs b/crates/services/admin-api/src/lib.rs index 2db3ed60f..c3199cdee 100644 --- a/crates/services/admin-api/src/lib.rs +++ b/crates/services/admin-api/src/lib.rs @@ -114,7 +114,7 @@ pub fn router(ctx: Ctx) -> Router<()> { "/providers/{name}", get(providers::get_by_id::handler).delete(providers::delete_by_id::handler), ) - .route("/gc/schedule", post(gc::schedule::handler)) + .route("/gc/{location_id}/schedule", post(gc::schedule::handler)) .route("/schema", post(schema::handler)) .route("/workers", get(workers::get_all::handler)) .route("/workers/{id}", get(workers::get_by_id::handler)) From 75164062e6942aed41bfaf01f44482f739a7e6e9 Mon Sep 17 00:00:00 2001 From: Mitchell Spencer Date: Thu, 26 Mar 2026 12:25:42 -0500 Subject: [PATCH 4/4] refactor(admin-api): move GC scheduling to POST /jobs endpoint Replace the dedicated /gc/{location_id}/schedule endpoint with a POST /jobs endpoint that accepts a job descriptor body. 
The request body uses the same serde-tagged enum as the worker's JobDescriptor: {"kind": "gc", "location_id": 42} This aligns GC job creation with the existing job management API and allows future job types to be added to the same endpoint. The ampctl command changes from `ampctl gc schedule <location_id>` to `ampctl job create gc <location_id>`. --- crates/bin/ampctl/src/cmd.rs | 1 - crates/bin/ampctl/src/cmd/gc.rs | 18 --- crates/bin/ampctl/src/cmd/gc/schedule.rs | 79 ---------- crates/bin/ampctl/src/cmd/job.rs | 5 + crates/bin/ampctl/src/cmd/job/create.rs | 92 ++++++++++++ crates/bin/ampctl/src/main.rs | 5 - crates/clients/admin/src/gc.rs | 134 ----------------- crates/clients/admin/src/jobs.rs | 125 ++++++++++++++++ crates/clients/admin/src/lib.rs | 6 - crates/services/admin-api/src/handlers.rs | 1 - crates/services/admin-api/src/handlers/gc.rs | 1 - .../admin-api/src/handlers/gc/schedule.rs | 115 --------------- .../services/admin-api/src/handlers/jobs.rs | 1 + .../admin-api/src/handlers/jobs/create.rs | 135 ++++++++++++++++++ crates/services/admin-api/src/lib.rs | 7 +- 15 files changed, 362 insertions(+), 363 deletions(-) delete mode 100644 crates/bin/ampctl/src/cmd/gc.rs delete mode 100644 crates/bin/ampctl/src/cmd/gc/schedule.rs create mode 100644 crates/bin/ampctl/src/cmd/job/create.rs delete mode 100644 crates/clients/admin/src/gc.rs delete mode 100644 crates/services/admin-api/src/handlers/gc.rs delete mode 100644 crates/services/admin-api/src/handlers/gc/schedule.rs create mode 100644 crates/services/admin-api/src/handlers/jobs/create.rs diff --git a/crates/bin/ampctl/src/cmd.rs b/crates/bin/ampctl/src/cmd.rs index 8c571c2da..cd5ed39dc 100644 --- a/crates/bin/ampctl/src/cmd.rs +++ b/crates/bin/ampctl/src/cmd.rs @@ -1,5 +1,4 @@ pub mod dataset; -pub mod gc; pub mod job; pub mod manifest; pub mod provider; diff --git a/crates/bin/ampctl/src/cmd/gc.rs b/crates/bin/ampctl/src/cmd/gc.rs deleted file mode 100644 index 483e3e4f3..000000000 --- a/crates/bin/ampctl/src/cmd/gc.rs +++ 
/dev/null @@ -1,18 +0,0 @@ -//! Garbage collection commands - -pub mod schedule; - -/// GC subcommands. -#[derive(Debug, clap::Subcommand)] -pub enum Commands { - /// Schedule a GC job for a physical table revision - Schedule(schedule::Args), -} - -/// Execute the GC command with the given subcommand. -pub async fn run(command: Commands) -> anyhow::Result<()> { - match command { - Commands::Schedule(args) => schedule::run(args).await?, - } - Ok(()) -} diff --git a/crates/bin/ampctl/src/cmd/gc/schedule.rs b/crates/bin/ampctl/src/cmd/gc/schedule.rs deleted file mode 100644 index 6cd4a2eb9..000000000 --- a/crates/bin/ampctl/src/cmd/gc/schedule.rs +++ /dev/null @@ -1,79 +0,0 @@ -//! GC schedule command. -//! -//! Schedules a garbage collection job for a physical table revision -//! through the admin API. - -use monitoring::logging; - -use crate::args::GlobalArgs; - -/// Schedule a GC job via the admin API. -#[tracing::instrument(skip_all, fields(admin_url = %global.admin_url, location_id = %location_id))] -pub async fn run( - Args { - global, - location_id, - }: Args, -) -> Result<(), Error> { - let client = global.build_client().map_err(Error::ClientBuildError)?; - - tracing::debug!("Scheduling GC job via admin API"); - - let job_id = client.gc().schedule(location_id).await.map_err(|err| { - tracing::error!(error = %err, error_source = logging::error_source(&err), "Failed to schedule GC job"); - Error::ScheduleError(err) - })?; - - let result = ScheduleResult { - job_id, - location_id, - }; - global.print(&result).map_err(Error::JsonSerialization)?; - - Ok(()) -} - -/// Errors for GC schedule operations. 
-#[derive(Debug, thiserror::Error)] -pub enum Error { - /// Failed to build client - #[error("failed to build admin API client")] - ClientBuildError(#[source] crate::args::BuildClientError), - - /// Error scheduling GC job via admin API - #[error("failed to schedule GC job")] - ScheduleError(#[source] amp_client_admin::gc::ScheduleError), - - /// Failed to serialize result to JSON - #[error("failed to serialize result to JSON")] - JsonSerialization(#[source] serde_json::Error), -} - -/// Command-line arguments for the `gc schedule` command. -#[derive(Debug, clap::Args)] -pub struct Args { - #[command(flatten)] - pub global: GlobalArgs, - - /// The location ID of the physical table revision to garbage collect - pub location_id: i64, -} - -/// Result of a GC schedule operation. -#[derive(serde::Serialize)] -struct ScheduleResult { - job_id: amp_worker_core::jobs::job_id::JobId, - location_id: i64, -} - -impl std::fmt::Display for ScheduleResult { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - writeln!( - f, - "{} GC job {} scheduled for location {}", - console::style("✓").green().bold(), - self.job_id, - self.location_id, - ) - } -} diff --git a/crates/bin/ampctl/src/cmd/job.rs b/crates/bin/ampctl/src/cmd/job.rs index e43f127cf..51ad805e2 100644 --- a/crates/bin/ampctl/src/cmd/job.rs +++ b/crates/bin/ampctl/src/cmd/job.rs @@ -1,5 +1,6 @@ //! Job management commands +pub mod create; pub mod events; pub mod inspect; pub mod list; @@ -11,6 +12,9 @@ pub mod stop; /// Job management subcommands. #[derive(Debug, clap::Subcommand)] pub enum Commands { + /// Create a new job + Create(create::Args), + /// Get lifecycle events for a job #[command(after_help = include_str!("job/events__after_help.md"))] Events(events::Args), @@ -46,6 +50,7 @@ pub enum Commands { /// Execute the job command with the given subcommand. 
pub async fn run(command: Commands) -> anyhow::Result<()> { match command { + Commands::Create(args) => create::run(args).await?, Commands::Events(args) => events::run(args).await?, Commands::List(args) => list::run(args).await?, Commands::Inspect(args) => inspect::run(args).await?, diff --git a/crates/bin/ampctl/src/cmd/job/create.rs b/crates/bin/ampctl/src/cmd/job/create.rs new file mode 100644 index 000000000..c728ad09a --- /dev/null +++ b/crates/bin/ampctl/src/cmd/job/create.rs @@ -0,0 +1,92 @@ +//! Job creation command. +//! +//! Creates a new job via the admin API's `POST /jobs` endpoint. + +use amp_client_admin::jobs::{CreateError, CreateJobRequest}; +use monitoring::logging; + +use crate::args::GlobalArgs; + +/// Create a new job via the admin API. +#[tracing::instrument(skip_all, fields(admin_url = %global.admin_url))] +pub async fn run(Args { global, kind }: Args) -> Result<(), Error> { + let client = global.build_client().map_err(Error::ClientBuildError)?; + + let request = match kind { + JobKind::Gc(gc_args) => CreateJobRequest::Gc { + location_id: gc_args.location_id, + }, + }; + + tracing::debug!("Creating job via admin API"); + + let job_id = client.jobs().create(&request).await.map_err(|err| { + tracing::error!(error = %err, error_source = logging::error_source(&err), "Failed to create job"); + Error::CreateError(err) + })?; + + let result = CreateResult { job_id, request }; + global.print(&result).map_err(Error::JsonSerialization)?; + + Ok(()) +} + +/// Errors for job creation operations. 
+#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Failed to build client + #[error("failed to build admin API client")] + ClientBuildError(#[source] crate::args::BuildClientError), + + /// Error creating job via admin API + #[error("failed to create job")] + CreateError(#[source] CreateError), + + /// Failed to serialize result to JSON + #[error("failed to serialize result to JSON")] + JsonSerialization(#[source] serde_json::Error), +} + +/// Command-line arguments for the `job create` command. +#[derive(Debug, clap::Args)] +pub struct Args { + #[command(flatten)] + pub global: GlobalArgs, + + /// The job kind to create + #[command(subcommand)] + pub kind: JobKind, +} + +/// Supported job kinds for creation. +#[derive(Debug, clap::Subcommand)] +pub enum JobKind { + /// Schedule a garbage collection job for a physical table revision + Gc(GcArgs), +} + +/// Arguments for creating a GC job. +#[derive(Debug, clap::Args)] +pub struct GcArgs { + /// The location ID of the physical table revision to garbage collect + pub location_id: i64, +} + +/// Result of a job creation operation. 
+#[derive(serde::Serialize)] +struct CreateResult { + job_id: amp_worker_core::jobs::job_id::JobId, + #[serde(flatten)] + request: CreateJobRequest, +} + +impl std::fmt::Display for CreateResult { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + writeln!( + f, + "{} Job {} created", + console::style("✓").green().bold(), + self.job_id, + ) + } +} diff --git a/crates/bin/ampctl/src/main.rs b/crates/bin/ampctl/src/main.rs index 40d1a6698..beaaeeea7 100644 --- a/crates/bin/ampctl/src/main.rs +++ b/crates/bin/ampctl/src/main.rs @@ -56,10 +56,6 @@ enum Commands { #[command(long_about = include_str!("cmd/dataset__long_about.md"))] Dataset(cmd::dataset::Commands), - /// Manage garbage collection jobs - #[command(subcommand)] - Gc(cmd::gc::Commands), - /// Manage table revisions #[command(subcommand)] #[command(alias = "tables")] @@ -90,7 +86,6 @@ async fn run() -> anyhow::Result<()> { Commands::Provider(command) => cmd::provider::run(command).await?, Commands::Job(command) => cmd::job::run(command).await?, Commands::Dataset(command) => cmd::dataset::run(command).await?, - Commands::Gc(command) => cmd::gc::run(command).await?, Commands::Table(command) => cmd::table::run(command).await?, Commands::Worker(command) => cmd::worker::run(command).await?, Commands::Trace(command) => cmd::trace::run(command).await?, diff --git a/crates/clients/admin/src/gc.rs b/crates/clients/admin/src/gc.rs deleted file mode 100644 index a3367975e..000000000 --- a/crates/clients/admin/src/gc.rs +++ /dev/null @@ -1,134 +0,0 @@ -//! GC management API client. -//! -//! Provides methods for interacting with the `/gc` endpoints of the admin API. - -use amp_worker_core::jobs::job_id::JobId; - -use super::{ - Client, - error::{ApiError, ErrorResponse}, -}; - -/// Build URL path for scheduling a GC job. -/// -/// POST `/gc/{location_id}/schedule` -fn gc_schedule(location_id: i64) -> String { - format!("gc/{location_id}/schedule") -} - -/// Client for GC-related API operations. 
-pub struct GcClient<'a> { - client: &'a Client, -} - -impl<'a> GcClient<'a> { - pub fn new(client: &'a Client) -> Self { - Self { client } - } - - /// Schedule a GC job for a physical table revision. - pub async fn schedule(&self, location_id: i64) -> Result<JobId, ScheduleError> { - let url = self - .client - .base_url() - .join(&gc_schedule(location_id)) - .expect("valid URL"); - - tracing::debug!(%location_id, "scheduling GC job"); - - let response = self - .client - .http() - .post(url.as_str()) - .send() - .await - .map_err(|err| ScheduleError::Network { - url: url.to_string(), - source: err, - })?; - - let status = response.status(); - tracing::debug!(status = %status, "received API response"); - - match status.as_u16() { - 202 => { - let resp: ScheduleResponse = - response - .json() - .await - .map_err(|err| ScheduleError::ResponseParse { - url: url.to_string(), - source: err, - })?; - Ok(resp.job_id) - } - 400 | 409 | 500 => { - let text = response - .text() - .await - .map_err(|err| ScheduleError::ResponseParse { - url: url.to_string(), - source: err, - })?; - let error_response: ErrorResponse = - serde_json::from_str(&text).map_err(|_| ScheduleError::UnexpectedResponse { - status: status.as_u16(), - body: text.clone(), - })?; - match error_response.error_code.as_str() { - "INVALID_PATH" | "INVALID_LOCATION_ID" => { - Err(ScheduleError::InvalidRequest(error_response.into())) - } - "NO_WORKERS_AVAILABLE" => { - Err(ScheduleError::NoWorkersAvailable(error_response.into())) - } - "ACTIVE_JOB_CONFLICT" => { - Err(ScheduleError::ActiveJobConflict(error_response.into())) - } - _ => Err(ScheduleError::Api(error_response.into())), - } - } - _ => Err(ScheduleError::UnexpectedResponse { - status: status.as_u16(), - body: response.text().await.unwrap_or_default(), - }), - } - } -} - -#[derive(Debug, serde::Deserialize)] -struct ScheduleResponse { - job_id: JobId, -} - -#[derive(Debug, thiserror::Error)] -pub enum ScheduleError { - #[error("network error contacting {url}")] - Network { -
url: String, - #[source] - source: reqwest::Error, - }, - - #[error("failed to parse response from {url}")] - ResponseParse { - url: String, - #[source] - source: reqwest::Error, - }, - - #[error("invalid request")] - InvalidRequest(#[source] ApiError), - - #[error("no workers available")] - NoWorkersAvailable(#[source] ApiError), - - #[error("active GC job already exists for this location")] - ActiveJobConflict(#[source] ApiError), - - #[error("API error")] - Api(#[source] ApiError), - - #[error("unexpected response: status={status}, body={body}")] - UnexpectedResponse { status: u16, body: String }, -} diff --git a/crates/clients/admin/src/jobs.rs b/crates/clients/admin/src/jobs.rs index 9f34b26ad..2fcbfe4a0 100644 --- a/crates/clients/admin/src/jobs.rs +++ b/crates/clients/admin/src/jobs.rs @@ -82,6 +82,80 @@ impl<'a> JobsClient<'a> { Self { client } } + /// Create a new job. + /// + /// POSTs to `/jobs` endpoint with a job descriptor. + /// + /// # Errors + /// + /// Returns [`CreateError`] for network errors, invalid requests, + /// active job conflicts, or unexpected responses. 
+ #[tracing::instrument(skip(self))] + pub async fn create(&self, request: &CreateJobRequest) -> Result<JobId, CreateError> { + let url = self.client.base_url().join(jobs_list()).expect("valid URL"); + + tracing::debug!("Sending POST request to create job"); + + let response = self + .client + .http() + .post(url.as_str()) + .json(request) + .send() + .await + .map_err(|err| CreateError::Network { + url: url.to_string(), + source: err, + })?; + + let status = response.status(); + tracing::debug!(status = %status, "received API response"); + + match status.as_u16() { + 202 => { + let resp: CreateJobResponse = + response + .json() + .await + .map_err(|err| CreateError::ResponseParse { + url: url.to_string(), + source: err, + })?; + Ok(resp.job_id) + } + 400 | 409 | 500 => { + let text = response + .text() + .await + .map_err(|err| CreateError::ResponseParse { + url: url.to_string(), + source: err, + })?; + let error_response: ErrorResponse = + serde_json::from_str(&text).map_err(|_| CreateError::UnexpectedResponse { + status: status.as_u16(), + body: text.clone(), + })?; + match error_response.error_code.as_str() { + "INVALID_BODY" | "INVALID_LOCATION_ID" => { + Err(CreateError::InvalidRequest(error_response.into())) + } + "NO_WORKERS_AVAILABLE" => { + Err(CreateError::NoWorkersAvailable(error_response.into())) + } + "ACTIVE_JOB_CONFLICT" => { + Err(CreateError::ActiveJobConflict(error_response.into())) + } + _ => Err(CreateError::Api(error_response.into())), + } + } + _ => Err(CreateError::UnexpectedResponse { + status: status.as_u16(), + body: response.text().await.unwrap_or_default(), + }), + } + } + /// Get a job by ID. /// /// GETs from `/jobs/{id}` endpoint. @@ -1276,3 +1350,54 @@ pub enum GetProgressError { #[error("unexpected response (status {status}): {message}")] UnexpectedResponse { status: u16, message: String }, } + +/// Request body for creating a job via `POST /jobs`. +/// +/// Dispatches on the `kind` field to determine the job type.
+#[derive(Debug, serde::Serialize)] +#[serde(tag = "kind", rename_all = "kebab-case")] +pub enum CreateJobRequest { + /// Schedule a garbage collection job for a physical table revision. + Gc { + /// The location ID of the physical table revision to garbage collect. + location_id: i64, + }, +} + +#[derive(Debug, serde::Deserialize)] +struct CreateJobResponse { + job_id: JobId, +} + +/// Errors from [`JobsClient::create`]. +#[derive(Debug, thiserror::Error)] +pub enum CreateError { + #[error("network error contacting {url}")] + Network { + url: String, + #[source] + source: reqwest::Error, + }, + + #[error("failed to parse response from {url}")] + ResponseParse { + url: String, + #[source] + source: reqwest::Error, + }, + + #[error("invalid request")] + InvalidRequest(#[source] ApiError), + + #[error("no workers available")] + NoWorkersAvailable(#[source] ApiError), + + #[error("active job already exists")] + ActiveJobConflict(#[source] ApiError), + + #[error("API error")] + Api(#[source] ApiError), + + #[error("unexpected response: status={status}, body={body}")] + UnexpectedResponse { status: u16, body: String }, +} diff --git a/crates/clients/admin/src/lib.rs b/crates/clients/admin/src/lib.rs index 2a1d64e9c..6e3a016e4 100644 --- a/crates/clients/admin/src/lib.rs +++ b/crates/clients/admin/src/lib.rs @@ -7,7 +7,6 @@ pub mod auth; pub mod datasets; pub mod end_block; pub mod error; -pub mod gc; pub mod jobs; pub mod manifests; pub mod providers; @@ -79,11 +78,6 @@ impl Client { manifests::ManifestsClient::new(self) } - /// Get a GC client for garbage collection operations. - pub fn gc(&self) -> gc::GcClient<'_> { - gc::GcClient::new(self) - } - /// Get a jobs client for jobs-related operations. 
pub fn jobs(&self) -> jobs::JobsClient<'_> { jobs::JobsClient::new(self) diff --git a/crates/services/admin-api/src/handlers.rs b/crates/services/admin-api/src/handlers.rs index 126b9c4c6..29be15661 100644 --- a/crates/services/admin-api/src/handlers.rs +++ b/crates/services/admin-api/src/handlers.rs @@ -3,7 +3,6 @@ pub mod error; pub mod datasets; pub mod files; -pub mod gc; pub mod jobs; pub mod manifests; pub mod providers; diff --git a/crates/services/admin-api/src/handlers/gc.rs b/crates/services/admin-api/src/handlers/gc.rs deleted file mode 100644 index 67098a010..000000000 --- a/crates/services/admin-api/src/handlers/gc.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod schedule; diff --git a/crates/services/admin-api/src/handlers/gc/schedule.rs b/crates/services/admin-api/src/handlers/gc/schedule.rs deleted file mode 100644 index 0e7b0fbe7..000000000 --- a/crates/services/admin-api/src/handlers/gc/schedule.rs +++ /dev/null @@ -1,115 +0,0 @@ -//! GC job scheduling handler - -use amp_worker_core::jobs::job_id::JobId; -use axum::{ - Json, - extract::{Path, State, rejection::PathRejection}, - http::StatusCode, -}; -use metadata_db::physical_table_revision::LocationId; -use monitoring::logging; - -use crate::{ - ctx::Ctx, - handlers::error::{ErrorResponse, IntoErrorResponse}, - scheduler, -}; - -/// Handler for the `POST /gc/{location_id}/schedule` endpoint -/// -/// Schedules a garbage collection job for a specific physical table revision. -/// The job will be picked up by an available worker for execution. 
-/// -/// ## Path Parameters -/// - `location_id`: The location ID of the physical table revision to collect -/// -/// ## Response -/// - **202 Accepted**: GC job scheduled successfully -/// - **400 Bad Request**: Invalid location ID -/// - **409 Conflict**: An active GC job already exists for this location -/// - **500 Internal Server Error**: Scheduler error -#[tracing::instrument(skip_all, err)] -pub async fn handler( - State(ctx): State<Ctx>, - path: Result<Path<i64>, PathRejection>, -) -> Result<(StatusCode, Json<ScheduleGcResponse>), ErrorResponse> { - let Path(raw_location_id) = path.map_err(|err| { - tracing::debug!(error = %err, "invalid path parameters"); - Error::InvalidPath(err) - })?; - - let location_id = LocationId::try_from(raw_location_id).map_err(|_| { - tracing::debug!(location_id = raw_location_id, "invalid location ID"); - Error::InvalidLocationId(raw_location_id) - })?; - - let idempotency_key = amp_worker_gc::job_key::idempotency_key(location_id); - let descriptor = scheduler::JobDescriptor::from(amp_worker_gc::job_descriptor::JobDescriptor { - location_id, - }); - - let job_id = ctx - .scheduler - .schedule_job(idempotency_key.into(), descriptor, None) - .await - .map_err(|err| { - tracing::error!( - error = %err, - error_source = logging::error_source(&err), - %location_id, - "failed to schedule GC job" - ); - Error::Scheduler(err) - })?; - - tracing::info!(%job_id, %location_id, "GC job scheduled"); - Ok((StatusCode::ACCEPTED, Json(ScheduleGcResponse { job_id }))) -} - -#[derive(Debug, thiserror::Error)] -pub enum Error { - /// Invalid path parameters - #[error("invalid path parameters: {0}")] - InvalidPath(#[source] PathRejection), - - /// Invalid location ID - #[error("invalid location ID: {0}")] - InvalidLocationId(i64), - - /// Scheduler error - #[error("failed to schedule GC job")] - Scheduler(#[source] scheduler::ScheduleJobError), -} - -impl IntoErrorResponse for Error { - fn error_code(&self) -> &'static str { - match self { - Error::InvalidPath(_) =>
"INVALID_PATH", - Error::InvalidLocationId(_) => "INVALID_LOCATION_ID", - Error::Scheduler(err) => match err { - scheduler::ScheduleJobError::NoWorkersAvailable => "NO_WORKERS_AVAILABLE", - scheduler::ScheduleJobError::ActiveJobConflict { .. } => "ACTIVE_JOB_CONFLICT", - _ => "SCHEDULER_ERROR", - }, - } - } - - fn status_code(&self) -> StatusCode { - match self { - Error::InvalidPath(_) => StatusCode::BAD_REQUEST, - Error::InvalidLocationId(_) => StatusCode::BAD_REQUEST, - Error::Scheduler(err) => match err { - scheduler::ScheduleJobError::NoWorkersAvailable => StatusCode::BAD_REQUEST, - scheduler::ScheduleJobError::ActiveJobConflict { .. } => StatusCode::CONFLICT, - _ => StatusCode::INTERNAL_SERVER_ERROR, - }, - } - } -} - -/// Response body for a scheduled GC job. -#[derive(Debug, serde::Serialize)] -pub struct ScheduleGcResponse { - /// The ID of the scheduled GC job. - pub job_id: JobId, -} diff --git a/crates/services/admin-api/src/handlers/jobs.rs b/crates/services/admin-api/src/handlers/jobs.rs index 2a1389b2b..b2c78aa3f 100644 --- a/crates/services/admin-api/src/handlers/jobs.rs +++ b/crates/services/admin-api/src/handlers/jobs.rs @@ -1,5 +1,6 @@ //! Jobs HTTP handlers +pub mod create; pub mod delete; pub mod delete_by_id; pub mod event_by_id; diff --git a/crates/services/admin-api/src/handlers/jobs/create.rs b/crates/services/admin-api/src/handlers/jobs/create.rs new file mode 100644 index 000000000..2b44171ce --- /dev/null +++ b/crates/services/admin-api/src/handlers/jobs/create.rs @@ -0,0 +1,135 @@ +//! Job creation handler + +use amp_worker_core::jobs::job_id::JobId; +use axum::{Json, extract::State, http::StatusCode}; +use metadata_db::physical_table_revision::LocationId; +use monitoring::logging; + +use crate::{ + ctx::Ctx, + handlers::error::{ErrorResponse, IntoErrorResponse}, + scheduler, +}; + +/// Handler for the `POST /jobs` endpoint +/// +/// Schedules a new job for execution by an available worker. +/// Currently supports GC jobs only. 
+/// +/// ## Request Body +/// - `kind`: The job type (currently only `"gc"`) +/// - Additional fields depend on the job kind +/// +/// ### GC job +/// ```json +/// { "kind": "gc", "location_id": 42 } +/// ``` +/// +/// ## Response +/// - **202 Accepted**: Job scheduled successfully +/// - **400 Bad Request**: Invalid request body or unsupported job kind +/// - **409 Conflict**: An active job with the same idempotency key already exists +/// - **500 Internal Server Error**: Scheduler error +#[tracing::instrument(skip_all, err)] +pub async fn handler( + State(ctx): State<Ctx>, + json: Result<Json<CreateJobRequest>, axum::extract::rejection::JsonRejection>, +) -> Result<(StatusCode, Json<CreateJobResponse>), ErrorResponse> { + let Json(req) = json.map_err(|err| { + tracing::debug!(error = %err, "invalid request body"); + Error::InvalidBody(err) + })?; + + let (idempotency_key, descriptor) = match req { + CreateJobRequest::Gc { location_id } => { + let location_id = LocationId::try_from(location_id).map_err(|_| { + tracing::debug!(location_id, "invalid location ID"); + Error::InvalidLocationId(location_id) + })?; + + let key = amp_worker_gc::job_key::idempotency_key(location_id); + let desc = + scheduler::JobDescriptor::from(amp_worker_gc::job_descriptor::JobDescriptor { + location_id, + }); + (key, desc) + } + }; + + let job_id = ctx + .scheduler + .schedule_job(idempotency_key.into(), descriptor, None) + .await + .map_err(|err| { + tracing::error!( + error = %err, + error_source = logging::error_source(&err), + "failed to schedule job" + ); + Error::Scheduler(err) + })?; + + tracing::info!(%job_id, "job scheduled"); + Ok((StatusCode::ACCEPTED, Json(CreateJobResponse { job_id }))) +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Invalid request body + #[error("invalid request body: {0}")] + InvalidBody(#[source] axum::extract::rejection::JsonRejection), + + /// Invalid location ID + #[error("invalid location ID: {0}")] + InvalidLocationId(i64), + + /// Scheduler error + #[error("failed to schedule
job")] + Scheduler(#[source] scheduler::ScheduleJobError), +} + +impl IntoErrorResponse for Error { + fn error_code(&self) -> &'static str { + match self { + Error::InvalidBody(_) => "INVALID_BODY", + Error::InvalidLocationId(_) => "INVALID_LOCATION_ID", + Error::Scheduler(err) => match err { + scheduler::ScheduleJobError::NoWorkersAvailable => "NO_WORKERS_AVAILABLE", + scheduler::ScheduleJobError::ActiveJobConflict { .. } => "ACTIVE_JOB_CONFLICT", + _ => "SCHEDULER_ERROR", + }, + } + } + + fn status_code(&self) -> StatusCode { + match self { + Error::InvalidBody(_) => StatusCode::BAD_REQUEST, + Error::InvalidLocationId(_) => StatusCode::BAD_REQUEST, + Error::Scheduler(err) => match err { + scheduler::ScheduleJobError::NoWorkersAvailable => StatusCode::BAD_REQUEST, + scheduler::ScheduleJobError::ActiveJobConflict { .. } => StatusCode::CONFLICT, + _ => StatusCode::INTERNAL_SERVER_ERROR, + }, + } + } +} + +/// Request body for creating a job. +/// +/// Dispatches on the `kind` field to determine the job type. +#[derive(Debug, serde::Deserialize)] +#[serde(tag = "kind", rename_all = "kebab-case")] +pub enum CreateJobRequest { + /// Schedule a garbage collection job for a physical table revision. + Gc { + /// The location ID of the physical table revision to garbage collect. + location_id: i64, + }, +} + +/// Response body for a created job. +#[derive(Debug, serde::Serialize)] +pub struct CreateJobResponse { + /// The ID of the scheduled job. 
+ pub job_id: JobId, +} diff --git a/crates/services/admin-api/src/lib.rs b/crates/services/admin-api/src/lib.rs index c3199cdee..957134219 100644 --- a/crates/services/admin-api/src/lib.rs +++ b/crates/services/admin-api/src/lib.rs @@ -11,7 +11,7 @@ pub mod handlers; pub mod scheduler; use ctx::Ctx; -use handlers::{datasets, files, gc, jobs, manifests, providers, revisions, schema, workers}; +use handlers::{datasets, files, jobs, manifests, providers, revisions, schema, workers}; /// Create the admin API router with all routes registered /// @@ -79,7 +79,9 @@ pub fn router(ctx: Ctx) -> Router<()> { .route("/files/{file_id}", get(files::get_by_id::handler)) .route( "/jobs", - get(jobs::get_all::handler).delete(jobs::delete::handler), + get(jobs::get_all::handler) + .post(jobs::create::handler) + .delete(jobs::delete::handler), ) .route( "/jobs/{id}", @@ -114,7 +116,6 @@ pub fn router(ctx: Ctx) -> Router<()> { "/providers/{name}", get(providers::get_by_id::handler).delete(providers::delete_by_id::handler), ) - .route("/gc/{location_id}/schedule", post(gc::schedule::handler)) .route("/schema", post(schema::handler)) .route("/workers", get(workers::get_all::handler)) .route("/workers/{id}", get(workers::get_by_id::handler))