Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions cortex-app-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,101 @@ impl ServerConfig {
Ok(config)
}

/// Watch a config file for changes, supporting atomic replacement (#2171).
///
/// Configuration management tools (Ansible, Puppet, Chef) often replace files
/// atomically using a temp-file-plus-rename strategy. Standard file watchers
/// track the original inode and miss these changes.
///
/// This function watches the parent directory and detects when the config file
/// is created/renamed, which properly handles atomic replacements.
///
/// Returns a receiver that yields the new config whenever the file changes.
#[cfg(feature = "config-watcher")]
pub fn watch(
path: impl AsRef<std::path::Path>,
) -> anyhow::Result<tokio::sync::mpsc::Receiver<Self>> {
use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
use std::sync::mpsc::channel;

let path = path.as_ref().to_path_buf();
let file_name = path
.file_name()
.ok_or_else(|| anyhow::anyhow!("Config path has no filename"))?
.to_string_lossy()
.to_string();

// Watch the parent directory to catch atomic replacements
let parent_dir = path
.parent()
.ok_or_else(|| anyhow::anyhow!("Config path has no parent directory"))?
.to_path_buf();

let (tx, rx) = tokio::sync::mpsc::channel(16);
let (notify_tx, notify_rx) = channel();

let mut watcher = RecommendedWatcher::new(notify_tx, Config::default())?;

// Watch the parent directory, not the file itself
// This ensures we catch atomic renames (temp file -> config file)
watcher.watch(&parent_dir, RecursiveMode::NonRecursive)?;

let config_path = path.clone();
let config_file_name = file_name.clone();

std::thread::Builder::new()
.name("config-watcher".to_string())
.spawn(move || {
let _watcher = watcher; // Keep watcher alive

loop {
match notify_rx.recv() {
Ok(Ok(event)) => {
// Check if the event involves our config file
let involves_config = event.paths.iter().any(|p| {
p.file_name()
.map(|n| n.to_string_lossy() == config_file_name)
.unwrap_or(false)
});

if involves_config {
// Small delay to ensure atomic rename is complete
std::thread::sleep(std::time::Duration::from_millis(50));

// Try to reload the config
match Self::load(&config_path) {
Ok(config) => {
tracing::info!(
"Config file changed (atomic replacement detected), reloading"
);
if tx.blocking_send(config).is_err() {
tracing::debug!("Config watcher channel closed");
break;
}
}
Err(e) => {
tracing::warn!(
"Failed to reload config after change: {}",
e
);
}
}
}
}
Ok(Err(e)) => {
tracing::warn!("Config watcher error: {}", e);
}
Err(_) => {
tracing::debug!("Config watcher notify channel closed");
break;
}
}
}
})?;

Ok(rx)
}

/// Load from environment variables.
pub fn from_env() -> anyhow::Result<Self> {
let mut config = Self::default();
Expand Down Expand Up @@ -266,6 +361,10 @@ pub struct RateLimitConfig {
/// Exempt paths from rate limiting.
#[serde(default)]
pub exempt_paths: Vec<String>,
/// Trust proxy headers for client IP detection.
/// When enabled, checks X-Forwarded-For and X-Real-IP headers.
#[serde(default)]
pub trust_proxy: bool,
}

fn default_rpm() -> u32 {
Expand All @@ -286,6 +385,7 @@ impl Default for RateLimitConfig {
by_api_key: false,
by_user: false,
exempt_paths: vec!["/health".to_string()],
trust_proxy: false,
}
}
}
Expand Down
42 changes: 39 additions & 3 deletions cortex-cli/src/export_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,48 @@ impl ExportCommand {
serde_json::to_string(&export)?
};

