Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions crates/hyperqueue/src/client/commands/autoalloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::transfer::connection::ClientSession;
use crate::transfer::messages::{
AutoAllocRequest, AutoAllocResponse, FromClientMessage, QueueCreateResponse, ToClientMessage,
};
use clap::Parser;
use clap::{Parser, ValueEnum};
use humantime::format_duration;
use tako::resources::{CPU_RESOURCE_NAME, ResourceDescriptor, ResourceDescriptorItem};

Expand Down Expand Up @@ -346,6 +346,7 @@ wasted allocation duration."
no_hyper_threading,
idle_timeout,
overview_interval,
env_propagation_mode,
} = worker_args;

if cpus.is_none() && resource.is_empty() {
Expand All @@ -354,7 +355,14 @@ wasted allocation duration."
)
}

let mut worker_args = vec![];
let mut worker_args = vec![
"--env".to_string(),
env_propagation_mode
.to_possible_value()
.unwrap()
.get_name()
.to_owned(),
];
if let Some(cpus) = cpus {
worker_args.extend([
"--cpus".to_string(),
Expand Down
24 changes: 24 additions & 0 deletions crates/hyperqueue/src/client/commands/worker.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::client::commands::duration_doc;
use crate::tako::internal::worker::configuration::EnvPropagationMode;
use anyhow::{Context, bail};
use chrono::Utc;
use clap::builder::{PossibleValue, TypedValueParser};
Expand Down Expand Up @@ -148,6 +149,27 @@ pub struct SharedWorkerStartOpts {
/// size.
#[arg(long, value_parser = passthrough_parser(parse_human_time))]
pub overview_interval: Option<PassThroughArgument<Duration>>,

/// How should environment variables be propagated from the worker to tasks.
#[arg(long("env"), default_value = "propagate")]
pub env_propagation_mode: EnvPropagationModeCli,
}

#[derive(Debug, Clone, clap::ValueEnum)]
pub enum EnvPropagationModeCli {
/// Propagate environment variables from the worker to tasks.
Propagate,
/// Do not propagate environment variables
Isolate,
}

impl From<EnvPropagationModeCli> for EnvPropagationMode {
fn from(value: EnvPropagationModeCli) -> Self {
match value {
EnvPropagationModeCli::Propagate => EnvPropagationMode::Propagate,
EnvPropagationModeCli::Isolate => EnvPropagationMode::Isolate,
}
}
}

/// Parses resource detection options (all, none or a comma-separated list of
Expand Down Expand Up @@ -337,6 +359,7 @@ fn gather_configuration(opts: WorkerStartOpts) -> anyhow::Result<WorkerConfigura
no_hyper_threading,
idle_timeout,
overview_interval,
env_propagation_mode,
},
heartbeat,
time_limit,
Expand Down Expand Up @@ -483,6 +506,7 @@ fn gather_configuration(opts: WorkerStartOpts) -> anyhow::Result<WorkerConfigura
max_parallel_downloads,
max_download_tries,
wait_between_download_tries,
env_propagation_mode: env_propagation_mode.into(),
})
}

Expand Down
6 changes: 6 additions & 0 deletions crates/hyperqueue/src/client/output/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::transfer::messages::{
AutoAllocListQueuesResponse, JobDetail, JobInfo, JobTaskDescription, PinMode, QueueData,
ServerInfo, TaskDescription, TaskKind, TaskKindProgram, WaitForJobsResponse, WorkerInfo,
};
use tako::internal::worker::configuration::EnvPropagationMode;
use tako::server::TaskExplanation;
use tako::{JobId, JobTaskId};

