diff --git a/crates/hyperqueue/src/client/commands/autoalloc.rs b/crates/hyperqueue/src/client/commands/autoalloc.rs index 8d0c06dba..4eeb9ee1e 100644 --- a/crates/hyperqueue/src/client/commands/autoalloc.rs +++ b/crates/hyperqueue/src/client/commands/autoalloc.rs @@ -16,7 +16,7 @@ use crate::transfer::connection::ClientSession; use crate::transfer::messages::{ AutoAllocRequest, AutoAllocResponse, FromClientMessage, QueueCreateResponse, ToClientMessage, }; -use clap::Parser; +use clap::{Parser, ValueEnum}; use humantime::format_duration; use tako::resources::{CPU_RESOURCE_NAME, ResourceDescriptor, ResourceDescriptorItem}; @@ -346,6 +346,7 @@ wasted allocation duration." no_hyper_threading, idle_timeout, overview_interval, + env_propagation_mode, } = worker_args; if cpus.is_none() && resource.is_empty() { @@ -354,7 +355,14 @@ wasted allocation duration." ) } - let mut worker_args = vec![]; + let mut worker_args = vec![ + "--env".to_string(), + env_propagation_mode + .to_possible_value() + .unwrap() + .get_name() + .to_owned(), + ]; if let Some(cpus) = cpus { worker_args.extend([ "--cpus".to_string(), diff --git a/crates/hyperqueue/src/client/commands/worker.rs b/crates/hyperqueue/src/client/commands/worker.rs index 02c17af95..cd14beb90 100644 --- a/crates/hyperqueue/src/client/commands/worker.rs +++ b/crates/hyperqueue/src/client/commands/worker.rs @@ -1,4 +1,5 @@ use crate::client::commands::duration_doc; +use crate::tako::internal::worker::configuration::EnvPropagationMode; use anyhow::{Context, bail}; use chrono::Utc; use clap::builder::{PossibleValue, TypedValueParser}; @@ -148,6 +149,27 @@ pub struct SharedWorkerStartOpts { /// size. #[arg(long, value_parser = passthrough_parser(parse_human_time))] pub overview_interval: Option>, + + /// How should environment variables be propagated from the worker to tasks. + #[arg(long("env"), default_value = "propagate")] + pub env_propagation_mode: EnvPropagationModeCli, +} + +#[derive(Debug, Clone, clap::ValueEnum)] +pub enum EnvPropagationModeCli { + /// Propagate environment variables from the worker to tasks. + Propagate, + /// Do not propagate environment variables + Isolate, +} + +impl From for EnvPropagationMode { + fn from(value: EnvPropagationModeCli) -> Self { + match value { + EnvPropagationModeCli::Propagate => EnvPropagationMode::Propagate, + EnvPropagationModeCli::Isolate => EnvPropagationMode::Isolate, + } + } } /// Parses resource detection options (all, none or a comma-separated list of @@ -337,6 +359,7 @@ fn gather_configuration(opts: WorkerStartOpts) -> anyhow::Result anyhow::Result serde_json::Value { max_parallel_downloads, max_download_tries, wait_between_download_tries, + env_propagation_mode, extra: _, }, started, @@ -578,6 +580,10 @@ fn format_worker_info(worker_info: WorkerInfo) -> serde_json::Value { "max_parallel_downloads": max_parallel_downloads, "max_download_tries": max_download_tries, "wait_between_download_tries": format_duration(wait_between_download_tries), + "env_propagation_mode": match env_propagation_mode { + EnvPropagationMode::Propagate => "propagate", + EnvPropagationMode::Isolate => "isolate" + } }), "allocation": manager_info.map(|info| json!({ "manager": FormattedManagerType(info.manager), diff --git a/crates/hyperqueue/src/worker/start/program.rs b/crates/hyperqueue/src/worker/start/program.rs index a8efd25bb..5faf25007 100644 --- a/crates/hyperqueue/src/worker/start/program.rs +++ b/crates/hyperqueue/src/worker/start/program.rs @@ -29,6 +29,7 @@ use crate::worker::start::{RunningTaskContext, SharedTaskDescription}; use crate::worker::streamer::StreamSender; use crate::worker::streamer::StreamerRef; use tako::comm::serialize; +use tako::internal::worker::configuration::EnvPropagationMode; use tako::launcher::{ StopReason, TaskBuildContext, TaskLaunchData, TaskResult, command_from_definitions, }; @@ -199,6 +200,10 @@ pub(super) fn build_program_task( let task_future = create_task_future( streamer_ref.clone(), program, + build_ctx + .worker_configuration() + .env_propagation_mode + .clone(), task_id, instance_id, stop_receiver, @@ -499,13 +504,14 @@ fn check_error_filename(task_dir: TempDir) -> Option { async fn create_task_future( streamer_ref: StreamerRef, program: ProgramDefinition, + env_propagation_mode: EnvPropagationMode, task_id: TaskId, instance_id: InstanceId, end_receiver: Receiver, task_dir: Option, stream_path: Option, ) -> tako::Result { - let mut command = command_from_definitions(&program)?; + let mut command = command_from_definitions(&program, &env_propagation_mode)?; let status_to_result = |status: ExitStatus| { if !status.success() { @@ -622,6 +628,7 @@ fn signal_name(signal: i32) -> &'static str { async fn create_task_future( _streamer_ref: StreamerRef, _program: ProgramDefinition, + _env_propagation_mode: EnvPropagationMode, _task_id: TaskId, _instance_id: InstanceId, _end_receiver: Receiver, diff --git a/crates/tako/src/internal/tests/integration/utils/worker.rs b/crates/tako/src/internal/tests/integration/utils/worker.rs index 7b7691f32..d4c6e1156 100644 --- a/crates/tako/src/internal/tests/integration/utils/worker.rs +++ b/crates/tako/src/internal/tests/integration/utils/worker.rs @@ -9,7 +9,7 @@ use crate::internal::common::error::DsError; use crate::internal::common::resources::ResourceDescriptor; use crate::internal::worker::configuration::{ DEFAULT_MAX_DOWNLOAD_TRIES, DEFAULT_MAX_PARALLEL_DOWNLOADS, - DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES, OverviewConfiguration, + DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES, EnvPropagationMode, OverviewConfiguration, }; use crate::launcher::{StopReason, TaskBuildContext, TaskResult}; use crate::program::ProgramDefinition; @@ -86,6 +86,7 @@ pub fn create_worker_configuration( wait_between_download_tries: DEFAULT_WAIT_BETWEEN_DOWNLOAD_TRIES, time_limit: None, extra, + env_propagation_mode: EnvPropagationMode::Propagate, }, secret_key, ) @@ -230,7 +231,7 @@ pub(super) async fn start_worker( } async fn launcher_main(program: ProgramDefinition) -> crate::Result<()> { - let mut command = command_from_definitions(&program)?; + let mut command = command_from_definitions(&program, &EnvPropagationMode::Propagate)?; let mut process = command.spawn()?; if !program.stdin.is_empty() { diff --git a/crates/tako/src/internal/worker/configuration.rs b/crates/tako/src/internal/worker/configuration.rs index c9a5ed92d..f3187ffab 100644 --- a/crates/tako/src/internal/worker/configuration.rs +++ b/crates/tako/src/internal/worker/configuration.rs @@ -56,9 +56,21 @@ pub struct WorkerConfiguration { pub max_download_tries: u32, pub wait_between_download_tries: Duration, + pub env_propagation_mode: EnvPropagationMode, + pub extra: Map, } +/// How should environment variables be handled when a worker starts a new task. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum EnvPropagationMode { + /// Propagate environment varibles from the worker environment to the task. + Propagate, + /// Isolate the task. Do not propagate any environment variables from the worker except for + /// PATH. + Isolate, +} + /// This function is used from both the server and the worker to keep the same values /// in the worker configuration without the need for repeated configuration exchange. pub(crate) fn sync_worker_configuration( diff --git a/crates/tako/src/launcher.rs b/crates/tako/src/launcher.rs index 513f4f56c..afead172c 100644 --- a/crates/tako/src/launcher.rs +++ b/crates/tako/src/launcher.rs @@ -12,7 +12,7 @@ use tokio::process::Command; use crate::gateway::{EntryType, TaskDataFlags}; use crate::internal::common::resources::map::ResourceMap; -use crate::internal::worker::configuration::WorkerConfiguration; +use crate::internal::worker::configuration::{EnvPropagationMode, WorkerConfiguration}; use crate::internal::worker::localcomm::Token; use crate::internal::worker::resources::map::ResourceLabelMap; use crate::internal::worker::state::WorkerState; @@ -174,7 +174,10 @@ fn create_output_stream(def: &StdioDef, cwd: &Path) -> crate::Result { Ok(stdio) } -pub fn command_from_definitions(definition: &ProgramDefinition) -> crate::Result { +pub fn command_from_definitions( + definition: &ProgramDefinition, + env_propagation_mode: &EnvPropagationMode, +) -> crate::Result { if definition.args.is_empty() { return Result::Err(crate::Error::GenericError( "No command arguments".to_string(), @@ -183,6 +186,14 @@ pub fn command_from_definitions(definition: &ProgramDefinition) -> crate::Result let mut command = Command::new(definition.args[0].to_os_str_lossy()); + match env_propagation_mode { + EnvPropagationMode::Propagate => {} + EnvPropagationMode::Isolate => { + // TODO: propagate path? + command.env_clear(); + } + } + #[cfg(target_os = "linux")] unsafe { command.pre_exec(|| {