// Write to output
// Write to output with proper fsync to detect network filesystem issues (#2169)
match self.output {
Some(path) => {
std::fs::write(&path, &json)
use std::io::Write;

// Create file and write content
let mut file = std::fs::File::create(&path)
.with_context(|| format!("Failed to create file: {}", path.display()))?;

file.write_all(json.as_bytes())
.with_context(|| format!("Failed to write to: {}", path.display()))?;
eprintln!("Exported session to: {}", path.display());

// Explicitly sync to disk to detect network filesystem write failures
// This ensures the data is actually flushed to the network share
file.sync_all()
.with_context(|| format!(
"Failed to sync file to disk: {}. The file may be incomplete or corrupted. \
This can happen with network filesystems during temporary connectivity issues.",
path.display()
))?;

// Verify the written file size matches expected size
let written_size = std::fs::metadata(&path)
.with_context(|| format!("Failed to verify written file: {}", path.display()))?
.len();
let expected_size = json.len() as u64;

if written_size != expected_size {
bail!(
"File verification failed: expected {} bytes but wrote {} bytes to {}. \
The export may be incomplete due to filesystem issues.",
expected_size,
written_size,
path.display()
);
}

eprintln!(
"Exported session to: {} ({} bytes)",
path.display(),
written_size
);
}
None => {
println!("{json}");
Expand Down
72 changes: 72 additions & 0 deletions cortex-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,75 @@ fn pre_main_hardening() {
cortex_process_hardening::pre_main_hardening();
}

/// Validate LC_ALL and LANG environment variables (#2090).
/// Warns if an invalid locale is set which could cause encoding issues.
fn validate_locale_env() {
// Common valid locale patterns
fn is_valid_locale(locale: &str) -> bool {
if locale.is_empty() || locale == "C" || locale == "POSIX" {
return true;
}
// Valid locales typically match patterns like:
// - en_US.UTF-8
// - en_US.utf8
// - en_US
// - C.UTF-8
// - POSIX
let valid_patterns = [
// language_territory.encoding format
|s: &str| {
let parts: Vec<&str> = s.split('.').collect();
if parts.is_empty() || parts.len() > 2 {
return false;
}
let lang_part = parts[0];
// Check for language_TERRITORY or just language format
if lang_part.contains('_') {
let lang_parts: Vec<&str> = lang_part.split('_').collect();
lang_parts.len() == 2
&& lang_parts[0].len() >= 2
&& lang_parts[0].chars().all(|c| c.is_ascii_lowercase())
&& lang_parts[1].len() >= 2
&& lang_parts[1]
.chars()
.all(|c| c.is_ascii_uppercase() || c.is_ascii_digit())
} else {
// Just language code
lang_part.len() >= 2 && lang_part.chars().all(|c| c.is_ascii_lowercase())
}
},
];

valid_patterns.iter().any(|check| check(s))
}

// Check LC_ALL first (takes precedence)
if let Ok(lc_all) = std::env::var("LC_ALL") {
if !lc_all.is_empty() && !is_valid_locale(&lc_all) {
eprintln!(
"Warning: Invalid LC_ALL value '{}'. This may cause encoding issues.",
lc_all
);
eprintln!("Hint: Use a valid locale like 'en_US.UTF-8' or 'C.UTF-8', or unset LC_ALL.");
}
}

// Check LANG if LC_ALL is not set
if std::env::var("LC_ALL").is_err() {
if let Ok(lang) = std::env::var("LANG") {
if !lang.is_empty() && !is_valid_locale(&lang) {
eprintln!(
"Warning: Invalid LANG value '{}'. This may cause encoding issues.",
lang
);
eprintln!(
"Hint: Use a valid locale like 'en_US.UTF-8' or 'C.UTF-8', or unset LANG."
);
}
}
}
}

/// Guard that ensures debug log file is properly flushed when dropped.
struct DebugLogGuard {
_guard: tracing_appender::non_blocking::WorkerGuard,
Expand Down Expand Up @@ -871,6 +940,9 @@ async fn main() -> Result<()> {
// Install panic hook that suggests RUST_BACKTRACE for debugging
cortex_cli::install_panic_hook();

// Validate LC_ALL environment variable if set (#2090)
validate_locale_env();

let cli = Cli::parse();

// Handle color mode - set NO_COLOR env if --color never is used
Expand Down
15 changes: 13 additions & 2 deletions cortex-cli/src/scrape_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,19 @@ impl ScrapeCommand {
}
}
None => {
print!("{output}");
std::io::stdout().flush()?;
// Handle BrokenPipe gracefully when output is piped (#2095)
use std::io::ErrorKind;
if let Err(e) = write!(std::io::stdout(), "{output}") {
if e.kind() != ErrorKind::BrokenPipe {
return Err(e.into());
}
return Ok(());
}
if let Err(e) = std::io::stdout().flush() {
if e.kind() != ErrorKind::BrokenPipe {
return Err(e.into());
}
}
}
}

Expand Down
62 changes: 52 additions & 10 deletions cortex-cli/src/upgrade_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use anyhow::{Context, Result};
use clap::Parser;
use cortex_engine::create_default_client;
use std::io::{Write, stdout};
use std::io::{self, IsTerminal, Write, stdout};

use cortex_update::{
ReleaseChannel, SOFTWARE_URL, UpdateConfig, UpdateInfo, UpdateManager, UpdateOutcome,
Expand Down Expand Up @@ -53,16 +53,48 @@ pub struct UpgradeCli {
pub url: Option<String>,
}

/// Safe println that handles BrokenPipe gracefully (#2093).
/// Returns false if pipe is broken, true otherwise.
fn safe_println(msg: &str) -> bool {
if writeln!(io::stdout(), "{}", msg).is_err() {
return false;
}
if io::stdout().flush().is_err() {
return false;
}
true
}

/// Safe print (no newline) that handles BrokenPipe gracefully.
fn safe_print(msg: &str) -> bool {
if write!(io::stdout(), "{}", msg).is_err() {
return false;
}
if io::stdout().flush().is_err() {
return false;
}
true
}

impl UpgradeCli {
/// Run the upgrade command.
pub async fn run(self) -> Result<()> {
println!("Cortex CLI Upgrade");
println!("{}", "=".repeat(40));
println!("Current version: v{}", CLI_VERSION);
println!(
// Handle piped output gracefully (#2093)
if !safe_println("Cortex CLI Upgrade") {
return Ok(());
}
if !safe_println(&"=".repeat(40)) {
return Ok(());
}
if !safe_println(&format!("Current version: v{}", CLI_VERSION)) {
return Ok(());
}
if !safe_println(&format!(
"Update server: {}",
self.url.as_deref().unwrap_or(SOFTWARE_URL)
);
)) {
return Ok(());
}

// Parse channel (--pre is shorthand for --channel beta)
let channel = if self.pre {
Expand Down Expand Up @@ -108,10 +140,20 @@ impl UpgradeCli {
match manager.check_update_forced().await {
Ok(Some(info)) => Some(info),
Ok(None) => {
println!(
"\n✓ You are already on the latest version (v{})",
CLI_VERSION
);
// For non-stable channels, provide more helpful message (#2091)
if channel != ReleaseChannel::Stable {
println!(
"\n✓ No {} versions available at this time (current: v{})",
self.channel, CLI_VERSION
);
println!(" The {} channel may not have releases yet.", self.channel);
println!(" Check back later or use --channel stable for stable releases.");
} else {
println!(
"\n✓ You are already on the latest version (v{})",
CLI_VERSION
);
}
return Ok(());
}
Err(e) => {
Expand Down
Loading