diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7ace2d07d..82ceebb22 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,6 +21,8 @@
 * We now ensure that after a successful modifying client's operation (submit, cancel,
   open/close job, queues modification), the operation is immediately a part of the written journal.
 
+* `hq worker info` can now be used with a selector to display information about multiple workers at once.
+  * Note that this is a breaking change for the JSON output format, as it now outputs the worker information as an array of objects instead of a single object.
 
 ### Changes
diff --git a/crates/hyperqueue/src/bin/hq.rs b/crates/hyperqueue/src/bin/hq.rs
index 96b0e98be..d7ae56d7f 100644
--- a/crates/hyperqueue/src/bin/hq.rs
+++ b/crates/hyperqueue/src/bin/hq.rs
@@ -263,13 +263,21 @@ async fn command_worker_info(
     opts: WorkerInfoOpts,
 ) -> anyhow::Result<()> {
     let mut session = get_client_session(gsettings.server_directory()).await?;
-    let response = get_worker_info(&mut session, opts.worker_id, true).await?;
+    let mut responses = get_worker_info(&mut session, opts.selector, true).await?;
+    responses.sort_unstable_by_key(|(id, _)| *id);
+
+    let workers: Vec<_> = responses
+        .into_iter()
+        .filter_map(|(id, worker)| match worker {
+            Some(w) => Some(w),
+            None => {
+                log::error!("Worker {id} not found");
+                None
+            }
+        })
+        .collect();
 
-    if let Some(worker) = response {
-        gsettings.printer().print_worker_info(worker);
-    } else {
-        log::error!("Worker {} not found", opts.worker_id);
-    }
+    gsettings.printer().print_worker_info(workers);
 
     Ok(())
 }
@@ -312,11 +320,16 @@ async fn command_worker_address(
     gsettings: &GlobalSettings,
     opts: WorkerAddressOpts,
 ) -> anyhow::Result<()> {
+    use hyperqueue::common::arraydef::IntArray;
+    use hyperqueue::transfer::messages::IdSelector;
+
     let mut session = get_client_session(gsettings.server_directory()).await?;
-    let response = get_worker_info(&mut session, opts.worker_id, false).await?;
+    let selector = IdSelector::Specific(IntArray::from_id(opts.worker_id.as_num()));
+    let response = get_worker_info(&mut session, selector, false).await?;
 
-    match response {
-        Some(info) => println!("{}", info.configuration.hostname),
+    match response.into_iter().next() {
+        Some((_, Some(info))) => println!("{}", info.configuration.hostname),
+        Some((id, None)) => anyhow::bail!("Worker {} not found", id),
         None => anyhow::bail!("Worker {} not found", opts.worker_id),
     }
diff --git a/crates/hyperqueue/src/client/commands/worker.rs b/crates/hyperqueue/src/client/commands/worker.rs
index 9f95414a0..c1d890e85 100644
--- a/crates/hyperqueue/src/client/commands/worker.rs
+++ b/crates/hyperqueue/src/client/commands/worker.rs
@@ -310,14 +310,14 @@ pub async fn start_hq_worker(
 
     let worker = initialize_worker(gsettings.server_directory(), configuration).await?;
 
-    gsettings.printer().print_worker_info(WorkerInfo {
+    gsettings.printer().print_worker_info(vec![WorkerInfo {
         id: worker.id,
         configuration: worker.configuration.clone(),
         started: Utc::now(),
         ended: None,
         runtime_info: None,
         last_task_started: None,
-    });
+    }]);
     worker.run().await?;
     log::info!("Worker stopping");
     Ok(())
@@ -527,13 +527,13 @@ pub async fn get_worker_list(
 
 pub async fn get_worker_info(
     session: &mut ClientSession,
-    worker_id: WorkerId,
+    selector: IdSelector,
     runtime_info: bool,
-) -> crate::Result<Option<WorkerInfo>> {
+) -> crate::Result<Vec<(WorkerId, Option<WorkerInfo>)>> {
     let msg = rpc_call!(
         session.connection(),
         FromClientMessage::WorkerInfo(WorkerInfoRequest {
-            worker_id,
+            selector,
            runtime_info,
         }),
         ToClientMessage::WorkerInfoResponse(r) => r
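Reviewer note: the client-side API change above is easiest to see in isolation. Below is a minimal sketch (the types are simplified stand-ins, not the real HyperQueue definitions) of the new `get_worker_info` return shape and of how `command_worker_info` splits found and missing workers:

```rust
// Simplified stand-ins for tako::WorkerId and messages::WorkerInfo.
type WorkerId = u32;

#[derive(Debug)]
struct WorkerInfo {
    hostname: String,
}

/// The server now answers with one (id, Option<info>) pair per requested
/// worker; `None` marks an id that the server does not know about.
fn split_found(responses: Vec<(WorkerId, Option<WorkerInfo>)>) -> Vec<WorkerInfo> {
    responses
        .into_iter()
        .filter_map(|(id, worker)| match worker {
            Some(w) => Some(w),
            None => {
                eprintln!("Worker {id} not found"); // the real CLI uses log::error!
                None
            }
        })
        .collect()
}

fn main() {
    let found = split_found(vec![
        (1, Some(WorkerInfo { hostname: "node1".into() })),
        (7, None), // reported as missing, but does not abort the command
    ]);
    assert_eq!(found.len(), 1);
}
```

This mirrors the `filter_map` in `command_worker_info`: failed lookups are reported per id rather than failing the whole command, so a wide selector still prints all the workers that do exist.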
diff --git a/crates/hyperqueue/src/client/output/cli.rs b/crates/hyperqueue/src/client/output/cli.rs
index 9c623dab8..5dc0c92ba 100644
--- a/crates/hyperqueue/src/client/output/cli.rs
+++ b/crates/hyperqueue/src/client/output/cli.rs
@@ -267,137 +267,142 @@ impl Output for CliOutput {
         self.print_horizontal_table(rows, header);
     }
 
-    fn print_worker_info(&self, worker_info: WorkerInfo) {
-        let state = worker_status(&worker_info);
-        let WorkerInfo {
-            id,
-            configuration,
-            started,
-            ended: _ended,
-            runtime_info,
-            last_task_started,
-        } = worker_info;
-
-        let manager_info = configuration.get_manager_info();
-        let mut rows = vec![
-            vec!["Worker".cell().bold(true), id.cell()],
-            vec!["State".cell().bold(true), state],
-            vec!["Hostname".cell().bold(true), configuration.hostname.cell()],
-            vec!["Started".cell().bold(true), format_datetime(started).cell()],
-            vec![
-                "Data provider".cell().bold(true),
-                configuration.listen_address.cell(),
-            ],
-            vec![
-                "Working directory".cell().bold(true),
-                configuration.work_dir.display().cell(),
-            ],
-            vec![
-                "Heartbeat".cell().bold(true),
-                format_duration(configuration.heartbeat_interval)
-                    .to_string()
-                    .cell(),
-            ],
-            vec![
-                "Idle timeout".cell().bold(true),
-                configuration
-                    .idle_timeout
-                    .map(|x| format_duration(x).to_string())
-                    .unwrap_or_else(|| "None".to_string())
-                    .cell(),
-            ],
-            vec![
-                "Overview interval".cell().bold(true),
-                configuration
-                    .overview_configuration
-                    .send_interval
-                    .map(|x| format_duration(x).to_string())
-                    .unwrap_or_else(|| "None".to_string())
-                    .cell(),
-            ],
-            vec![
-                "Resources".cell().bold(true),
-                resources_summary(&configuration.resources, true).cell(),
-            ],
-            vec![
-                "Time Limit".cell().bold(true),
-                configuration
-                    .time_limit
-                    .map(|x| format_duration(x).to_string())
-                    .unwrap_or_else(|| "None".to_string())
-                    .cell(),
-            ],
-            vec![
-                "Process pid".cell().bold(true),
-                configuration
-                    .extra
-                    .get(WORKER_EXTRA_PROCESS_PID)
-                    .cloned()
-                    .unwrap_or_else(|| "N/A".to_string())
-                    .cell(),
-            ],
-            vec!["Group".cell().bold(true), configuration.group.cell()],
-            vec![
-                "Downloads".cell().bold(true),
-                format!(
-                    "{} parallel; max {} fails + {} delay",
-                    configuration.max_parallel_downloads,
-                    configuration.max_download_tries,
-                    format_duration(configuration.wait_between_download_tries)
-                )
-                .cell(),
-            ],
-            vec![
-                "Manager".cell().bold(true),
-                manager_info
-                    .as_ref()
-                    .map(|info| info.manager.to_string())
-                    .unwrap_or_else(|| "None".to_string())
-                    .cell(),
-            ],
-            vec![
-                "Manager Job ID".cell().bold(true),
-                manager_info
-                    .as_ref()
-                    .map(|info| info.allocation_id.as_str())
-                    .unwrap_or("N/A")
-                    .cell(),
-            ],
-            vec![
-                "Last task started".cell().bold(true),
-                last_task_started
-                    .map(|t| format!("{}; Time: {}", t.task_id, format_datetime(t.time)).cell())
-                    .unwrap_or_else(|| "".cell()),
-            ],
-        ];
-        if let Some(runtime_info) = runtime_info {
-            let mut s = String::with_capacity(60);
-            match runtime_info {
-                WorkerRuntimeInfo::SingleNodeTasks {
-                    running_tasks,
-                    assigned_tasks,
-                    is_reserved,
-                } => {
-                    write!(s, "assigned tasks: {assigned_tasks}").unwrap();
-                    if running_tasks > 0 {
-                        write!(s, "; running tasks: {running_tasks}").unwrap();
-                    }
-                    if is_reserved {
-                        write!(s, "; reserved for a multi-node task").unwrap();
-                    }
-                }
-                WorkerRuntimeInfo::MultiNodeTask { main_node } => {
-                    write!(s, "running multinode task; ").unwrap();
-                    if main_node {
-                        write!(s, "main node").unwrap();
-                    } else {
-                        write!(s, "secondary node").unwrap();
-                    }
-                }
-            };
-            rows.push(vec!["Runtime Info".cell().bold(true), s.cell()]);
-        }
-        self.print_vertical_table(rows);
+    fn print_worker_info(&self, workers: Vec<WorkerInfo>) {
+        for (i, worker_info) in workers.into_iter().enumerate() {
+            if i > 0 {
+                println!();
+            }
+            let state = worker_status(&worker_info);
+            let WorkerInfo {
+                id,
+                configuration,
+                started,
+                ended: _ended,
+                runtime_info,
+                last_task_started,
+            } = worker_info;
+
+            let manager_info = configuration.get_manager_info();
+            let mut rows = vec![
+                vec!["Worker".cell().bold(true), id.cell()],
+                vec!["State".cell().bold(true), state],
+                vec!["Hostname".cell().bold(true), configuration.hostname.cell()],
+                vec!["Started".cell().bold(true), format_datetime(started).cell()],
+                vec![
+                    "Data provider".cell().bold(true),
+                    configuration.listen_address.cell(),
+                ],
+                vec![
+                    "Working directory".cell().bold(true),
+                    configuration.work_dir.display().cell(),
+                ],
+                vec![
+                    "Heartbeat".cell().bold(true),
+                    format_duration(configuration.heartbeat_interval)
+                        .to_string()
+                        .cell(),
+                ],
+                vec![
+                    "Idle timeout".cell().bold(true),
+                    configuration
+                        .idle_timeout
+                        .map(|x| format_duration(x).to_string())
+                        .unwrap_or_else(|| "None".to_string())
+                        .cell(),
+                ],
+                vec![
+                    "Overview interval".cell().bold(true),
+                    configuration
+                        .overview_configuration
+                        .send_interval
+                        .map(|x| format_duration(x).to_string())
+                        .unwrap_or_else(|| "None".to_string())
+                        .cell(),
+                ],
+                vec![
+                    "Resources".cell().bold(true),
+                    resources_summary(&configuration.resources, true).cell(),
+                ],
+                vec![
+                    "Time Limit".cell().bold(true),
+                    configuration
+                        .time_limit
+                        .map(|x| format_duration(x).to_string())
+                        .unwrap_or_else(|| "None".to_string())
+                        .cell(),
+                ],
+                vec![
+                    "Process pid".cell().bold(true),
+                    configuration
+                        .extra
+                        .get(WORKER_EXTRA_PROCESS_PID)
+                        .cloned()
+                        .unwrap_or_else(|| "N/A".to_string())
+                        .cell(),
+                ],
+                vec!["Group".cell().bold(true), configuration.group.cell()],
+                vec![
+                    "Downloads".cell().bold(true),
+                    format!(
+                        "{} parallel; max {} fails + {} delay",
+                        configuration.max_parallel_downloads,
+                        configuration.max_download_tries,
+                        format_duration(configuration.wait_between_download_tries)
+                    )
+                    .cell(),
+                ],
+                vec![
+                    "Manager".cell().bold(true),
+                    manager_info
+                        .as_ref()
+                        .map(|info| info.manager.to_string())
+                        .unwrap_or_else(|| "None".to_string())
+                        .cell(),
+                ],
+                vec![
+                    "Manager Job ID".cell().bold(true),
+                    manager_info
+                        .as_ref()
+                        .map(|info| info.allocation_id.as_str())
+                        .unwrap_or("N/A")
+                        .cell(),
+                ],
+                vec![
+                    "Last task started".cell().bold(true),
+                    last_task_started
+                        .map(|t| format!("{}; Time: {}", t.task_id, format_datetime(t.time)).cell())
+                        .unwrap_or_else(|| "".cell()),
+                ],
+            ];
+            if let Some(runtime_info) = runtime_info {
+                let mut s = String::with_capacity(60);
+                match runtime_info {
+                    WorkerRuntimeInfo::SingleNodeTasks {
+                        running_tasks,
+                        assigned_tasks,
+                        is_reserved,
+                    } => {
+                        write!(s, "assigned tasks: {assigned_tasks}").unwrap();
+                        if running_tasks > 0 {
+                            write!(s, "; running tasks: {running_tasks}").unwrap();
+                        }
+                        if is_reserved {
+                            write!(s, "; reserved for a multi-node task").unwrap();
+                        }
+                    }
+                    WorkerRuntimeInfo::MultiNodeTask { main_node } => {
+                        write!(s, "running multinode task; ").unwrap();
+                        if main_node {
+                            write!(s, "main node").unwrap();
+                        } else {
+                            write!(s, "secondary node").unwrap();
+                        }
+                    }
+                };
+                rows.push(vec!["Runtime Info".cell().bold(true), s.cell()]);
+            }
+            self.print_vertical_table(rows);
+        }
     }
 
     fn print_server_info(&self, server_dir: Option<&Path>, info: &ServerInfo) {
diff --git a/crates/hyperqueue/src/client/output/json.rs b/crates/hyperqueue/src/client/output/json.rs
index 816f6094e..49ef291c7 100644
--- a/crates/hyperqueue/src/client/output/json.rs
+++ b/crates/hyperqueue/src/client/output/json.rs
@@ -52,8 +52,8 @@ impl Output for JsonOutput {
     fn print_worker_list(&self, workers: Vec<WorkerInfo>) {
         self.print(workers.into_iter().map(format_worker_info).collect());
     }
-    fn print_worker_info(&self, worker_info: WorkerInfo) {
-        self.print(format_worker_info(worker_info));
+    fn print_worker_info(&self, workers: Vec<WorkerInfo>) {
+        self.print(workers.into_iter().map(format_worker_info).collect());
     }
 
     fn print_server_info(&self, server_dir: Option<&Path>, info: &ServerInfo) {
diff --git a/crates/hyperqueue/src/client/output/outputs.rs b/crates/hyperqueue/src/client/output/outputs.rs
index 9debc813b..e071cdc88 100644
--- a/crates/hyperqueue/src/client/output/outputs.rs
+++ b/crates/hyperqueue/src/client/output/outputs.rs
@@ -36,7 +36,7 @@ pub enum OutputStream {
 pub trait Output {
     // Workers
     fn print_worker_list(&self, workers: Vec<WorkerInfo>);
-    fn print_worker_info(&self, worker_info: WorkerInfo);
+    fn print_worker_info(&self, workers: Vec<WorkerInfo>);
 
     // Server
     fn print_server_info(&self, server_dir: Option<&Path>, record: &ServerInfo);
diff --git a/crates/hyperqueue/src/client/output/quiet.rs b/crates/hyperqueue/src/client/output/quiet.rs
index 0317e5717..9bd8d81eb 100644
--- a/crates/hyperqueue/src/client/output/quiet.rs
+++ b/crates/hyperqueue/src/client/output/quiet.rs
@@ -57,7 +57,7 @@ impl Output for Quiet {
             println!("{} {}", worker.id, worker_status)
         }
     }
-    fn print_worker_info(&self, _worker_info: WorkerInfo) {}
+    fn print_worker_info(&self, _workers: Vec<WorkerInfo>) {}
 
     // Server
     fn print_server_info(&self, server_dir: Option<&Path>, _record: &ServerInfo) {
diff --git a/crates/hyperqueue/src/common/cli.rs b/crates/hyperqueue/src/common/cli.rs
index e3bfa933c..fb5280d50 100644
--- a/crates/hyperqueue/src/common/cli.rs
+++ b/crates/hyperqueue/src/common/cli.rs
@@ -301,8 +301,9 @@ pub struct WorkerAddressOpts {
 
 #[derive(Parser)]
 pub struct WorkerInfoOpts {
-    /// Worker ID
-    pub worker_id: WorkerId,
+    /// Selects worker(s) to display
+    #[arg(value_parser = parse_last_all_range)]
+    pub selector: IdSelector,
 }
 
 #[derive(Parser)]
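Reviewer note: `parse_last_all_range` is the value parser that maps a selector string onto an `IdSelector`. A hedged sketch of that mapping is shown below; the enum variant names match the real ones, but the parser itself is a simplified stand-in (the real grammar, e.g. comma-separated lists via `IntArray`, may accept more), and the assumption that `last` means `LastN(1)` is inferred from the new `test_worker_info_selector_last` test:

```rust
#[derive(Debug, PartialEq)]
enum IdSelector {
    All,                // "all": every known worker
    LastN(u32),         // "last": assumed to select the most recently connected worker
    Specific(Vec<u32>), // "2" or "1-3": explicit ids / id ranges
}

fn parse_selector(input: &str) -> Option<IdSelector> {
    match input {
        "all" => Some(IdSelector::All),
        "last" => Some(IdSelector::LastN(1)),
        other => {
            // Treat "a-b" as an inclusive range and a bare number as a
            // single-element range (the real parser builds an IntArray).
            let (start, end) = match other.split_once('-') {
                Some((a, b)) => (a.parse().ok()?, b.parse().ok()?),
                None => {
                    let id: u32 = other.parse().ok()?;
                    (id, id)
                }
            };
            Some(IdSelector::Specific((start..=end).collect()))
        }
    }
}

fn main() {
    assert_eq!(parse_selector("1-3"), Some(IdSelector::Specific(vec![1, 2, 3])));
    assert_eq!(parse_selector("last"), Some(IdSelector::LastN(1)));
    assert_eq!(parse_selector("all"), Some(IdSelector::All));
}
```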
diff --git a/crates/hyperqueue/src/server/client/mod.rs b/crates/hyperqueue/src/server/client/mod.rs
index 521b0d7f9..35eb8422a 100644
--- a/crates/hyperqueue/src/server/client/mod.rs
+++ b/crates/hyperqueue/src/server/client/mod.rs
@@ -20,7 +20,7 @@ use crate::transfer::connection::accept_client;
 use crate::transfer::messages::{
     CancelJobResponse, CloseJobResponse, FromClientMessage, IdSelector, JobDetail,
     JobDetailResponse, JobInfoResponse, JobSubmitDescription, StopWorkerResponse, StreamEvents,
-    SubmitRequest, SubmitResponse, TaskSelector, ToClientMessage,
+    SubmitRequest, SubmitResponse, TaskSelector, ToClientMessage, WorkerInfo,
 };
 use crate::transfer::messages::{ForgetJobResponse, GetListResponse};
 use tako::{JobId, JobTaskCount, WorkerId};
@@ -265,7 +265,7 @@ pub async fn client_rpc_loop<
         }
         FromClientMessage::GetList { workers } => handle_get_list(&state_ref, workers),
         FromClientMessage::WorkerInfo(msg) => {
-            handle_worker_info(&state_ref, senders, msg.worker_id, msg.runtime_info)
+            handle_worker_info(&state_ref, senders, msg.selector, msg.runtime_info)
         }
         FromClientMessage::StopWorker(msg) => {
             handle_worker_stop(&state_ref, senders, msg.selector)
@@ -791,17 +791,34 @@ fn handle_get_list(state_ref: &StateRef, workers: bool) -> ToClientMessage {
 fn handle_worker_info(
     state_ref: &StateRef,
     senders: &Senders,
-    worker_id: WorkerId,
+    selector: IdSelector,
     runtime_info: bool,
 ) -> ToClientMessage {
     let state = state_ref.get();
-    ToClientMessage::WorkerInfoResponse(state.get_worker(worker_id).map(|w| {
-        w.make_info(if runtime_info && w.is_running() {
-            senders.server_control.worker_info(worker_id)
-        } else {
-            None
-        })
-    }))
+
+    let worker_ids: Vec<WorkerId> = match selector {
+        IdSelector::Specific(array) => array.iter().map(|id| id.into()).collect(),
+        IdSelector::All => state.get_workers().keys().copied().collect(),
+        IdSelector::LastN(n) => {
+            let mut ids: Vec<_> = state.get_workers().keys().copied().collect();
+            ids.sort_by_key(|&k| std::cmp::Reverse(k));
+            ids.truncate(n as usize);
+            ids
+        }
+    };
+
+    let mut responses: Vec<(WorkerId, Option<WorkerInfo>)> = Vec::new();
+    for worker_id in worker_ids {
+        let worker_info = state.get_worker(worker_id).map(|w| {
+            w.make_info(if runtime_info && w.is_running() {
+                senders.server_control.worker_info(worker_id)
+            } else {
+                None
+            })
+        });
+        responses.push((worker_id, worker_info));
+    }
+    ToClientMessage::WorkerInfoResponse(responses)
 }
 
 pub(crate) fn handle_server_dump(senders: &Senders, path: &Path) -> ToClientMessage {
diff --git a/crates/hyperqueue/src/transfer/messages.rs b/crates/hyperqueue/src/transfer/messages.rs
index 0bc17b391..59ac32309 100644
--- a/crates/hyperqueue/src/transfer/messages.rs
+++ b/crates/hyperqueue/src/transfer/messages.rs
@@ -328,7 +328,7 @@ pub struct StopWorkerMessage {
 
 #[derive(Serialize, Deserialize, Debug)]
 pub struct WorkerInfoRequest {
-    pub worker_id: WorkerId,
+    pub selector: IdSelector,
     pub runtime_info: bool,
 }
 
@@ -400,7 +400,7 @@ pub enum ToClientMessage {
     SubmitResponse(SubmitResponse),
     ResourceRqIdResponse(Vec),
     GetListResponse(GetListResponse),
-    WorkerInfoResponse(Option<WorkerInfo>),
+    WorkerInfoResponse(Vec<(WorkerId, Option<WorkerInfo>)>),
     StopWorkerResponse(Vec<(WorkerId, StopWorkerResponse)>),
     CancelJobResponse(Vec<(JobId, CancelJobResponse)>),
     ForgetJobResponse(ForgetJobResponse),
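Reviewer note: the `LastN` arm resolves ids on the server by sorting in descending order and truncating, and the client then re-sorts the responses in ascending order for display (`responses.sort_unstable_by_key` in `hq.rs`). A self-contained sketch of that resolution over plain integers:

```rust
use std::cmp::Reverse;

/// Pick the n highest ids, i.e. the n most recently registered workers
/// (this assumes worker ids are assigned in increasing order).
fn last_n(mut ids: Vec<u32>, n: usize) -> Vec<u32> {
    ids.sort_by_key(|&k| Reverse(k));
    ids.truncate(n);
    ids
}

fn main() {
    assert_eq!(last_n(vec![2, 5, 1, 4], 2), vec![5, 4]);
    // Asking for more workers than exist simply returns everything.
    assert_eq!(last_n(vec![1, 2], 5), vec![2, 1]);
}
```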
diff --git a/docs/deployment/worker.md b/docs/deployment/worker.md
index 4f23c1fa4..9b82cdb21 100644
--- a/docs/deployment/worker.md
+++ b/docs/deployment/worker.md
@@ -192,10 +192,10 @@ If you also want to include workers that are offline (i.e. that have crashed or
 
 ### Display worker information
 
-You can display information about a specific worker using the [`hq worker info`](cli:hq.worker.info) command:
+You can display information about a set of workers using the [`hq worker info`](cli:hq.worker.info) command:
 
 ```bash
-$ hq worker info <worker-id>
+$ hq worker info <selector>
 ```
 
 ### Detect hardware resources
diff --git a/tests/output/test_json.py b/tests/output/test_json.py
index 68dd44747..25e2c1c27 100644
--- a/tests/output/test_json.py
+++ b/tests/output/test_json.py
@@ -39,39 +39,38 @@ def test_print_worker_info(hq_env: HqEnv):
     output = parse_json_output(hq_env, ["--output-mode=json", "worker", "info", "1"])
 
     schema = Schema(
-        {
-            "configuration": {
-                "heartbeat_interval": 8.0,
-                "hostname": "worker1",
-                "idle_timeout": None,
-                "listen_address": str,
-                "resources": {"resources": RESOURCE_DESCRIPTOR_SCHEMA, "coupling": {"weights": list}},
-                "time_limit": None,
-                "work_dir": str,
-                "group": str,
-                "on_server_lost": "stop",
-                "max_download_tries": int,
-                "max_parallel_downloads": int,
-                "wait_between_download_tries": float,
-            },
-            "allocation": None,
-            "started": str,
-            "ended": None,
-            "runtime_info": {"SingleNodeTasks": {"assigned_tasks": 0, "is_reserved": False, "running_tasks": 0}},
-            "last_task_started": None,
-            "id": 1,
-        }
+        [
+            {
+                "configuration": {
+                    "heartbeat_interval": 8.0,
+                    "hostname": "worker1",
+                    "idle_timeout": None,
+                    "listen_address": str,
+                    "resources": {"resources": RESOURCE_DESCRIPTOR_SCHEMA, "coupling": {"weights": list}},
+                    "time_limit": None,
+                    "work_dir": str,
+                    "group": str,
+                    "on_server_lost": "stop",
+                    "max_download_tries": int,
+                    "max_parallel_downloads": int,
+                    "wait_between_download_tries": float,
+                },
+                "allocation": None,
+                "started": str,
+                "ended": None,
+                "runtime_info": {"SingleNodeTasks": {"assigned_tasks": 0, "is_reserved": False, "running_tasks": 0}},
+                "last_task_started": None,
+                "id": 1,
+            }
+        ]
     )
 
-    import json
-
-    print(json.dumps(output, indent=2, sort_keys=True))
     schema.validate(output)
 
 
 def test_print_worker_info_pbs_allocation(hq_env: HqEnv):
     hq_env.start_server()
     hq_env.start_worker(env={"PBS_ENVIRONMENT": "PBS_BATCH", "PBS_JOBID": "x1234"})
-    output = parse_json_output(hq_env, ["--output-mode=json", "worker", "info", "1"])
+    output = parse_json_output(hq_env, ["--output-mode=json", "worker", "info", "1"])[0]
 
     schema = Schema(
         {
diff --git a/tests/test_coupling.py b/tests/test_coupling.py
index 7429508f0..bfb3050c0 100644
--- a/tests/test_coupling.py
+++ b/tests/test_coupling.py
@@ -42,7 +42,7 @@ def test_coupling_output(hq_env: HqEnv):
     table = hq_env.command(["worker", "info", "1"], as_table=True)
     table.check_row_value("Resources", "cpus: 2x3 [coupled]\nddd: 123\nfoo: 2x3 [coupled]")
 
-    result = hq_env.command(["--output-mode=json", "worker", "info", "1"], as_json=True)
+    result = hq_env.command(["--output-mode=json", "worker", "info", "1"], as_json=True)[0]
     assert result["configuration"]["resources"]["coupling"] == {
         "weights": [
             {"group1_idx": 0, "group2_idx": 0, "resource1_idx": 0, "resource2_idx": 2, "weight": 256},
diff --git a/tests/test_resources.py b/tests/test_resources.py
index af0d370b3..d73bb97e5 100644
--- a/tests/test_resources.py
+++ b/tests/test_resources.py
@@ -379,7 +379,7 @@ def test_worker_no_detect_cpus_error(hq_env: HqEnv):
 
 
 def get_worker_resources(hq_env: HqEnv, id=1) -> Dict[str, Any]:
-    resources = hq_env.command(["worker", "info", str(id), "--output-mode", "json"], as_json=True)
+    resources = hq_env.command(["worker", "info", str(id), "--output-mode", "json"], as_json=True)[0]
     resources = resources["configuration"]["resources"]["resources"]
     return {r["name"]: r for r in resources}
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 809852a99..c88dff52f 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -218,6 +218,26 @@ def test_worker_info(hq_env: HqEnv):
     table.check_row_value("Group", "default")
 
 
+def test_worker_info_selector_range(hq_env: HqEnv):
+    hq_env.start_server()
+    hq_env.start_workers(3)
+    wait_for_worker_state(hq_env, [1, 2, 3], "RUNNING")
+
+    table = hq_env.command(["worker", "info", "1-3"], as_table=True)
+    table[0].check_row_value("Worker", "1")
+    table[1].check_row_value("Worker", "2")
+    table[2].check_row_value("Worker", "3")
+
+
+def test_worker_info_selector_last(hq_env: HqEnv):
+    hq_env.start_server()
+    hq_env.start_workers(3)
+    wait_for_worker_state(hq_env, [1, 2, 3], "RUNNING")
+
+    table = hq_env.command(["worker", "info", "last"], as_table=True)
+    table.check_row_value("Worker", "3")
+
+
 def test_worker_group(hq_env: HqEnv):
     hq_env.start_server()
     hq_env.start_worker(cpus="10", args=["--group", "test_1"])
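Reviewer note: with this change, `hq worker info` accepts the same selector forms as `hq worker stop`, which already routes through `IdSelector` (see `handle_worker_stop` above): a single id, an id range such as `1-3`, and `last`; given `parse_last_all_range`, presumably `all` as well, though only the first three forms are exercised by the new tests. The quiet output mode intentionally remains a no-op, and missing ids are reported per worker instead of failing the whole command.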