Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pin-project-lite = "0.2"
tokio-stream = "0.1"
protobuf-src = "1.1.0"
url = "2"
socket2 = "0.6"

# Database
sqlx = { version = "0.8", features = ["runtime-tokio-rustls", "postgres", "sqlite", "migrate"] }
Expand Down
11 changes: 11 additions & 0 deletions architecture/sandbox-connect.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,22 @@ sequenceDiagram
5. When SSH starts, it spawns the `ssh-proxy` subprocess as its `ProxyCommand`.
6. `crates/navigator-cli/src/ssh.rs` -- `sandbox_ssh_proxy()`:
- Parses the gateway URL, connects via TCP (plain) or TLS (mTLS)
- Enables TCP keepalive on the gateway socket
- Sends a raw HTTP CONNECT request with `X-Sandbox-Id` and `X-Sandbox-Token` headers
- Reads the response status line; proceeds if 200
- Spawns two `tokio::spawn` tasks for bidirectional copy between stdin/stdout and the gateway stream
- When the remote-to-stdout direction completes, aborts the stdin-to-remote task (SSH has all the data it needs)

### Connection stability

SSH connection-stability hardening is split across the client, gateway, sandbox, and edge tunnel paths:

- **OpenSSH keepalives**: the CLI now sets `ServerAliveInterval=30` and `ServerAliveCountMax=3` on every SSH invocation so idle sessions still emit SSH traffic.
- **TCP keepalive**: the CLI-to-gateway and gateway-to-sandbox TCP sockets enable 30-second keepalive probes to reduce drops from NAT, load balancers, and other idle-sensitive middleboxes.
- **Sandbox SSH daemon**: the embedded `russh` server disables its default 10-minute inactivity timeout and instead sends protocol keepalives every 30 seconds. This prevents quiet shells from being garbage-collected while still detecting dead peers.
- **Edge WebSocket tunnel**: the WebSocket bridge now lets both copy directions observe shutdown instead of aborting the peer task immediately, which reduces abrupt closes and truncated tail data.
- **Limit diagnostics**: when the gateway rejects a connection because the per-session or per-sandbox cap is reached, it now logs the active count and configured limit to make 429s easier to diagnose.

### Command Execution (CLI)

The `sandbox exec` path is identical to interactive connect except:
Expand Down
40 changes: 20 additions & 20 deletions crates/navigator-cli/src/edge_tunnel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ use tokio_tungstenite::tungstenite::http::HeaderValue;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tracing::{debug, error, warn};

/// Upper bound on the WebSocket handshake to the edge; the connection attempt
/// is abandoned with an error if it does not complete within this window.
const EDGE_TUNNEL_WS_CONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

/// A running edge-authenticated tunnel proxy.
///
/// The proxy listens on a local TCP port and tunnels each connection over a
Expand Down Expand Up @@ -124,24 +126,18 @@ async fn handle_connection(tcp_stream: TcpStream, config: &TunnelConfig) -> Resu
let (ws_sink, ws_source) = ws_stream.split();
let (tcp_read, tcp_write) = tokio::io::split(tcp_stream);

// Two tasks: TCP->WS and WS->TCP. Abort the peer task once either
// direction finishes so the connection tears down promptly.
let mut tcp_to_ws = tokio::spawn(copy_tcp_to_ws(tcp_read, ws_sink));
let mut ws_to_tcp = tokio::spawn(copy_ws_to_tcp(ws_source, tcp_write));
// Keep both directions alive until each side has observed shutdown. This
// avoids cutting off trailing bytes when one half closes slightly earlier
// than the other.
let tcp_to_ws = tokio::spawn(copy_tcp_to_ws(tcp_read, ws_sink));
let ws_to_tcp = tokio::spawn(copy_ws_to_tcp(ws_source, tcp_write));

