Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions crates/bin/ampctl/src/cmd/job.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Job management commands

pub mod create;
pub mod events;
pub mod inspect;
pub mod list;
Expand All @@ -11,6 +12,9 @@ pub mod stop;
/// Job management subcommands.
#[derive(Debug, clap::Subcommand)]
pub enum Commands {
/// Create a new job
Create(create::Args),

/// Get lifecycle events for a job
#[command(after_help = include_str!("job/events__after_help.md"))]
Events(events::Args),
Expand Down Expand Up @@ -46,6 +50,7 @@ pub enum Commands {
/// Execute the job command with the given subcommand.
pub async fn run(command: Commands) -> anyhow::Result<()> {
match command {
Commands::Create(args) => create::run(args).await?,
Commands::Events(args) => events::run(args).await?,
Commands::List(args) => list::run(args).await?,
Commands::Inspect(args) => inspect::run(args).await?,
Expand Down
92 changes: 92 additions & 0 deletions crates/bin/ampctl/src/cmd/job/create.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
//! Job creation command.
//!
//! Creates a new job via the admin API's `POST /jobs` endpoint.

use amp_client_admin::jobs::{CreateError, CreateJobRequest};
use monitoring::logging;

use crate::args::GlobalArgs;

/// Create a new job via the admin API.
#[tracing::instrument(skip_all, fields(admin_url = %global.admin_url))]
pub async fn run(Args { global, kind }: Args) -> Result<(), Error> {
let client = global.build_client().map_err(Error::ClientBuildError)?;

let request = match kind {
JobKind::Gc(gc_args) => CreateJobRequest::Gc {
location_id: gc_args.location_id,
},
};

tracing::debug!("Creating job via admin API");

let job_id = client.jobs().create(&request).await.map_err(|err| {
tracing::error!(error = %err, error_source = logging::error_source(&err), "Failed to create job");
Error::CreateError(err)
})?;

let result = CreateResult { job_id, request };
global.print(&result).map_err(Error::JsonSerialization)?;

Ok(())
}

/// Errors for job creation operations.
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Failed to build client
#[error("failed to build admin API client")]
ClientBuildError(#[source] crate::args::BuildClientError),

/// Error creating job via admin API
#[error("failed to create job")]
CreateError(#[source] CreateError),

/// Failed to serialize result to JSON
#[error("failed to serialize result to JSON")]
JsonSerialization(#[source] serde_json::Error),
}

/// Command-line arguments for the `job create` command.
#[derive(Debug, clap::Args)]
pub struct Args {
#[command(flatten)]
pub global: GlobalArgs,

/// The job kind to create
#[command(subcommand)]
pub kind: JobKind,
}

/// Supported job kinds for creation.
#[derive(Debug, clap::Subcommand)]
pub enum JobKind {
/// Schedule a garbage collection job for a physical table revision
Gc(GcArgs),
}

/// Arguments for creating a GC job.
#[derive(Debug, clap::Args)]
pub struct GcArgs {
/// The location ID of the physical table revision to garbage collect
pub location_id: i64,
}

/// Result of a job creation operation.
#[derive(serde::Serialize)]
struct CreateResult {
job_id: amp_worker_core::jobs::job_id::JobId,
#[serde(flatten)]
request: CreateJobRequest,
}

impl std::fmt::Display for CreateResult {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
writeln!(
f,
"{} Job {} created",
console::style("✓").green().bold(),
self.job_id,
)
}
}
125 changes: 125 additions & 0 deletions crates/clients/admin/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,80 @@ impl<'a> JobsClient<'a> {
Self { client }
}

/// Create a new job.
///
/// POSTs to `/jobs` endpoint with a job descriptor.
///
/// # Errors
///
/// Returns [`CreateError`] for network errors, invalid requests,
/// active job conflicts, or unexpected responses.
#[tracing::instrument(skip(self))]
pub async fn create(&self, request: &CreateJobRequest) -> Result<JobId, CreateError> {
let url = self.client.base_url().join(jobs_list()).expect("valid URL");

tracing::debug!("Sending POST request to create job");

let response = self
.client
.http()
.post(url.as_str())
.json(request)
.send()
.await
.map_err(|err| CreateError::Network {
url: url.to_string(),
source: err,
})?;

let status = response.status();
tracing::debug!(status = %status, "received API response");

match status.as_u16() {
202 => {
let resp: CreateJobResponse =
response
.json()
.await
.map_err(|err| CreateError::ResponseParse {
url: url.to_string(),
source: err,
})?;
Ok(resp.job_id)
}
400 | 409 | 500 => {
let text = response
.text()
.await
.map_err(|err| CreateError::ResponseParse {
url: url.to_string(),
source: err,
})?;
let error_response: ErrorResponse =
serde_json::from_str(&text).map_err(|_| CreateError::UnexpectedResponse {
status: status.as_u16(),
body: text.clone(),
})?;
match error_response.error_code.as_str() {
"INVALID_BODY" | "INVALID_LOCATION_ID" => {
Err(CreateError::InvalidRequest(error_response.into()))
}
"NO_WORKERS_AVAILABLE" => {
Err(CreateError::NoWorkersAvailable(error_response.into()))
}
"ACTIVE_JOB_CONFLICT" => {
Err(CreateError::ActiveJobConflict(error_response.into()))
}
_ => Err(CreateError::Api(error_response.into())),
}
}
_ => Err(CreateError::UnexpectedResponse {
status: status.as_u16(),
body: response.text().await.unwrap_or_default(),
}),
}
}

/// Get a job by ID.
///
/// GETs from `/jobs/{id}` endpoint.
Expand Down Expand Up @@ -1276,3 +1350,54 @@ pub enum GetProgressError {
#[error("unexpected response (status {status}): {message}")]
UnexpectedResponse { status: u16, message: String },
}

/// Request body for creating a job via `POST /jobs`.
///
/// Dispatches on the `kind` field to determine the job type.
#[derive(Debug, serde::Serialize)]
#[serde(tag = "kind", rename_all = "kebab-case")]
pub enum CreateJobRequest {
/// Schedule a garbage collection job for a physical table revision.
Gc {
/// The location ID of the physical table revision to garbage collect.
location_id: i64,
},
}

#[derive(Debug, serde::Deserialize)]
struct CreateJobResponse {
job_id: JobId,
}

/// Errors from [`JobsClient::create`].
#[derive(Debug, thiserror::Error)]
pub enum CreateError {
#[error("network error contacting {url}")]
Network {
url: String,
#[source]
source: reqwest::Error,
},

#[error("failed to parse response from {url}")]
ResponseParse {
url: String,
#[source]
source: reqwest::Error,
},

#[error("invalid request")]
InvalidRequest(#[source] ApiError),

#[error("no workers available")]
NoWorkersAvailable(#[source] ApiError),

#[error("active job already exists")]
ActiveJobConflict(#[source] ApiError),

#[error("API error")]
Api(#[source] ApiError),

#[error("unexpected response: status={status}, body={body}")]
UnexpectedResponse { status: u16, body: String },
}
1 change: 1 addition & 0 deletions crates/services/admin-api/src/handlers/jobs.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Jobs HTTP handlers

pub mod create;
pub mod delete;
pub mod delete_by_id;
pub mod event_by_id;
Expand Down
135 changes: 135 additions & 0 deletions crates/services/admin-api/src/handlers/jobs/create.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
//! Job creation handler

use amp_worker_core::jobs::job_id::JobId;
use axum::{Json, extract::State, http::StatusCode};
use metadata_db::physical_table_revision::LocationId;
use monitoring::logging;

use crate::{
ctx::Ctx,
handlers::error::{ErrorResponse, IntoErrorResponse},
scheduler,
};

/// Handler for the `POST /jobs` endpoint
///
/// Schedules a new job for execution by an available worker.
/// Currently supports GC jobs only.
///
/// ## Request Body
/// - `kind`: The job type (currently only `"gc"`)
/// - Additional fields depend on the job kind
///
/// ### GC job
/// ```json
/// { "kind": "gc", "location_id": 42 }
/// ```
///
/// ## Response
/// - **202 Accepted**: Job scheduled successfully
/// - **400 Bad Request**: Invalid request body or unsupported job kind
/// - **409 Conflict**: An active job with the same idempotency key already exists
/// - **500 Internal Server Error**: Scheduler error
#[tracing::instrument(skip_all, err)]
pub async fn handler(
State(ctx): State<Ctx>,
json: Result<Json<CreateJobRequest>, axum::extract::rejection::JsonRejection>,
) -> Result<(StatusCode, Json<CreateJobResponse>), ErrorResponse> {
let Json(req) = json.map_err(|err| {
tracing::debug!(error = %err, "invalid request body");
Error::InvalidBody(err)
})?;

let (idempotency_key, descriptor) = match req {
CreateJobRequest::Gc { location_id } => {
let location_id = LocationId::try_from(location_id).map_err(|_| {
tracing::debug!(location_id, "invalid location ID");
Error::InvalidLocationId(location_id)
})?;

let key = amp_worker_gc::job_key::idempotency_key(location_id);
let desc =
scheduler::JobDescriptor::from(amp_worker_gc::job_descriptor::JobDescriptor {
location_id,
});
(key, desc)
}
};

let job_id = ctx
.scheduler
.schedule_job(idempotency_key.into(), descriptor, None)
.await
.map_err(|err| {
tracing::error!(
error = %err,
error_source = logging::error_source(&err),
"failed to schedule job"
);
Error::Scheduler(err)
})?;

tracing::info!(%job_id, "job scheduled");
Ok((StatusCode::ACCEPTED, Json(CreateJobResponse { job_id })))
}

#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Invalid request body
#[error("invalid request body: {0}")]
InvalidBody(#[source] axum::extract::rejection::JsonRejection),

/// Invalid location ID
#[error("invalid location ID: {0}")]
InvalidLocationId(i64),

/// Scheduler error
#[error("failed to schedule job")]
Scheduler(#[source] scheduler::ScheduleJobError),
}

impl IntoErrorResponse for Error {
fn error_code(&self) -> &'static str {
match self {
Error::InvalidBody(_) => "INVALID_BODY",
Error::InvalidLocationId(_) => "INVALID_LOCATION_ID",
Error::Scheduler(err) => match err {
scheduler::ScheduleJobError::NoWorkersAvailable => "NO_WORKERS_AVAILABLE",
scheduler::ScheduleJobError::ActiveJobConflict { .. } => "ACTIVE_JOB_CONFLICT",
_ => "SCHEDULER_ERROR",
},
}
}

fn status_code(&self) -> StatusCode {
match self {
Error::InvalidBody(_) => StatusCode::BAD_REQUEST,
Error::InvalidLocationId(_) => StatusCode::BAD_REQUEST,
Error::Scheduler(err) => match err {
scheduler::ScheduleJobError::NoWorkersAvailable => StatusCode::BAD_REQUEST,
scheduler::ScheduleJobError::ActiveJobConflict { .. } => StatusCode::CONFLICT,
_ => StatusCode::INTERNAL_SERVER_ERROR,
},
}
}
}

/// Request body for creating a job.
///
/// Dispatches on the `kind` field to determine the job type.
#[derive(Debug, serde::Deserialize)]
#[serde(tag = "kind", rename_all = "kebab-case")]
pub enum CreateJobRequest {
/// Schedule a garbage collection job for a physical table revision.
Gc {
/// The location ID of the physical table revision to garbage collect.
location_id: i64,
},
}

/// Response body for a created job.
#[derive(Debug, serde::Serialize)]
pub struct CreateJobResponse {
/// The ID of the scheduled job.
pub job_id: JobId,
}
Loading
Loading