2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/commands/job.rs
@@ -196,7 +196,7 @@ pub async fn output_job_detail(
.collect();
gsettings
.printer()
.print_job_detail(jobs, worker_map, &response.server_uid);
.print_job_detail(jobs, &worker_map, &response.server_uid);
Ok(())
}

9 changes: 6 additions & 3 deletions crates/hyperqueue/src/client/commands/journal/output.rs
@@ -192,23 +192,26 @@ impl SubmitDescFormatter<'_> {
ids,
entries: _,
task_desc,
resource_rq,
} => {
let TaskDescription {
kind: _,
resources,
time_limit,
priority,
crash_limit,
} = task_desc;
json!({
"ids": ids,
"resources": resources,
"resources": resource_rq,
"time_limit": time_limit,
"priority": priority,
"crash_limit": crash_limit
})
}
JobTaskDescription::Graph { tasks } => {
JobTaskDescription::Graph {
resource_rqs: _,
tasks,
} => {
json!({
"n_tasks": tasks.len()
})
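For orientation, the shape that the destructurings above rely on can be summarized in one place. This is a minimal stand-in sketch, not the real hyperqueue/tako definitions: the field names (`resource_rq`, `resource_rqs`, `resource_rq_id`) follow this diff, but the types are placeholders and several fields (for example `entries` and dependency data) are omitted.

```rust
// Simplified stand-ins; the real types live in hyperqueue/tako and carry more data.
type ResourceRequestVariants = Vec<String>; // placeholder for the resource-request type
type LocalResourceRqId = u32;               // placeholder for the id newtype

struct TaskDescription; // kind, time_limit, priority, crash_limit, ...

enum JobTaskDescription {
    // Array jobs: all tasks share one request, now stored next to the task
    // description instead of inside it.
    Array {
        ids: Vec<u32>,
        task_desc: TaskDescription,
        resource_rq: ResourceRequestVariants,
    },
    // Task graphs: distinct requests are stored once in `resource_rqs`; each
    // task refers to one of them through its `resource_rq_id`.
    Graph {
        tasks: Vec<TaskWithDependencies>,
        resource_rqs: Vec<ResourceRequestVariants>,
    },
}

struct TaskWithDependencies {
    id: u32,
    task_desc: TaskDescription,
    resource_rq_id: LocalResourceRqId,
}

fn main() {} // compiles standalone; for illustration only
```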
36 changes: 26 additions & 10 deletions crates/hyperqueue/src/client/commands/journal/report.rs
@@ -4,7 +4,7 @@ use crate::common::utils::time::parse_hms_or_human_time;
use crate::server::autoalloc::AllocationId;
use crate::server::event::journal::JournalReader;
use crate::server::event::payload::EventPayload;
use crate::transfer::messages::{JobTaskDescription, SubmitRequest};
use crate::transfer::messages::{JobTaskDescription, LocalResourceRqId, SubmitRequest};
use anyhow::anyhow;
use chrono::{DateTime, Duration, TimeDelta, Utc};
use clap::{Parser, ValueHint};
@@ -15,7 +15,7 @@ use std::path::PathBuf;
use tako::gateway::{ResourceRequest, ResourceRequestVariants};
use tako::resources::ResourceAmount;
use tako::worker::WorkerConfiguration;
use tako::{JobId, JobTaskId, ResourceVariantId, TaskId, WorkerId};
use tako::{JobId, JobTaskId, Map, ResourceVariantId, TaskId, WorkerId};

#[derive(Parser)]
pub(crate) struct JournalReportOpts {
@@ -113,7 +113,10 @@ impl ResCount {

enum JobResourceRq {
Array(ResourceRequestVariants),
TaskGraph(HashMap<JobTaskId, ResourceRequestVariants>),
TaskGraph {
resource_rqs: Vec<ResourceRequestVariants>,
task_rqs: Map<JobTaskId, LocalResourceRqId>,
},
}

struct TaskDuration {
@@ -335,7 +338,12 @@ impl JournalStats {
let jrq = self.job_requests.get(&task_id.job_id()).unwrap();
let rq = match jrq {
JobResourceRq::Array(rq) => rq,
JobResourceRq::TaskGraph(map) => map.get(&task_id.job_task_id()).unwrap(),
JobResourceRq::TaskGraph {
resource_rqs,
task_rqs,
} => resource_rqs
.get(task_rqs.get(&task_id.job_task_id()).unwrap().as_usize())
.unwrap(),
};
let rq = &rq.variants[rv_id.as_usize()];
if rq.n_nodes > 0 {
@@ -390,15 +398,23 @@

fn new_submit(&mut self, job_id: JobId, submit: SubmitRequest) {
let rq = match submit.submit_desc.task_desc {
JobTaskDescription::Array { task_desc, .. } => {
JobResourceRq::Array(task_desc.resources)
}
JobTaskDescription::Graph { tasks } => {
JobTaskDescription::Array {
task_desc: _,
resource_rq,
..
} => JobResourceRq::Array(resource_rq),
JobTaskDescription::Graph {
tasks,
resource_rqs,
} => {
let map = tasks
.into_iter()
.map(|t| (t.id, t.task_desc.resources))
.map(|t| (t.id, t.resource_rq_id))
.collect();
JobResourceRq::TaskGraph(map)
JobResourceRq::TaskGraph {
task_rqs: map,
resource_rqs: resource_rqs.clone(),
}
}
};
self.job_requests.insert(job_id, rq);
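The new `TaskGraph` arm above performs a two-step indirection: task id → `LocalResourceRqId` → shared request. A minimal standalone sketch of that lookup, with `std::collections::HashMap` in place of tako's `Map` and plain `String`/`usize` stand-ins for the real request and id types:

```rust
use std::collections::HashMap;

type JobTaskId = u32;
type ResourceRequestVariants = String; // stand-in for the real request type

enum JobResourceRq {
    // Array jobs: one request shared by every task.
    Array(ResourceRequestVariants),
    // Task graphs: requests stored once, tasks reference them by index.
    TaskGraph {
        resource_rqs: Vec<ResourceRequestVariants>,
        task_rqs: HashMap<JobTaskId, usize>,
    },
}

fn request_for_task(job_rq: &JobResourceRq, task: JobTaskId) -> &ResourceRequestVariants {
    match job_rq {
        JobResourceRq::Array(rq) => rq,
        JobResourceRq::TaskGraph { resource_rqs, task_rqs } => {
            // task id -> local request id -> deduplicated request
            &resource_rqs[task_rqs[&task]]
        }
    }
}

fn main() {
    let job_rq = JobResourceRq::TaskGraph {
        resource_rqs: vec!["cpus=1".into(), "cpus=4".into()],
        task_rqs: HashMap::from([(0, 0), (1, 1), (2, 1)]),
    };
    assert_eq!(request_for_task(&job_rq, 2), "cpus=4");
}
```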
14 changes: 6 additions & 8 deletions crates/hyperqueue/src/client/commands/submit/command.rs
@@ -691,6 +691,10 @@ pub async fn submit_computation(
.unwrap_or_else(|| "job".to_string())
};

// Force task_dir for multi node tasks (for a place where to create node file)
let task_dir = task_dir | (resources.n_nodes > 0);
let resources = ResourceRequestVariants::new(smallvec![resources]);

let args: Vec<BString> = commands.into_iter().map(|arg| arg.into()).collect();

let stdout = create_stdio(stdout, &stream, DEFAULT_STDOUT_PATH);
@@ -715,21 +719,14 @@
stdin: stdin.unwrap_or_default(),
};

// Force task_dir for multi node tasks (for a place where to create node file)
let task_dir = if resources.n_nodes > 0 {
true
} else {
task_dir
};

let task_kind = TaskKind::ExternalProgram(TaskKindProgram {
program: program_def,
pin_mode: pin.map(|arg| arg.into()).unwrap_or(PinMode::None),
task_dir,
});

let task_desc = TaskDescription {
kind: task_kind,
resources: ResourceRequestVariants::new(smallvec![resources]),
priority,
time_limit,
crash_limit,
@@ -739,6 +736,7 @@
ids,
entries,
task_desc,
resource_rq: resources,
};

let request = SubmitRequest {
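The folded `task_dir` assignment above relies on `|` being an ordinary (non-short-circuiting) logical OR for `bool`, so it is equivalent to the removed `if resources.n_nodes > 0 { true } else { task_dir }` block. A tiny standalone check of that equivalence:

```rust
fn main() {
    // `|` on bool evaluates both operands but has the same truth table as `||`.
    for task_dir in [false, true] {
        for multi_node in [false, true] {
            let folded = task_dir | multi_node;
            let explicit = if multi_node { true } else { task_dir };
            assert_eq!(folded, explicit);
        }
    }
    println!("folded task_dir matches the old if/else");
}
```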
62 changes: 49 additions & 13 deletions crates/hyperqueue/src/client/commands/submit/jobfile.rs
@@ -10,8 +10,8 @@ use crate::common::arraydef::IntArray;
use crate::common::utils::fs::get_current_dir;
use crate::transfer::connection::ClientSession;
use crate::transfer::messages::{
JobDescription, JobSubmitDescription, JobTaskDescription, PinMode, SubmitRequest,
TaskDescription, TaskKind, TaskKindProgram, TaskWithDependencies,
JobDescription, JobSubmitDescription, JobTaskDescription, LocalResourceRqId, PinMode,
SubmitRequest, TaskDescription, TaskKind, TaskKindProgram, TaskWithDependencies,
};
use clap::Parser;
use smallvec::smallvec;
@@ -54,6 +54,19 @@ fn create_stdio(def: Option<StdioDefInput>, default: &str, has_streaming: bool)
}
}

fn build_resource_request(cfg: &mut TaskConfigDef) -> ResourceRequestVariants {
ResourceRequestVariants {
variants: if cfg.request.is_empty() {
smallvec![ResourceRequest::default()]
} else {
std::mem::take(&mut cfg.request)
.into_iter()
.map(|r| r.into_request())
.collect()
},
}
}

fn build_task_description(cfg: TaskConfigDef, has_streaming: bool) -> TaskDescription {
TaskDescription {
kind: TaskKind::ExternalProgram(TaskKindProgram {
@@ -72,39 +85,40 @@ fn build_task_description(cfg: TaskConfigDef, has_streaming: bool) -> TaskDescription {
},
task_dir: cfg.task_dir,
}),
resources: ResourceRequestVariants {
variants: if cfg.request.is_empty() {
smallvec![ResourceRequest::default()]
} else {
cfg.request.into_iter().map(|r| r.into_request()).collect()
},
},
time_limit: cfg.time_limit,
priority: cfg.priority,
crash_limit: cfg.crash_limit,
}
}

fn build_task(
tdef: TaskDef,
mut tdef: TaskDef,
max_id: &mut JobTaskId,
resource_map: &mut Map<ResourceRequestVariants, LocalResourceRqId>,
data_flags: TaskDataFlags,
has_streaming: bool,
) -> TaskWithDependencies {
let id = tdef.id.unwrap_or_else(|| {
*max_id = JobTaskId::new(max_id.as_num() + 1);
*max_id
});
let resource = build_resource_request(&mut tdef.config);
let resource_rq_id = resource_map.get(&resource).copied().unwrap_or_else(|| {
let new_id = LocalResourceRqId::new(resource_map.len() as u32);
resource_map.insert(resource, new_id);
new_id
});
TaskWithDependencies {
id,
data_flags,
task_desc: build_task_description(tdef.config, has_streaming),
resource_rq_id,
task_deps: tdef.deps,
data_deps: tdef.data_deps,
}
}

fn build_job_desc_array(array: ArrayDef, has_streaming: bool) -> JobTaskDescription {
fn build_job_desc_array(mut array: ArrayDef, has_streaming: bool) -> JobTaskDescription {
let ids = array
.ids
.unwrap_or_else(|| IntArray::from_range(0, array.entries.len() as JobTaskCount));
@@ -119,9 +133,11 @@ fn build_job_desc_array(mut array: ArrayDef, has_streaming: bool) -> JobTaskDescription {
.collect(),
)
};
let resources = build_resource_request(&mut array.config);
JobTaskDescription::Array {
ids,
entries,
resource_rq: resources,
task_desc: build_task_description(array.config, has_streaming),
}
}
@@ -144,8 +160,15 @@ fn build_job_desc_individual_tasks(
let mut unprocessed_tasks = Map::new();
let mut in_degrees = Map::new();
let mut consumers: Map<JobTaskId, Vec<_>> = Map::new();
let mut resource_map: Map<ResourceRequestVariants, LocalResourceRqId> = Map::new();
for task in tasks {
let t = build_task(task, &mut max_id, data_flags, has_streaming);
let t = build_task(
task,
&mut max_id,
&mut resource_map,
data_flags,
has_streaming,
);
if in_degrees.insert(t.id, t.task_deps.len()).is_some() {
return Err(crate::Error::GenericError(format!(
"Task {} is defined multiple times",
@@ -187,7 +210,10 @@
)));
}

Ok(JobTaskDescription::Graph { tasks: new_tasks })
Ok(JobTaskDescription::Graph {
tasks: new_tasks,
resource_rqs: resource_rq_map_to_vec(resource_map),
})
}

fn build_job_submit(jdef: JobDef, job_id: Option<JobId>) -> crate::Result<SubmitRequest> {
@@ -228,3 +254,13 @@ pub async fn submit_computation_from_job_file(
let request = build_job_submit(jdef, opts.job)?;
send_submit_request(gsettings, session, request, false, false, None).await
}

pub fn resource_rq_map_to_vec(
map: Map<ResourceRequestVariants, LocalResourceRqId>,
) -> Vec<ResourceRequestVariants> {
let mut result = vec![None; map.len()];
for (rq, id) in map.into_iter() {
result[id.as_num() as usize] = Some(rq);
}
result.into_iter().map(|x| x.unwrap()).collect()
}
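The deduplication in this file is the core of the change: `build_task` interns each task's request into a `Map<ResourceRequestVariants, LocalResourceRqId>`, handing out dense ids, and `resource_rq_map_to_vec` then inverts that map into a `Vec` indexed by id. A standalone sketch of the same idea using plain `std` types (`HashMap`, `String`, and `u32` as stand-ins for tako's `Map`, the request type, and the id newtype):

```rust
use std::collections::HashMap;

// Give each distinct request a dense id, reusing the id for duplicates
// (mirrors the `get(..).copied().unwrap_or_else(..)` pattern in build_task).
fn intern_request(map: &mut HashMap<String, u32>, rq: String) -> u32 {
    map.get(&rq).copied().unwrap_or_else(|| {
        let new_id = map.len() as u32; // ids are dense: 0, 1, 2, ...
        map.insert(rq, new_id);
        new_id
    })
}

// Invert the map into a Vec whose position equals the id
// (the resource_rq_map_to_vec idea).
fn map_to_vec(map: HashMap<String, u32>) -> Vec<String> {
    let mut result = vec![None; map.len()];
    for (rq, id) in map {
        result[id as usize] = Some(rq);
    }
    result.into_iter().map(|x| x.unwrap()).collect()
}

fn main() {
    let mut map = HashMap::new();
    let task_rq_ids: Vec<u32> = ["cpus=1", "cpus=4", "cpus=1"]
        .into_iter()
        .map(|rq| intern_request(&mut map, rq.to_string()))
        .collect();
    assert_eq!(task_rq_ids, vec![0u32, 1, 0]); // the duplicate request reuses id 0
    assert_eq!(map_to_vec(map), vec!["cpus=1".to_string(), "cpus=4".to_string()]);
}
```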
2 changes: 1 addition & 1 deletion crates/hyperqueue/src/client/commands/submit/mod.rs
@@ -6,4 +6,4 @@ mod jobfile;
pub use command::SubmitJobTaskConfOpts;
pub use command::{JobSubmitOpts, submit_computation};

pub use jobfile::{JobSubmitFileOpts, submit_computation_from_job_file};
pub use jobfile::{JobSubmitFileOpts, resource_rq_map_to_vec, submit_computation_from_job_file};
8 changes: 4 additions & 4 deletions crates/hyperqueue/src/client/commands/worker.rs
@@ -507,8 +507,8 @@ pub async fn get_worker_list(
) -> crate::Result<Vec<WorkerInfo>> {
let msg = rpc_call!(
session.connection(),
FromClientMessage::WorkerList,
ToClientMessage::WorkerListResponse(r) => r
FromClientMessage::GetList { workers: true },
ToClientMessage::GetListResponse(r) => r
)
.await?;

@@ -577,8 +577,8 @@ pub async fn wait_for_workers(
async fn get_workers_status(session: &mut ClientSession) -> anyhow::Result<(u32, u32)> {
let msg = rpc_call!(
session.connection(),
FromClientMessage::WorkerList,
ToClientMessage::WorkerListResponse(r) => r
FromClientMessage::GetList { workers: true },
ToClientMessage::GetListResponse(r) => r
)
.await?;

22 changes: 14 additions & 8 deletions crates/hyperqueue/src/client/job.rs
@@ -1,20 +1,26 @@
use crate::rpc_call;
use crate::transfer::connection::ClientSession;
use crate::transfer::messages::{FromClientMessage, ToClientMessage};
use crate::transfer::messages::{FromClientMessage, GetListResponse, ToClientMessage};
use tako::{Map, WorkerId};

/// Maps worker IDs to hostnames.
pub type WorkerMap = Map<WorkerId, String>;

pub async fn get_worker_map(session: &mut ClientSession) -> anyhow::Result<WorkerMap> {
let message = FromClientMessage::WorkerList;
pub async fn get_remote_lists(
session: &mut ClientSession,
workers: bool,
) -> anyhow::Result<GetListResponse> {
let message = FromClientMessage::GetList { workers };
let response =
rpc_call!(session.connection(), message, ToClientMessage::WorkerListResponse(r) => r)
.await?;
let map = response
rpc_call!(session.connection(), message, ToClientMessage::GetListResponse(r) => r).await?;
Ok(response)
}

pub async fn get_worker_map(session: &mut ClientSession) -> anyhow::Result<WorkerMap> {
let response = get_remote_lists(session, true).await?;
Ok(response
.workers
.into_iter()
.map(|w| (w.id, w.configuration.hostname))
.collect();
Ok(map)
.collect())
}
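The last two files replace the dedicated `WorkerList`/`WorkerListResponse` round-trip with a generalized `GetList { workers }` request whose response bundles the requested lists, and `get_worker_map` projects the returned workers into an id → hostname map. A simplified standalone sketch of that pattern; the message and worker types here are stand-ins, not the actual hyperqueue definitions:

```rust
use std::collections::HashMap;

// Stand-ins for the request/response messages touched by this diff.
struct WorkerInfo {
    id: u64,
    hostname: String,
}

#[derive(Default)]
struct GetListResponse {
    workers: Vec<WorkerInfo>,
}

// One request that can ask for several kinds of lists; only workers are sketched here.
fn get_list(workers: bool) -> GetListResponse {
    if workers {
        GetListResponse {
            workers: vec![
                WorkerInfo { id: 1, hostname: "node-a".into() },
                WorkerInfo { id: 2, hostname: "node-b".into() },
            ],
        }
    } else {
        GetListResponse::default()
    }
}

fn main() {
    // get_worker_map-style projection: worker id -> hostname.
    let worker_map: HashMap<u64, String> = get_list(true)
        .workers
        .into_iter()
        .map(|w| (w.id, w.hostname))
        .collect();
    assert_eq!(worker_map.get(&2).map(String::as_str), Some("node-b"));
}
```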