tokio::select! {
res = &mut tcp_to_ws => {
if let Err(e) = res {
debug!(error = %e, "tcp->ws task panicked");
}
ws_to_tcp.abort();
}
res = &mut ws_to_tcp => {
if let Err(e) = res {
debug!(error = %e, "ws->tcp task panicked");
}
tcp_to_ws.abort();
}
let (tcp_to_ws_res, ws_to_tcp_res) = tokio::join!(tcp_to_ws, ws_to_tcp);
if let Err(e) = tcp_to_ws_res {
debug!(error = %e, "tcp->ws task panicked");
}
if let Err(e) = ws_to_tcp_res {
debug!(error = %e, "ws->tcp task panicked");
}

Ok(())
Expand Down Expand Up @@ -170,9 +166,13 @@ async fn open_ws(config: &TunnelConfig) -> Result<WebSocketStream<MaybeTlsStream

debug!(url = %config.ws_url, "opening WebSocket to edge");

let (ws_stream, response) = tokio_tungstenite::connect_async(request)
.await
.map_err(|e| miette::miette!("WebSocket connect failed: {e}"))?;
let (ws_stream, response) = tokio::time::timeout(
EDGE_TUNNEL_WS_CONNECT_TIMEOUT,
tokio_tungstenite::connect_async(request),
)
.await
.map_err(|_| miette::miette!("timed out opening WebSocket to edge"))?
.map_err(|e| miette::miette!("WebSocket connect failed: {e}"))?;

debug!(
status = %response.status(),
Expand Down
76 changes: 66 additions & 10 deletions crates/navigator-cli/src/ssh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use miette::{IntoDiagnostic, Result, WrapErr};
use navigator_core::forward::{
find_ssh_forward_pid, resolve_ssh_gateway, shell_escape, write_forward_pid,
};
use navigator_core::net::enable_tcp_keepalive;
use navigator_core::proto::{CreateSshSessionRequest, GetSandboxRequest};
use owo_colors::OwoColorize;
use rustls::pki_types::ServerName;
Expand All @@ -17,10 +18,16 @@ use std::os::unix::process::CommandExt;
use std::path::Path;
use std::process::Command;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
use tokio_rustls::TlsConnector;

/// Maximum time to establish the TCP (and, when applicable, TLS) connection
/// to the SSH gateway or edge tunnel proxy.
const SSH_PROXY_CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
/// Maximum time to wait for the gateway's HTTP CONNECT status line.
const SSH_PROXY_STATUS_TIMEOUT: Duration = Duration::from_secs(10);
/// Value passed to OpenSSH `ServerAliveInterval`: seconds of inactivity before
/// the client emits a protocol-level keepalive.
const SSH_SERVER_ALIVE_INTERVAL_SECS: u64 = 30;
/// Value passed to OpenSSH `ServerAliveCountMax`: unanswered keepalives
/// tolerated before the client declares the connection dead.
const SSH_SERVER_ALIVE_COUNT_MAX: u64 = 3;

struct SshSessionConfig {
proxy_command: String,
sandbox_id: String,
Expand Down Expand Up @@ -109,6 +116,12 @@ fn ssh_base_command(proxy_command: &str) -> Command {
.arg("-o")
.arg("GlobalKnownHostsFile=/dev/null")
.arg("-o")
.arg(format!(
"ServerAliveInterval={SSH_SERVER_ALIVE_INTERVAL_SECS}"
))
.arg("-o")
.arg(format!("ServerAliveCountMax={SSH_SERVER_ALIVE_COUNT_MAX}"))
.arg("-o")
.arg("LogLevel=ERROR");
command
}
Expand Down Expand Up @@ -530,10 +543,20 @@ pub async fn sandbox_ssh_proxy(
// any bytes read past the `\r\n\r\n` header boundary stay buffered and
// are returned by subsequent reads during the bidirectional copy phase.
let mut buf_stream = BufReader::new(stream);
let status = read_connect_status(&mut buf_stream).await?;
let status = tokio::time::timeout(
SSH_PROXY_STATUS_TIMEOUT,
read_connect_status(&mut buf_stream),
)
.await
.map_err(|_| miette::miette!("timed out waiting for gateway CONNECT response"))??;
if status != 200 {
let reason = match status {
401 => " (SSH session expired, was revoked, or is invalid)",
429 => " (too many concurrent SSH connections for this session or sandbox)",
_ => "",
};
return Err(miette::miette!(
"gateway CONNECT failed with status {status}"
"gateway CONNECT failed with status {status}{reason}"
));
}

Expand Down Expand Up @@ -594,6 +617,8 @@ pub fn print_ssh_config(gateway: &str, name: &str) {
println!(" StrictHostKeyChecking no");
println!(" UserKnownHostsFile /dev/null");
println!(" GlobalKnownHostsFile /dev/null");
println!(" ServerAliveInterval {SSH_SERVER_ALIVE_INTERVAL_SECS}");
println!(" ServerAliveCountMax {SSH_SERVER_ALIVE_COUNT_MAX}");
println!(" LogLevel ERROR");
println!(" ProxyCommand {proxy_cmd}");
}
Expand Down Expand Up @@ -628,25 +653,37 @@ async fn connect_gateway(
.ok_or_else(|| miette::miette!("edge token required for tunnel"))?;
let gateway_url = format!("https://{host}:{port}");
let proxy = crate::edge_tunnel::start_tunnel_proxy(&gateway_url, token).await?;
let tcp = TcpStream::connect(proxy.local_addr)
.await
.into_diagnostic()?;
let tcp = tokio::time::timeout(
SSH_PROXY_CONNECT_TIMEOUT,
TcpStream::connect(proxy.local_addr),
)
.await
.map_err(|_| miette::miette!("timed out connecting to edge tunnel proxy"))?
.into_diagnostic()?;
tcp.set_nodelay(true).into_diagnostic()?;
let _ = enable_tcp_keepalive(&tcp);
return Ok(Box::new(tcp));
}

let tcp = TcpStream::connect((host, port)).await.into_diagnostic()?;
let tcp = tokio::time::timeout(SSH_PROXY_CONNECT_TIMEOUT, TcpStream::connect((host, port)))
.await
.map_err(|_| miette::miette!("timed out connecting to SSH gateway"))?
.into_diagnostic()?;
tcp.set_nodelay(true).into_diagnostic()?;
let _ = enable_tcp_keepalive(&tcp);
if scheme.eq_ignore_ascii_case("https") {
let materials = require_tls_materials(&format!("https://{host}:{port}"), tls)?;
let config = build_rustls_config(&materials)?;
let connector = TlsConnector::from(Arc::new(config));
let server_name = ServerName::try_from(host.to_string())
.map_err(|_| miette::miette!("invalid server name: {host}"))?;
let tls = connector
.connect(server_name, tcp)
.await
.into_diagnostic()?;
let tls = tokio::time::timeout(
SSH_PROXY_CONNECT_TIMEOUT,
connector.connect(server_name, tcp),
)
.await
.map_err(|_| miette::miette!("timed out establishing TLS to SSH gateway"))?
.into_diagnostic()?;
Ok(Box::new(tls))
} else {
Ok(Box::new(tcp))
Expand Down Expand Up @@ -688,3 +725,22 @@ async fn read_connect_status<R: AsyncRead + Unpin>(stream: &mut R) -> Result<u16
/// Marker trait for the gateway transport: anything that is async-readable,
/// async-writable, `Unpin`, and `Send`. Presumably used as a boxed trait
/// object so plain-TCP and TLS streams share one return type — confirm
/// against the (not shown here) `connect_gateway` signature.
trait ProxyStream: AsyncRead + AsyncWrite + Unpin + Send {}

// Blanket impl: every qualifying stream type is a `ProxyStream` automatically.
impl<T> ProxyStream for T where T: AsyncRead + AsyncWrite + Unpin + Send {}

#[cfg(test)]
mod tests {
    use super::*;

    /// The base ssh command must always carry client-side keepalive options so
    /// idle sessions keep emitting SSH traffic.
    #[test]
    fn ssh_base_command_enables_server_keepalives() {
        // Flatten every argument the base command would pass to `ssh`.
        let args: Vec<String> = ssh_base_command("openshell ssh-proxy")
            .get_args()
            .map(|arg| arg.to_string_lossy().into_owned())
            .collect();

        // Build the expected `-o` values from the shared constants so this
        // test keeps tracking any future retuning of the intervals.
        let interval = format!("ServerAliveInterval={SSH_SERVER_ALIVE_INTERVAL_SECS}");
        let count_max = format!("ServerAliveCountMax={SSH_SERVER_ALIVE_COUNT_MAX}");
        assert!(args.contains(&interval));
        assert!(args.contains(&count_max));
    }
}
1 change: 1 addition & 0 deletions crates/navigator-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ miette = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
url = { workspace = true }
socket2 = { workspace = true }

[build-dependencies]
tonic-build = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions crates/navigator-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod config;
pub mod error;
pub mod forward;
pub mod inference;
pub mod net;
pub mod proto;

pub use config::{Config, TlsConfig};
Expand Down
60 changes: 60 additions & 0 deletions crates/navigator-core/src/net.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Shared socket configuration helpers.

use socket2::{SockRef, TcpKeepalive};
use std::io;
use std::time::Duration;

/// Idle time before TCP keepalive probes start.
pub const TCP_KEEPALIVE_IDLE: Duration = Duration::from_secs(30);

/// Interval between TCP keepalive probes on supported platforms.
pub const TCP_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30);

/// Build the keepalive configuration applied to gateway-facing sockets:
/// probes begin after [`TCP_KEEPALIVE_IDLE`] of silence and, on targets where
/// socket2 exposes the probe-interval knob, repeat every
/// [`TCP_KEEPALIVE_INTERVAL`].
fn default_keepalive() -> TcpKeepalive {
    let keepalive = TcpKeepalive::new().with_time(TCP_KEEPALIVE_IDLE);
    // `TcpKeepalive::with_interval` is only compiled in on this target list
    // (platforms with TCP_KEEPINTVL or the Windows equivalent). The shadowed
    // rebind leaves the idle-time-only config in place everywhere else.
    #[cfg(any(
        target_os = "android",
        target_os = "dragonfly",
        target_os = "freebsd",
        target_os = "fuchsia",
        target_os = "illumos",
        target_os = "ios",
        target_os = "visionos",
        target_os = "linux",
        target_os = "macos",
        target_os = "netbsd",
        target_os = "tvos",
        target_os = "watchos",
        target_os = "windows",
        target_os = "cygwin",
    ))]
    let keepalive = keepalive.with_interval(TCP_KEEPALIVE_INTERVAL);
    keepalive
}

/// Enable aggressive TCP keepalive on a socket.
/// Enable aggressive TCP keepalive on a socket.
///
/// Unix flavor: accepts any file-descriptor-backed socket and applies the
/// shared keepalive settings via a borrowed `SockRef`, so ownership of the
/// socket never moves. Errors from the setsockopt path are returned to the
/// caller.
#[cfg(unix)]
pub fn enable_tcp_keepalive<S>(socket: &S) -> io::Result<()>
where
    S: std::os::fd::AsFd,
{
    let sock = SockRef::from(socket);
    sock.set_tcp_keepalive(&default_keepalive())
}

/// Enable aggressive TCP keepalive on a socket.
///
/// Windows flavor: mirrors the Unix implementation but is bounded on
/// `AsSocket` (raw SOCKET handles) instead of `AsFd`. The borrowed `SockRef`
/// leaves socket ownership with the caller.
#[cfg(windows)]
pub fn enable_tcp_keepalive<S>(socket: &S) -> io::Result<()>
where
    S: std::os::windows::io::AsSocket,
{
    let sock = SockRef::from(socket);
    sock.set_tcp_keepalive(&default_keepalive())
}

/// Enable aggressive TCP keepalive on a socket.
///
/// No-op fallback for targets that are neither Unix nor Windows: always
/// reports success so call sites don't need platform-specific handling.
#[cfg(not(any(unix, windows)))]
pub fn enable_tcp_keepalive<S>(_socket: &S) -> io::Result<()> {
    Ok(())
}
Loading
Loading