Expand Down Expand Up @@ -555,6 +556,7 @@ fn format_worker_info(worker_info: WorkerInfo) -> serde_json::Value {
max_parallel_downloads,
max_download_tries,
wait_between_download_tries,
env_propagation_mode,
extra: _,
},
started,
Expand All @@ -578,6 +580,10 @@ fn format_worker_info(worker_info: WorkerInfo) -> serde_json::Value {
"max_parallel_downloads": max_parallel_downloads,
"max_download_tries": max_download_tries,
"wait_between_download_tries": format_duration(wait_between_download_tries),
"env_propagation_mode": match env_propagation_mode {
EnvPropagationMode::Propagate => "propagate",
EnvPropagationMode::Isolate => "isolate"
}
}),
"allocation": manager_info.map(|info| json!({
"manager": FormattedManagerType(info.manager),
Expand Down
9 changes: 8 additions & 1 deletion crates/hyperqueue/src/worker/start/program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use crate::worker::start::{RunningTaskContext, SharedTaskDescription};
use crate::worker::streamer::StreamSender;
use crate::worker::streamer::StreamerRef;
use tako::comm::serialize;
use tako::internal::worker::configuration::EnvPropagationMode;
use tako::launcher::{
StopReason, TaskBuildContext, TaskLaunchData, TaskResult, command_from_definitions,
};
Expand Down Expand Up @@ -199,6 +200,10 @@ pub(super) fn build_program_task(
let task_future = create_task_future(
streamer_ref.clone(),
program,
build_ctx
.worker_configuration()
.env_propagation_mode
.clone(),
task_id,
instance_id,
stop_receiver,
Expand Down Expand Up @@ -499,13 +504,14 @@ fn check_error_filename(task_dir: TempDir) -> Option<tako::Error> {
async fn create_task_future(
streamer_ref: StreamerRef,
program: ProgramDefinition,
env_propagation_mode: EnvPropagationMode,
task_id: TaskId,
instance_id: InstanceId,
end_receiver: Receiver<StopReason>,
task_dir: Option<TempDir>,
stream_path: Option<PathBuf>,
) -> tako::Result<TaskResult> {
let mut command = command_from_definitions(&program)?;
let mut command = command_from_definitions(&program, &env_propagation_mode)?;

let status_to_result = |status: ExitStatus| {
if !status.success() {
Expand Down Expand Up @@ -622,6 +628,7 @@ fn signal_name(signal: i32) -> &'static str {
async fn create_task_future(
_streamer_ref: StreamerRef,
_program: ProgramDefinition,
_env_propagation_mode: EnvPropagationMode,
_task_id: TaskId,
_instance_id: InstanceId,
_end_receiver: Receiver<StopReason>,
Expand Down
5 changes: 3 additions & 2 deletions crates/tako/src/internal/tests/integration/utils/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::internal::common::error::DsError;
use crate::internal::common::resources::ResourceDescriptor;
use crate::internal::worker::configuration::{
DEFAULT_MAX_DOWNLOAD_TRIES, DEFAULT_MAX_PARALLEL_DOWNLOADS,
DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES, OverviewConfiguration,
DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES, EnvPropagationMode, OverviewConfiguration,
};
use crate::launcher::{StopReason, TaskBuildContext, TaskResult};
use crate::program::ProgramDefinition;
Expand Down Expand Up @@ -86,6 +86,7 @@ pub fn create_worker_configuration(
wait_between_download_tries: DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES,
time_limit: None,
extra,
env_propagation_mode: EnvPropagationMode::Propagate,
},
secret_key,
)
Expand Down Expand Up @@ -230,7 +231,7 @@ pub(super) async fn start_worker(
}

async fn launcher_main(program: ProgramDefinition) -> crate::Result<()> {
let mut command = command_from_definitions(&program)?;
let mut command = command_from_definitions(&program, &EnvPropagationMode::Propagate)?;
let mut process = command.spawn()?;

if !program.stdin.is_empty() {
Expand Down
12 changes: 12 additions & 0 deletions crates/tako/src/internal/worker/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,21 @@ pub struct WorkerConfiguration {
pub max_download_tries: u32,
pub wait_between_download_tries: Duration,

pub env_propagation_mode: EnvPropagationMode,

pub extra: Map<String, String>,
}

/// How should environment variables be handled when a worker starts a new task.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum EnvPropagationMode {
/// Propagate environment varibles from the worker environment to the task.
Propagate,
/// Isolate the task. Do not propagate any environment variables from the worker except for
/// PATH.
Isolate,
}

/// This function is used from both the server and the worker to keep the same values
/// in the worker configuration without the need for repeated configuration exchange.
pub(crate) fn sync_worker_configuration(
Expand Down
15 changes: 13 additions & 2 deletions crates/tako/src/launcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::process::Command;

use crate::gateway::{EntryType, TaskDataFlags};
use crate::internal::common::resources::map::ResourceMap;
use crate::internal::worker::configuration::WorkerConfiguration;
use crate::internal::worker::configuration::{EnvPropagationMode, WorkerConfiguration};
use crate::internal::worker::localcomm::Token;
use crate::internal::worker::resources::map::ResourceLabelMap;
use crate::internal::worker::state::WorkerState;
Expand Down Expand Up @@ -174,7 +174,10 @@ fn create_output_stream(def: &StdioDef, cwd: &Path) -> crate::Result<Stdio> {
Ok(stdio)
}

pub fn command_from_definitions(definition: &ProgramDefinition) -> crate::Result<Command> {
pub fn command_from_definitions(
definition: &ProgramDefinition,
env_propagation_mode: &EnvPropagationMode,
) -> crate::Result<Command> {
if definition.args.is_empty() {
return Result::Err(crate::Error::GenericError(
"No command arguments".to_string(),
Expand All @@ -183,6 +186,14 @@ pub fn command_from_definitions(definition: &ProgramDefinition) -> crate::Result

let mut command = Command::new(definition.args[0].to_os_str_lossy());

match env_propagation_mode {
EnvPropagationMode::Propagate => {}
EnvPropagationMode::Isolate => {
// TODO: propagate path?
command.env_clear();
}
}

#[cfg(target_os = "linux")]
unsafe {
command.pre_exec(|| {
Expand Down
Loading