Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/devolutions-agent-shared/src/windows/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ pub fn get_installed_product_version(
})?;

// Convert encoded MSI version number to human-readable date.
// The high byte encodes the year as an offset:
// - Agent builds use a base year of 2000 (year = high_byte + 2000).
// - RDM MSI packages use 0x700 (1792) as base, found empirically.
// This offset must be preserved to correctly decode existing RDM installations.
let short_year = match version_encoding {
ProductVersionEncoding::Agent => (product_version >> 24) + 2000,
ProductVersionEncoding::Rdm => (product_version >> 24) + 0x700,
Expand Down
2 changes: 1 addition & 1 deletion devolutions-session/src/dvc/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub fn run_dvc_io(
trace!("DVC channel opened");

// All DVC messages should be under CHANNEL_CHUNK_LENGTH size, but sometimes RDP stack
// a few messages together; 128Kb buffer should be enough to hold a few dozen messages.
// sends a few messages together; 128Kb buffer should be enough to hold a few dozen messages.
let mut pdu_chunk_buffer = [0u8; 128 * 1024];
let mut overlapped = OVERLAPPED::default();
let mut bytes_read: u32 = 0;
Expand Down
112 changes: 68 additions & 44 deletions devolutions-session/src/dvc/rdm.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::collections::HashMap;
use std::mem::size_of;
use std::sync::Arc;
use std::sync::atomic::{AtomicU8, Ordering};
Expand Down Expand Up @@ -89,7 +88,7 @@ impl RdmPipeConnection {
/// The ready event is dropped after connection, allowing RDM to own it.
async fn connect(timeout_secs: u32, ready_event: Event) -> anyhow::Result<Self> {
let pipe_name = get_rdm_pipe_name()?;
let timeout_ms = timeout_secs * 1000;
let timeout_ms = timeout_secs.saturating_mul(1000);

info!(pipe_name, timeout_secs, "Waiting for RDM and connecting to pipe");

Expand Down Expand Up @@ -207,7 +206,9 @@ fn validate_capset_response(message: NowMessage<'_>) -> anyhow::Result<NowChanne

/// Perform NOW protocol negotiation with RDM
///
/// Sends agent capabilities to RDM and receives RDM's downgraded capabilities.
/// Sends the agent's proposed capabilities to RDM and receives RDM's negotiated
/// capabilities for the connection (a final set that may be a downgraded subset
/// of the agent's proposal and that both sides will use).
/// This establishes the protocol version and capabilities for the connection.
async fn negotiate_with_rdm(
pipe: &mut RdmPipeConnection,
Expand Down Expand Up @@ -303,12 +304,20 @@ async fn run_rdm_to_dvc_passthrough(
}

// Receive messages from channel and write to RDM pipe
Some(message) = dvc_rx.recv() => {
info!(pipe_name, "Forwarding message to RDM: {:?}", message);
if let Err(error) = pipe.send_message(&message).await {
error!(%error, pipe_name, "Failed to send message to RDM pipe");
connection_state.store(RdmConnectionState::Disconnected.as_u8(), Ordering::Release);
break;
message_opt = dvc_rx.recv() => {
match message_opt {
Some(message) => {
info!(pipe_name, "Forwarding message to RDM: {:?}", message);
if let Err(error) = pipe.send_message(&message).await {
error!(%error, pipe_name, "Failed to send message to RDM pipe");
connection_state.store(RdmConnectionState::Disconnected.as_u8(), Ordering::Release);
break;
}
}
None => {
info!(pipe_name, "DVC receiver channel closed; terminating RDM passthrough task");
break;
}
}
}
}
Expand Down Expand Up @@ -420,25 +429,50 @@ impl RdmMessageProcessor {
/// - If RDM is not started, launch RDM and start connection process.
/// - Spawns a background task to handle the connection process.
pub fn process_app_start(&mut self, rdm_app_start_msg: NowRdmAppStartMsg, agent_caps: NowChannelCapsetMsg) {
let current_state = RdmConnectionState::from_u8(self.connection_state.load(Ordering::Acquire));
let mut current_state = RdmConnectionState::from_u8(self.connection_state.load(Ordering::Acquire));

match current_state {
RdmConnectionState::Ready => {
info!("RDM already connected and ready, sending immediate READY notification");
let dvc_tx = self.dvc_tx.clone();
tokio::spawn(async move {
let _ = send_rdm_app_notify(&dvc_tx, NowRdmAppState::READY, NowRdmReason::NOT_SPECIFIED).await;
});
return;
}
RdmConnectionState::Connecting => {
info!("RDM connection already in progress, ignoring duplicate app_start");
return;
}
RdmConnectionState::Disconnected => {
info!("Starting RDM connection process");
self.connection_state
.store(RdmConnectionState::Connecting.as_u8(), Ordering::Release);
// Ensure that the transition from Disconnected to Connecting is done atomically
// so that only one task is spawned to handle app_start.
loop {
match current_state {
RdmConnectionState::Ready => {
info!("RDM already connected and ready, sending immediate READY notification");
let dvc_tx = self.dvc_tx.clone();
tokio::spawn(async move {
if let Err(error) =
send_rdm_app_notify(&dvc_tx, NowRdmAppState::READY, NowRdmReason::NOT_SPECIFIED).await
{
error!(%error, "Failed to send immediate RDM READY notification");
}
});
return;
}
RdmConnectionState::Connecting => {
info!("RDM connection already in progress, ignoring duplicate app_start");
return;
}
RdmConnectionState::Disconnected => {
info!("Starting RDM connection process");
let disconnected = RdmConnectionState::Disconnected.as_u8();
let connecting = RdmConnectionState::Connecting.as_u8();

match self.connection_state.compare_exchange(
disconnected,
connecting,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// Successfully claimed responsibility for starting the connection.
break;
}
Err(actual) => {
// State changed concurrently; re-evaluate the new state.
current_state = RdmConnectionState::from_u8(actual);
continue;
}
}
}
}
}

Expand All @@ -451,9 +485,11 @@ impl RdmMessageProcessor {

tokio::spawn(async move {
if let Err(error) =
process_app_start_impl(rdm_app_start_msg, agent_caps, dvc_tx, connection_state, rdm_rx).await
process_app_start_impl(rdm_app_start_msg, agent_caps, dvc_tx, connection_state.clone(), rdm_rx).await
{
error!(%error, "RDM app start failed");
// Ensure connection_state is reset so future app_start attempts are possible
connection_state.store(RdmConnectionState::Disconnected.as_u8(), Ordering::Release);
}
});
}
Expand Down Expand Up @@ -587,21 +623,6 @@ async fn launch_rdm_process(rdm_app_start_msg: &NowRdmAppStartMsg) -> anyhow::Re
.to_string_lossy()
.to_string();

let mut env_vars = HashMap::new();

if rdm_app_start_msg.is_fullscreen() {
env_vars.insert("RDM_OPT_FULLSCREEN".to_owned(), "1".to_owned());
info!("Starting RDM in fullscreen mode");
}

if rdm_app_start_msg.is_jump_mode() {
env_vars.insert("RDM_OPT_JUMP".to_owned(), "1".to_owned());
info!("Starting RDM in jump mode");
}

// Create environment block
let _env_block = crate::dvc::env::make_environment_block(env_vars)?;

// Convert command line to wide string
let current_dir = WideString::from(&install_location);

Expand Down Expand Up @@ -693,7 +714,10 @@ fn is_rdm_running() -> bool {
};

// Compare the full paths case-insensitively
if exe_path == rdm_exe_path {
if exe_path
.to_string_lossy()
.eq_ignore_ascii_case(&rdm_exe_path.to_string_lossy())
{
info!(
rdm_path = %rdm_exe_path.display(),
found_path = %exe_path.display(),
Expand Down