diff --git a/crates/devolutions-agent-shared/src/windows/registry.rs b/crates/devolutions-agent-shared/src/windows/registry.rs index 801eb055f..400c4b283 100644 --- a/crates/devolutions-agent-shared/src/windows/registry.rs +++ b/crates/devolutions-agent-shared/src/windows/registry.rs @@ -87,6 +87,10 @@ pub fn get_installed_product_version( })?; // Convert encoded MSI version number to human-readable date. + // The high byte encodes the year as an offset: + // - Agent builds use a base year of 2000 (year = high_byte + 2000). + // - RDM MSI packages use 0x700 (1792) as base, found empirically. + // This offset must be preserved to correctly decode existing RDM installations. let short_year = match version_encoding { ProductVersionEncoding::Agent => (product_version >> 24) + 2000, ProductVersionEncoding::Rdm => (product_version >> 24) + 0x700, diff --git a/devolutions-session/src/dvc/io.rs b/devolutions-session/src/dvc/io.rs index 94941b003..87b871223 100644 --- a/devolutions-session/src/dvc/io.rs +++ b/devolutions-session/src/dvc/io.rs @@ -32,7 +32,7 @@ pub fn run_dvc_io( trace!("DVC channel opened"); // All DVC messages should be under CHANNEL_CHUNK_LENGTH size, but sometimes RDP stack - // a few messages together; 128Kb buffer should be enough to hold a few dozen messages. + // sends a few messages together; 128Kb buffer should be enough to hold a few dozen messages. let mut pdu_chunk_buffer = [0u8; 128 * 1024]; let mut overlapped = OVERLAPPED::default(); let mut bytes_read: u32 = 0; diff --git a/devolutions-session/src/dvc/rdm.rs b/devolutions-session/src/dvc/rdm.rs index c476e1ee8..3a106eb89 100644 --- a/devolutions-session/src/dvc/rdm.rs +++ b/devolutions-session/src/dvc/rdm.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::mem::size_of; use std::sync::Arc; use std::sync::atomic::{AtomicU8, Ordering}; @@ -89,7 +88,7 @@ impl RdmPipeConnection { /// The ready event is dropped after connection, allowing RDM to own it. async fn connect(timeout_secs: u32, ready_event: Event) -> anyhow::Result { let pipe_name = get_rdm_pipe_name()?; - let timeout_ms = timeout_secs * 1000; + let timeout_ms = timeout_secs.saturating_mul(1000); info!(pipe_name, timeout_secs, "Waiting for RDM and connecting to pipe"); @@ -207,7 +206,9 @@ fn validate_capset_response(message: NowMessage<'_>) -> anyhow::Result { - info!(pipe_name, "Forwarding message to RDM: {:?}", message); - if let Err(error) = pipe.send_message(&message).await { - error!(%error, pipe_name, "Failed to send message to RDM pipe"); - connection_state.store(RdmConnectionState::Disconnected.as_u8(), Ordering::Release); - break; + message_opt = dvc_rx.recv() => { + match message_opt { + Some(message) => { + info!(pipe_name, "Forwarding message to RDM: {:?}", message); + if let Err(error) = pipe.send_message(&message).await { + error!(%error, pipe_name, "Failed to send message to RDM pipe"); + connection_state.store(RdmConnectionState::Disconnected.as_u8(), Ordering::Release); + break; + } + } + None => { + info!(pipe_name, "DVC receiver channel closed; terminating RDM passthrough task"); + break; + } } } } @@ -420,25 +429,50 @@ impl RdmMessageProcessor { /// - If RDM is not started, launch RDM and start connection process. /// - Spawns a background task to handle the connection process. pub fn process_app_start(&mut self, rdm_app_start_msg: NowRdmAppStartMsg, agent_caps: NowChannelCapsetMsg) { - let current_state = RdmConnectionState::from_u8(self.connection_state.load(Ordering::Acquire)); + let mut current_state = RdmConnectionState::from_u8(self.connection_state.load(Ordering::Acquire)); - match current_state { - RdmConnectionState::Ready => { - info!("RDM already connected and ready, sending immediate READY notification"); - let dvc_tx = self.dvc_tx.clone(); - tokio::spawn(async move { - let _ = send_rdm_app_notify(&dvc_tx, NowRdmAppState::READY, NowRdmReason::NOT_SPECIFIED).await; - }); - return; - } - RdmConnectionState::Connecting => { - info!("RDM connection already in progress, ignoring duplicate app_start"); - return; - } - RdmConnectionState::Disconnected => { - info!("Starting RDM connection process"); - self.connection_state - .store(RdmConnectionState::Connecting.as_u8(), Ordering::Release); + // Ensure that the transition from Disconnected to Connecting is done atomically + // so that only one task is spawned to handle app_start. + loop { + match current_state { + RdmConnectionState::Ready => { + info!("RDM already connected and ready, sending immediate READY notification"); + let dvc_tx = self.dvc_tx.clone(); + tokio::spawn(async move { + if let Err(error) = + send_rdm_app_notify(&dvc_tx, NowRdmAppState::READY, NowRdmReason::NOT_SPECIFIED).await + { + error!(%error, "Failed to send immediate RDM READY notification"); + } + }); + return; + } + RdmConnectionState::Connecting => { + info!("RDM connection already in progress, ignoring duplicate app_start"); + return; + } + RdmConnectionState::Disconnected => { + info!("Starting RDM connection process"); + let disconnected = RdmConnectionState::Disconnected.as_u8(); + let connecting = RdmConnectionState::Connecting.as_u8(); + + match self.connection_state.compare_exchange( + disconnected, + connecting, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => { + // Successfully claimed responsibility for starting the connection. + break; + } + Err(actual) => { + // State changed concurrently; re-evaluate the new state. + current_state = RdmConnectionState::from_u8(actual); + continue; + } + } + } } } @@ -451,9 +485,11 @@ impl RdmMessageProcessor { tokio::spawn(async move { if let Err(error) = - process_app_start_impl(rdm_app_start_msg, agent_caps, dvc_tx, connection_state, rdm_rx).await + process_app_start_impl(rdm_app_start_msg, agent_caps, dvc_tx, connection_state.clone(), rdm_rx).await { error!(%error, "RDM app start failed"); + // Ensure connection_state is reset so future app_start attempts are possible + connection_state.store(RdmConnectionState::Disconnected.as_u8(), Ordering::Release); } }); } @@ -587,21 +623,6 @@ async fn launch_rdm_process(rdm_app_start_msg: &NowRdmAppStartMsg) -> anyhow::Re .to_string_lossy() .to_string(); - let mut env_vars = HashMap::new(); - - if rdm_app_start_msg.is_fullscreen() { - env_vars.insert("RDM_OPT_FULLSCREEN".to_owned(), "1".to_owned()); - info!("Starting RDM in fullscreen mode"); - } - - if rdm_app_start_msg.is_jump_mode() { - env_vars.insert("RDM_OPT_JUMP".to_owned(), "1".to_owned()); - info!("Starting RDM in jump mode"); - } - - // Create environment block - let _env_block = crate::dvc::env::make_environment_block(env_vars)?; - // Convert command line to wide string let current_dir = WideString::from(&install_location); @@ -693,7 +714,10 @@ fn is_rdm_running() -> bool { }; // Compare the full paths case-insensitively - if exe_path == rdm_exe_path { + if exe_path + .to_string_lossy() + .eq_ignore_ascii_case(&rdm_exe_path.to_string_lossy()) + { info!( rdm_path = %rdm_exe_path.display(), found_path = %exe_path.display(),