diff --git a/crates/bin/ampctl/src/cmd/job.rs b/crates/bin/ampctl/src/cmd/job.rs index e43f127cf..51ad805e2 100644 --- a/crates/bin/ampctl/src/cmd/job.rs +++ b/crates/bin/ampctl/src/cmd/job.rs @@ -1,5 +1,6 @@ //! Job management commands +pub mod create; pub mod events; pub mod inspect; pub mod list; @@ -11,6 +12,9 @@ pub mod stop; /// Job management subcommands. #[derive(Debug, clap::Subcommand)] pub enum Commands { + /// Create a new job + Create(create::Args), + /// Get lifecycle events for a job #[command(after_help = include_str!("job/events__after_help.md"))] Events(events::Args), @@ -46,6 +50,7 @@ pub enum Commands { /// Execute the job command with the given subcommand. pub async fn run(command: Commands) -> anyhow::Result<()> { match command { + Commands::Create(args) => create::run(args).await?, Commands::Events(args) => events::run(args).await?, Commands::List(args) => list::run(args).await?, Commands::Inspect(args) => inspect::run(args).await?, diff --git a/crates/bin/ampctl/src/cmd/job/create.rs b/crates/bin/ampctl/src/cmd/job/create.rs new file mode 100644 index 000000000..c728ad09a --- /dev/null +++ b/crates/bin/ampctl/src/cmd/job/create.rs @@ -0,0 +1,92 @@ +//! Job creation command. +//! +//! Creates a new job via the admin API's `POST /jobs` endpoint. + +use amp_client_admin::jobs::{CreateError, CreateJobRequest}; +use monitoring::logging; + +use crate::args::GlobalArgs; + +/// Create a new job via the admin API. +#[tracing::instrument(skip_all, fields(admin_url = %global.admin_url))] +pub async fn run(Args { global, kind }: Args) -> Result<(), Error> { + let client = global.build_client().map_err(Error::ClientBuildError)?; + + let request = match kind { + JobKind::Gc(gc_args) => CreateJobRequest::Gc { + location_id: gc_args.location_id, + }, + }; + + tracing::debug!("Creating job via admin API"); + + let job_id = client.jobs().create(&request).await.map_err(|err| { + tracing::error!(error = %err, error_source = logging::error_source(&err), "Failed to create job"); + Error::CreateError(err) + })?; + + let result = CreateResult { job_id, request }; + global.print(&result).map_err(Error::JsonSerialization)?; + + Ok(()) +} + +/// Errors for job creation operations. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Failed to build client + #[error("failed to build admin API client")] + ClientBuildError(#[source] crate::args::BuildClientError), + + /// Error creating job via admin API + #[error("failed to create job")] + CreateError(#[source] CreateError), + + /// Failed to serialize result to JSON + #[error("failed to serialize result to JSON")] + JsonSerialization(#[source] serde_json::Error), +} + +/// Command-line arguments for the `job create` command. +#[derive(Debug, clap::Args)] +pub struct Args { + #[command(flatten)] + pub global: GlobalArgs, + + /// The job kind to create + #[command(subcommand)] + pub kind: JobKind, +} + +/// Supported job kinds for creation. +#[derive(Debug, clap::Subcommand)] +pub enum JobKind { + /// Schedule a garbage collection job for a physical table revision + Gc(GcArgs), +} + +/// Arguments for creating a GC job. +#[derive(Debug, clap::Args)] +pub struct GcArgs { + /// The location ID of the physical table revision to garbage collect + pub location_id: i64, +} + +/// Result of a job creation operation. +#[derive(serde::Serialize)] +struct CreateResult { + job_id: amp_worker_core::jobs::job_id::JobId, + #[serde(flatten)] + request: CreateJobRequest, +} + +impl std::fmt::Display for CreateResult { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + writeln!( + f, + "{} Job {} created", + console::style("✓").green().bold(), + self.job_id, + ) + } +} diff --git a/crates/clients/admin/src/jobs.rs b/crates/clients/admin/src/jobs.rs index 9f34b26ad..2fcbfe4a0 100644 --- a/crates/clients/admin/src/jobs.rs +++ b/crates/clients/admin/src/jobs.rs @@ -82,6 +82,80 @@ impl<'a> JobsClient<'a> { Self { client } } + /// Create a new job. + /// + /// POSTs to `/jobs` endpoint with a job descriptor. + /// + /// # Errors + /// + /// Returns [`CreateError`] for network errors, invalid requests, + /// active job conflicts, or unexpected responses. + #[tracing::instrument(skip(self))] + pub async fn create(&self, request: &CreateJobRequest) -> Result { + let url = self.client.base_url().join(jobs_list()).expect("valid URL"); + + tracing::debug!("Sending POST request to create job"); + + let response = self + .client + .http() + .post(url.as_str()) + .json(request) + .send() + .await + .map_err(|err| CreateError::Network { + url: url.to_string(), + source: err, + })?; + + let status = response.status(); + tracing::debug!(status = %status, "received API response"); + + match status.as_u16() { + 202 => { + let resp: CreateJobResponse = + response + .json() + .await + .map_err(|err| CreateError::ResponseParse { + url: url.to_string(), + source: err, + })?; + Ok(resp.job_id) + } + 400 | 409 | 500 => { + let text = response + .text() + .await + .map_err(|err| CreateError::ResponseParse { + url: url.to_string(), + source: err, + })?; + let error_response: ErrorResponse = + serde_json::from_str(&text).map_err(|_| CreateError::UnexpectedResponse { + status: status.as_u16(), + body: text.clone(), + })?; + match error_response.error_code.as_str() { + "INVALID_BODY" | "INVALID_LOCATION_ID" => { + Err(CreateError::InvalidRequest(error_response.into())) + } + "NO_WORKERS_AVAILABLE" => { + Err(CreateError::NoWorkersAvailable(error_response.into())) + } + "ACTIVE_JOB_CONFLICT" => { + Err(CreateError::ActiveJobConflict(error_response.into())) + } + _ => Err(CreateError::Api(error_response.into())), + } + } + _ => Err(CreateError::UnexpectedResponse { + status: status.as_u16(), + body: response.text().await.unwrap_or_default(), + }), + } + } + /// Get a job by ID. /// /// GETs from `/jobs/{id}` endpoint. @@ -1276,3 +1350,54 @@ pub enum GetProgressError { #[error("unexpected response (status {status}): {message}")] UnexpectedResponse { status: u16, message: String }, } + +/// Request body for creating a job via `POST /jobs`. +/// +/// Dispatches on the `kind` field to determine the job type. +#[derive(Debug, serde::Serialize)] +#[serde(tag = "kind", rename_all = "kebab-case")] +pub enum CreateJobRequest { + /// Schedule a garbage collection job for a physical table revision. + Gc { + /// The location ID of the physical table revision to garbage collect. + location_id: i64, + }, +} + +#[derive(Debug, serde::Deserialize)] +struct CreateJobResponse { + job_id: JobId, +} + +/// Errors from [`JobsClient::create`]. +#[derive(Debug, thiserror::Error)] +pub enum CreateError { + #[error("network error contacting {url}")] + Network { + url: String, + #[source] + source: reqwest::Error, + }, + + #[error("failed to parse response from {url}")] + ResponseParse { + url: String, + #[source] + source: reqwest::Error, + }, + + #[error("invalid request")] + InvalidRequest(#[source] ApiError), + + #[error("no workers available")] + NoWorkersAvailable(#[source] ApiError), + + #[error("active job already exists")] + ActiveJobConflict(#[source] ApiError), + + #[error("API error")] + Api(#[source] ApiError), + + #[error("unexpected response: status={status}, body={body}")] + UnexpectedResponse { status: u16, body: String }, +} diff --git a/crates/services/admin-api/src/handlers/jobs.rs b/crates/services/admin-api/src/handlers/jobs.rs index 2a1389b2b..b2c78aa3f 100644 --- a/crates/services/admin-api/src/handlers/jobs.rs +++ b/crates/services/admin-api/src/handlers/jobs.rs @@ -1,5 +1,6 @@ //! Jobs HTTP handlers +pub mod create; pub mod delete; pub mod delete_by_id; pub mod event_by_id; diff --git a/crates/services/admin-api/src/handlers/jobs/create.rs b/crates/services/admin-api/src/handlers/jobs/create.rs new file mode 100644 index 000000000..2b44171ce --- /dev/null +++ b/crates/services/admin-api/src/handlers/jobs/create.rs @@ -0,0 +1,135 @@ +//! Job creation handler + +use amp_worker_core::jobs::job_id::JobId; +use axum::{Json, extract::State, http::StatusCode}; +use metadata_db::physical_table_revision::LocationId; +use monitoring::logging; + +use crate::{ + ctx::Ctx, + handlers::error::{ErrorResponse, IntoErrorResponse}, + scheduler, +}; + +/// Handler for the `POST /jobs` endpoint +/// +/// Schedules a new job for execution by an available worker. +/// Currently supports GC jobs only. +/// +/// ## Request Body +/// - `kind`: The job type (currently only `"gc"`) +/// - Additional fields depend on the job kind +/// +/// ### GC job +/// ```json +/// { "kind": "gc", "location_id": 42 } +/// ``` +/// +/// ## Response +/// - **202 Accepted**: Job scheduled successfully +/// - **400 Bad Request**: Invalid request body or unsupported job kind +/// - **409 Conflict**: An active job with the same idempotency key already exists +/// - **500 Internal Server Error**: Scheduler error +#[tracing::instrument(skip_all, err)] +pub async fn handler( + State(ctx): State, + json: Result, axum::extract::rejection::JsonRejection>, +) -> Result<(StatusCode, Json), ErrorResponse> { + let Json(req) = json.map_err(|err| { + tracing::debug!(error = %err, "invalid request body"); + Error::InvalidBody(err) + })?; + + let (idempotency_key, descriptor) = match req { + CreateJobRequest::Gc { location_id } => { + let location_id = LocationId::try_from(location_id).map_err(|_| { + tracing::debug!(location_id, "invalid location ID"); + Error::InvalidLocationId(location_id) + })?; + + let key = amp_worker_gc::job_key::idempotency_key(location_id); + let desc = + scheduler::JobDescriptor::from(amp_worker_gc::job_descriptor::JobDescriptor { + location_id, + }); + (key, desc) + } + }; + + let job_id = ctx + .scheduler + .schedule_job(idempotency_key.into(), descriptor, None) + .await + .map_err(|err| { + tracing::error!( + error = %err, + error_source = logging::error_source(&err), + "failed to schedule job" + ); + Error::Scheduler(err) + })?; + + tracing::info!(%job_id, "job scheduled"); + Ok((StatusCode::ACCEPTED, Json(CreateJobResponse { job_id }))) +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Invalid request body + #[error("invalid request body: {0}")] + InvalidBody(#[source] axum::extract::rejection::JsonRejection), + + /// Invalid location ID + #[error("invalid location ID: {0}")] + InvalidLocationId(i64), + + /// Scheduler error + #[error("failed to schedule job")] + Scheduler(#[source] scheduler::ScheduleJobError), +} + +impl IntoErrorResponse for Error { + fn error_code(&self) -> &'static str { + match self { + Error::InvalidBody(_) => "INVALID_BODY", + Error::InvalidLocationId(_) => "INVALID_LOCATION_ID", + Error::Scheduler(err) => match err { + scheduler::ScheduleJobError::NoWorkersAvailable => "NO_WORKERS_AVAILABLE", + scheduler::ScheduleJobError::ActiveJobConflict { .. } => "ACTIVE_JOB_CONFLICT", + _ => "SCHEDULER_ERROR", + }, + } + } + + fn status_code(&self) -> StatusCode { + match self { + Error::InvalidBody(_) => StatusCode::BAD_REQUEST, + Error::InvalidLocationId(_) => StatusCode::BAD_REQUEST, + Error::Scheduler(err) => match err { + scheduler::ScheduleJobError::NoWorkersAvailable => StatusCode::BAD_REQUEST, + scheduler::ScheduleJobError::ActiveJobConflict { .. } => StatusCode::CONFLICT, + _ => StatusCode::INTERNAL_SERVER_ERROR, + }, + } + } +} + +/// Request body for creating a job. +/// +/// Dispatches on the `kind` field to determine the job type. +#[derive(Debug, serde::Deserialize)] +#[serde(tag = "kind", rename_all = "kebab-case")] +pub enum CreateJobRequest { + /// Schedule a garbage collection job for a physical table revision. + Gc { + /// The location ID of the physical table revision to garbage collect. + location_id: i64, + }, +} + +/// Response body for a created job. +#[derive(Debug, serde::Serialize)] +pub struct CreateJobResponse { + /// The ID of the scheduled job. + pub job_id: JobId, +} diff --git a/crates/services/admin-api/src/lib.rs b/crates/services/admin-api/src/lib.rs index f5323fab9..957134219 100644 --- a/crates/services/admin-api/src/lib.rs +++ b/crates/services/admin-api/src/lib.rs @@ -79,7 +79,9 @@ pub fn router(ctx: Ctx) -> Router<()> { .route("/files/{file_id}", get(files::get_by_id::handler)) .route( "/jobs", - get(jobs::get_all::handler).delete(jobs::delete::handler), + get(jobs::get_all::handler) + .post(jobs::create::handler) + .delete(jobs::delete::handler), ) .route( "/jobs/{id}",