Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
* We now ensure that after a successful modifying client operation (submit, cancel, open/close job, queue
modification),
the operation is immediately part of the written journal.
* `hq worker info` can now be used with a selector, to display information about multiple workers at once.
* Note that this is a breaking change for the JSON output format: worker information is now printed as an array of objects, whereas previously it was a single object.

### Changes

Expand Down
31 changes: 22 additions & 9 deletions crates/hyperqueue/src/bin/hq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,21 @@ async fn command_worker_info(
opts: WorkerInfoOpts,
) -> anyhow::Result<()> {
let mut session = get_client_session(gsettings.server_directory()).await?;
let response = get_worker_info(&mut session, opts.worker_id, true).await?;
let mut responses = get_worker_info(&mut session, opts.selector, true).await?;
responses.sort_unstable_by_key(|(id, _)| *id);

let workers: Vec<_> = responses
.into_iter()
.filter_map(|(id, worker)| match worker {
Some(w) => Some(w),
None => {
log::error!("Worker {id} not found");
None
}
})
.collect();

if let Some(worker) = response {
gsettings.printer().print_worker_info(worker);
} else {
log::error!("Worker {} not found", opts.worker_id);
}
gsettings.printer().print_worker_info(workers);
Ok(())
}

Expand Down Expand Up @@ -312,11 +320,16 @@ async fn command_worker_address(
gsettings: &GlobalSettings,
opts: WorkerAddressOpts,
) -> anyhow::Result<()> {
use hyperqueue::common::arraydef::IntArray;
use hyperqueue::transfer::messages::IdSelector;

let mut session = get_client_session(gsettings.server_directory()).await?;
let response = get_worker_info(&mut session, opts.worker_id, false).await?;
let selector = IdSelector::Specific(IntArray::from_id(opts.worker_id.as_num()));
let response = get_worker_info(&mut session, selector, false).await?;

match response {
Some(info) => println!("{}", info.configuration.hostname),
match response.into_iter().next() {
Some((_, Some(info))) => println!("{}", info.configuration.hostname),
Some((id, None)) => anyhow::bail!("Worker {} not found", id),
None => anyhow::bail!("Worker {} not found", opts.worker_id),
}

Expand Down
10 changes: 5 additions & 5 deletions crates/hyperqueue/src/client/commands/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,14 +310,14 @@ pub async fn start_hq_worker(

let worker = initialize_worker(gsettings.server_directory(), configuration).await?;

gsettings.printer().print_worker_info(WorkerInfo {
gsettings.printer().print_worker_info(vec![WorkerInfo {
id: worker.id,
configuration: worker.configuration.clone(),
started: Utc::now(),
ended: None,
runtime_info: None,
last_task_started: None,
});
}]);
worker.run().await?;
log::info!("Worker stopping");
Ok(())
Expand Down Expand Up @@ -527,13 +527,13 @@ pub async fn get_worker_list(

pub async fn get_worker_info(
session: &mut ClientSession,
worker_id: WorkerId,
selector: IdSelector,
runtime_info: bool,
) -> crate::Result<Option<WorkerInfo>> {
) -> crate::Result<Vec<(WorkerId, Option<WorkerInfo>)>> {
let msg = rpc_call!(
session.connection(),
FromClientMessage::WorkerInfo(WorkerInfoRequest {
worker_id,
selector,
runtime_info,
}),
ToClientMessage::WorkerInfoResponse(r) => r
Expand Down
259 changes: 132 additions & 127 deletions crates/hyperqueue/src/client/output/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,137 +267,142 @@ impl Output for CliOutput {
self.print_horizontal_table(rows, header);
}

fn print_worker_info(&self, worker_info: WorkerInfo) {
let state = worker_status(&worker_info);
let WorkerInfo {
id,
configuration,
started,
ended: _ended,
runtime_info,
last_task_started,
} = worker_info;

let manager_info = configuration.get_manager_info();
let mut rows = vec![
vec!["Worker".cell().bold(true), id.cell()],
vec!["State".cell().bold(true), state],
vec!["Hostname".cell().bold(true), configuration.hostname.cell()],
vec!["Started".cell().bold(true), format_datetime(started).cell()],
vec![
"Data provider".cell().bold(true),
configuration.listen_address.cell(),
],
vec![
"Working directory".cell().bold(true),
configuration.work_dir.display().cell(),
],
vec![
"Heartbeat".cell().bold(true),
format_duration(configuration.heartbeat_interval)
.to_string()
.cell(),
],
vec![
"Idle timeout".cell().bold(true),
configuration
.idle_timeout
.map(|x| format_duration(x).to_string())
.unwrap_or_else(|| "None".to_string())
.cell(),
],
vec![
"Overview interval".cell().bold(true),
configuration
.overview_configuration
.send_interval
.map(|x| format_duration(x).to_string())
.unwrap_or_else(|| "None".to_string())
.cell(),
],
vec![
"Resources".cell().bold(true),
resources_summary(&configuration.resources, true).cell(),
],
vec![
"Time Limit".cell().bold(true),
configuration
.time_limit
.map(|x| format_duration(x).to_string())
.unwrap_or_else(|| "None".to_string())
.cell(),
],
vec![
"Process pid".cell().bold(true),
configuration
.extra
.get(WORKER_EXTRA_PROCESS_PID)
.cloned()
.unwrap_or_else(|| "N/A".to_string())
.cell(),
],
vec!["Group".cell().bold(true), configuration.group.cell()],
vec![
"Downloads".cell().bold(true),
format!(
"{} parallel; max {} fails + {} delay",
configuration.max_parallel_downloads,
configuration.max_download_tries,
format_duration(configuration.wait_between_download_tries)
)
.cell(),
],
vec![
"Manager".cell().bold(true),
manager_info
.as_ref()
.map(|info| info.manager.to_string())
.unwrap_or_else(|| "None".to_string())
.cell(),
],
vec![
"Manager Job ID".cell().bold(true),
manager_info
.as_ref()
.map(|info| info.allocation_id.as_str())
.unwrap_or("N/A")
fn print_worker_info(&self, workers: Vec<WorkerInfo>) {
for (i, worker_info) in workers.into_iter().enumerate() {
if i > 0 {
println!();
}
let state = worker_status(&worker_info);
let WorkerInfo {
id,
configuration,
started,
ended: _ended,
runtime_info,
last_task_started,
} = worker_info;

let manager_info = configuration.get_manager_info();
let mut rows = vec![
vec!["Worker".cell().bold(true), id.cell()],
vec!["State".cell().bold(true), state],
vec!["Hostname".cell().bold(true), configuration.hostname.cell()],
vec!["Started".cell().bold(true), format_datetime(started).cell()],
vec![
"Data provider".cell().bold(true),
configuration.listen_address.cell(),
],
vec![
"Working directory".cell().bold(true),
configuration.work_dir.display().cell(),
],
vec![
"Heartbeat".cell().bold(true),
format_duration(configuration.heartbeat_interval)
.to_string()
.cell(),
],
vec![
"Idle timeout".cell().bold(true),
configuration
.idle_timeout
.map(|x| format_duration(x).to_string())
.unwrap_or_else(|| "None".to_string())
.cell(),
],
vec![
"Overview interval".cell().bold(true),
configuration
.overview_configuration
.send_interval
.map(|x| format_duration(x).to_string())
.unwrap_or_else(|| "None".to_string())
.cell(),
],
vec![
"Resources".cell().bold(true),
resources_summary(&configuration.resources, true).cell(),
],
vec![
"Time Limit".cell().bold(true),
configuration
.time_limit
.map(|x| format_duration(x).to_string())
.unwrap_or_else(|| "None".to_string())
.cell(),
],
vec![
"Process pid".cell().bold(true),
configuration
.extra
.get(WORKER_EXTRA_PROCESS_PID)
.cloned()
.unwrap_or_else(|| "N/A".to_string())
.cell(),
],
vec!["Group".cell().bold(true), configuration.group.cell()],
vec![
"Downloads".cell().bold(true),
format!(
"{} parallel; max {} fails + {} delay",
configuration.max_parallel_downloads,
configuration.max_download_tries,
format_duration(configuration.wait_between_download_tries)
)
.cell(),
],
vec![
"Last task started".cell().bold(true),
last_task_started
.map(|t| format!("{}; Time: {}", t.task_id, format_datetime(t.time)).cell())
.unwrap_or_else(|| "".cell()),
],
];
if let Some(runtime_info) = runtime_info {
let mut s = String::with_capacity(60);
match runtime_info {
WorkerRuntimeInfo::SingleNodeTasks {
running_tasks,
assigned_tasks,
is_reserved,
} => {
write!(s, "assigned tasks: {assigned_tasks}").unwrap();
if running_tasks > 0 {
write!(s, "; running tasks: {running_tasks}").unwrap();
}
if is_reserved {
write!(s, "; reserved for a multi-node task").unwrap();
],
vec![
"Manager".cell().bold(true),
manager_info
.as_ref()
.map(|info| info.manager.to_string())
.unwrap_or_else(|| "None".to_string())
.cell(),
],
vec![
"Manager Job ID".cell().bold(true),
manager_info
.as_ref()
.map(|info| info.allocation_id.as_str())
.unwrap_or("N/A")
.cell(),
],
vec![
"Last task started".cell().bold(true),
last_task_started
.map(|t| format!("{}; Time: {}", t.task_id, format_datetime(t.time)).cell())
.unwrap_or_else(|| "".cell()),
],
];
if let Some(runtime_info) = runtime_info {
let mut s = String::with_capacity(60);
match runtime_info {
WorkerRuntimeInfo::SingleNodeTasks {
running_tasks,
assigned_tasks,
is_reserved,
} => {
write!(s, "assigned tasks: {assigned_tasks}").unwrap();
if running_tasks > 0 {
write!(s, "; running tasks: {running_tasks}").unwrap();
}
if is_reserved {
write!(s, "; reserved for a multi-node task").unwrap();
}
}
}
WorkerRuntimeInfo::MultiNodeTask { main_node } => {
write!(s, "running multinode task; ").unwrap();
if main_node {
write!(s, "main node").unwrap();
} else {
write!(s, "secondary node").unwrap();
WorkerRuntimeInfo::MultiNodeTask { main_node } => {
write!(s, "running multinode task; ").unwrap();
if main_node {
write!(s, "main node").unwrap();
} else {
write!(s, "secondary node").unwrap();
}
}
}
};
rows.push(vec!["Runtime Info".cell().bold(true), s.cell()]);
};
rows.push(vec!["Runtime Info".cell().bold(true), s.cell()]);
}
self.print_vertical_table(rows);
}
self.print_vertical_table(rows);
}

fn print_server_info(&self, server_dir: Option<&Path>, info: &ServerInfo) {
Expand Down
4 changes: 2 additions & 2 deletions crates/hyperqueue/src/client/output/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ impl Output for JsonOutput {
fn print_worker_list(&self, workers: Vec<WorkerInfo>) {
self.print(workers.into_iter().map(format_worker_info).collect());
}
fn print_worker_info(&self, worker_info: WorkerInfo) {
self.print(format_worker_info(worker_info));
fn print_worker_info(&self, workers: Vec<WorkerInfo>) {
self.print(workers.into_iter().map(format_worker_info).collect());
}

fn print_server_info(&self, server_dir: Option<&Path>, info: &ServerInfo) {
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/output/outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub enum OutputStream {
pub trait Output {
// Workers
fn print_worker_list(&self, workers: Vec<WorkerInfo>);
fn print_worker_info(&self, worker_info: WorkerInfo);
fn print_worker_info(&self, workers: Vec<WorkerInfo>);

// Server
fn print_server_info(&self, server_dir: Option<&Path>, record: &ServerInfo);
Expand Down
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/output/quiet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl Output for Quiet {
println!("{} {}", worker.id, worker_status)
}
}
fn print_worker_info(&self, _worker_info: WorkerInfo) {}
fn print_worker_info(&self, _workers: Vec<WorkerInfo>) {}

// Server
fn print_server_info(&self, server_dir: Option<&Path>, _record: &ServerInfo) {
Expand Down
Loading
Loading