Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions control-plane/api-gateway/src/routes/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::AppState;
/// etcd key holding the desired version (`desired_cp_version`); read by `update_status`.
const ETCD_DESIRED_VERSION_KEY: &str = "/csf/config/desired_cp_version";
/// etcd key holding the outcome of the last update attempt; read by `update_status`.
const ETCD_UPDATE_RESULT_KEY: &str = "/csf/config/last_update_result";
/// etcd key for the GHCR token.
/// NOTE(review): presumably written by `set_ghcr_token` — confirm against that handler.
const ETCD_GHCR_TOKEN_KEY: &str = "/csf/config/ghcr_token";
/// etcd key used as the pause flag: `pause_updates` stores `"true"` here,
/// `resume_updates` deletes it, and `update_status` reports `paused` only when
/// the stored value is exactly `"true"`.
const ETCD_PAUSED_KEY: &str = "/csf/config/update_paused";

#[derive(Debug, Deserialize)]
pub struct UpdateRequest {
Expand All @@ -27,6 +28,9 @@ pub struct UpdateStatusResponse {
pub current_version: String,
pub desired_version: Option<String>,
pub last_result: Option<String>,
pub paused: bool,
pub agent_version: Option<String>,
pub updater_version: Option<String>,
}

#[derive(Debug, Deserialize)]
Expand All @@ -39,6 +43,8 @@ pub fn routes() -> Router<AppState> {
Router::new()
.route("/system/update", post(trigger_update))
.route("/system/update/status", get(update_status))
.route("/system/update/pause", post(pause_updates))
.route("/system/update/resume", post(resume_updates))
.route("/system/ghcr-token", post(set_ghcr_token))
}

Expand Down Expand Up @@ -98,14 +104,32 @@ async fn update_status(

let desired = etcd_get(&mut client, ETCD_DESIRED_VERSION_KEY).await?;
let last_result = etcd_get(&mut client, ETCD_UPDATE_RESULT_KEY).await?;
let paused = etcd_get(&mut client, ETCD_PAUSED_KEY).await?.as_deref() == Some("true");

let binary_dir = env::var("BINARY_DIR").unwrap_or_else(|_| "/usr/local/bin".to_string());
let agent_version = binary_version(&format!("{}/csf-agent", binary_dir)).await;
let updater_version = binary_version(&format!("{}/csf-updater", binary_dir)).await;

Ok(Json(UpdateStatusResponse {
current_version: env!("CARGO_PKG_VERSION").to_string(),
desired_version: desired,
last_result,
paused,
agent_version,
updater_version,
}))
}

/// Returns the version reported by the executable at `path`, if it can be determined.
///
/// Runs `<path> --version` and returns the last whitespace-separated token of
/// its standard output (e.g. `"1.2.3"` for a banner like `csf-agent 1.2.3`).
///
/// Returns `None` when the process cannot be spawned, exits unsuccessfully,
/// writes non-UTF-8 output, or prints nothing.
async fn binary_version(path: &str) -> Option<String> {
    let output = tokio::process::Command::new(path)
        .arg("--version")
        .output()
        .await
        .ok()?;

    // A non-zero exit means stdout is not a trustworthy version banner
    // (it may be empty or contain a usage/error message), so report "unknown".
    if !output.status.success() {
        return None;
    }

    let raw = String::from_utf8(output.stdout).ok()?;
    // `split_whitespace` already yields trimmed tokens; no extra trim needed.
    raw.split_whitespace().last().map(str::to_owned)
}

async fn etcd_get(client: &mut Client, key: &str) -> Result<Option<String>, StatusCode> {
let resp = client.get(key, None).await.map_err(|e| {
tracing::error!(error = %e, key = key, "failed to read from etcd");
Expand Down Expand Up @@ -186,6 +210,38 @@ async fn write_result(result: &str) {
}
}

async fn pause_updates(
_auth: CanManageSystem,
State(_state): State<AppState>,
) -> Result<StatusCode, StatusCode> {
let mut client = etcd_client().await?;
client
.put(ETCD_PAUSED_KEY, b"true", None)
.await
.map_err(|e| {
tracing::error!(error = %e, "failed to set update_paused in etcd");
StatusCode::INTERNAL_SERVER_ERROR
})?;
tracing::info!("updates paused");
Ok(StatusCode::NO_CONTENT)
}

async fn resume_updates(
_auth: CanManageSystem,
State(_state): State<AppState>,
) -> Result<StatusCode, StatusCode> {
let mut client = etcd_client().await?;
client
.delete(ETCD_PAUSED_KEY, None)
.await
.map_err(|e| {
tracing::error!(error = %e, "failed to delete update_paused from etcd");
StatusCode::INTERNAL_SERVER_ERROR
})?;
tracing::info!("updates resumed");
Ok(StatusCode::NO_CONTENT)
}

async fn set_ghcr_token(
_auth: CanManageSystem,
State(_state): State<AppState>,
Expand Down
3 changes: 3 additions & 0 deletions control-plane/csf-updater/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,7 @@ serde = { workspace = true }
serde_json = { workspace = true }
aes-gcm = { workspace = true }
base64 = { workspace = true }
sha2 = { workspace = true }
hex = "0.4"
bytes = "1"
tempfile = "3"
6 changes: 6 additions & 0 deletions control-plane/csf-updater/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ pub struct Config {
pub compose_file: String,
pub poll_interval_secs: u64,
pub secret_encryption_key: String,
pub binary_dir: String,
pub github_release_base_url: String,
}

impl Config {
Expand All @@ -26,6 +28,10 @@ impl Config {
.unwrap_or(30),
secret_encryption_key: env::var("SECRET_ENCRYPTION_KEY")
.context("SECRET_ENCRYPTION_KEY must be set")?,
binary_dir: env::var("BINARY_DIR")
.unwrap_or_else(|_| "/usr/local/bin".to_string()),
github_release_base_url: env::var("GITHUB_RELEASE_BASE_URL")
.unwrap_or_else(|_| "https://github.com/csfx-cloud/CSF-Core/releases/download".to_string()),
})
}
}
1 change: 1 addition & 0 deletions control-plane/csf-updater/src/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::config::Config;
/// etcd key holding the version the updater should apply; read by `run_once`.
pub const DESIRED_VERSION_KEY: &str = "/csf/config/desired_cp_version";
/// etcd key where the outcome of an update attempt is recorded
/// (`run_once` writes `"failed"` here when an update errors).
pub const RESULT_KEY: &str = "/csf/config/last_update_result";
/// etcd key for the GHCR token.
/// NOTE(review): presumably consumed when setting up registry auth — confirm against the reader.
pub const GHCR_TOKEN_KEY: &str = "/csf/config/ghcr_token";
/// etcd key used as the pause flag: `run_once` skips its cycle while the
/// stored value is exactly `"true"`.
pub const PAUSED_KEY: &str = "/csf/config/update_paused";

pub struct Client {
inner: etcd_client::Client,
Expand Down
7 changes: 6 additions & 1 deletion control-plane/csf-updater/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ async fn main() -> anyhow::Result<()> {
async fn run_once(cfg: &config::Config, last_applied: &str) -> anyhow::Result<Option<String>> {
let mut etcd = etcd::Client::connect(cfg).await?;

if etcd.get(etcd::PAUSED_KEY).await?.as_deref() == Some("true") {
tracing::info!("updates paused, skipping");
return Ok(None);
}

let desired = match etcd.get(etcd::DESIRED_VERSION_KEY).await? {
Some(v) => v,
None => return Ok(None),
Expand Down Expand Up @@ -66,7 +71,7 @@ async fn run_once(cfg: &config::Config, last_applied: &str) -> anyhow::Result<Op
Err(e) => {
tracing::error!(error = %e, version = %desired, "update failed");
etcd.put(etcd::RESULT_KEY, "failed").await?;
Ok(None)
Ok(Some(desired))
}
}
}
Expand Down
91 changes: 90 additions & 1 deletion control-plane/csf-updater/src/updater.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::{bail, Result};
use sha2::{Digest, Sha256};
use std::process::Stdio;
use tokio::process::Command;
use tracing::info;
Expand All @@ -13,7 +14,9 @@ pub async fn run(cfg: &Config, version: &str, etcd: &mut etcd::Client) -> Result
pull(cfg, version, docker_config_dir.as_deref()).await?;
verify::verify_images(cfg, version, ghcr_auth.as_deref()).await?;
up(cfg, version, docker_config_dir.as_deref()).await?;
health_check(cfg, version).await
health_check(cfg, version).await?;
update_agent_binary(cfg, version).await?;
update_self_binary(cfg, version).await
}

async fn setup_docker_auth(cfg: &Config, etcd: &mut etcd::Client) -> Result<(Option<String>, Option<String>)> {
Expand Down Expand Up @@ -82,6 +85,92 @@ async fn health_check(cfg: &Config, version: &str) -> Result<()> {
Ok(())
}

/// Replaces the `csf-agent` binary with the build for `version`, then restarts
/// the `csf-daemon` unit.
///
/// The binary is downloaded from
/// `<github_release_base_url>/v<version>/csf-agent-<arch>` and installed as
/// `<binary_dir>/csf-agent`.
///
/// # Errors
/// Propagates any failure from the download/swap step or from the unit restart.
async fn update_agent_binary(cfg: &Config, version: &str) -> Result<()> {
    info!(version = %version, "updating csf-agent binary");

    let target = format!("{}/csf-agent", cfg.binary_dir);
    let source = format!(
        "{}/v{}/csf-agent-{}",
        cfg.github_release_base_url,
        version,
        detect_arch()
    );

    download_and_swap(&source, &target).await?;
    restart_unit("csf-daemon").await
}

/// Replaces the `csf-updater` binary with the build for `version`, then
/// restarts the `csf-updater` unit.
///
/// The binary is downloaded from
/// `<github_release_base_url>/v<version>/csf-updater-<arch>` and installed as
/// `<binary_dir>/csf-updater`.
///
/// NOTE(review): if this process itself runs as the `csf-updater` unit, the
/// restart will presumably terminate it before this function returns, so the
/// caller may never observe the `Ok(())` — confirm how the result is recorded.
///
/// # Errors
/// Propagates any failure from the download/swap step or from the unit restart.
async fn update_self_binary(cfg: &Config, version: &str) -> Result<()> {
    info!(version = %version, "updating csf-updater binary");

    let target = format!("{}/csf-updater", cfg.binary_dir);
    let source = format!(
        "{}/v{}/csf-updater-{}",
        cfg.github_release_base_url,
        version,
        detect_arch()
    );

    download_and_swap(&source, &target).await?;
    restart_unit("csf-updater").await
}

async fn download_and_swap(url: &str, dest: &str) -> Result<()> {
let tmp = format!("{}.new", dest);

let bytes = fetch(url).await?;
let expected = fetch_checksum(&format!("{}.sha256", url)).await?;
verify_checksum(&bytes, &expected)?;

tokio::fs::write(&tmp, &bytes).await?;

let mut perms = tokio::fs::metadata(&tmp).await?.permissions();
std::os::unix::fs::PermissionsExt::set_mode(&mut perms, 0o750);
tokio::fs::set_permissions(&tmp, perms).await?;

tokio::fs::rename(&tmp, dest).await?;
info!(dest = %dest, "binary swapped");
Ok(())
}

/// Performs an HTTP GET on `url` and returns the full response body.
///
/// # Errors
/// Fails if the request cannot be performed, the response status is not a
/// success status, or the body cannot be read.
async fn fetch(url: &str) -> Result<bytes::Bytes> {
    let response = reqwest::get(url).await?;
    let status = response.status();

    if status.is_success() {
        let body = response.bytes().await?;
        Ok(body)
    } else {
        bail!("failed to download {}: {}", url, status);
    }
}

/// Downloads a checksum file from `url` and returns its first
/// whitespace-separated token (the digest in `<digest>  <filename>` style files).
///
/// # Errors
/// Fails if the request cannot be performed, the response status is not a
/// success status, the body cannot be read as text, or the body contains no
/// non-whitespace token.
async fn fetch_checksum(url: &str) -> Result<String> {
    let response = reqwest::get(url).await?;
    let status = response.status();
    if !status.is_success() {
        bail!("failed to download checksum {}: {}", url, status);
    }

    let body = response.text().await?;
    match body.split_whitespace().next() {
        Some(token) => Ok(token.to_string()),
        None => Err(anyhow::anyhow!("empty checksum file at {}", url)),
    }
}

/// Checks that the SHA-256 digest of `data` equals the hex digest `expected`.
///
/// The comparison ignores ASCII case and surrounding whitespace in
/// `expected`: hexadecimal digests are case-insensitive, and the locally
/// computed digest is always lowercase, so an uppercase published checksum
/// must still be accepted.
///
/// # Errors
/// Returns a `checksum mismatch` error carrying both digests when they differ.
fn verify_checksum(data: &[u8], expected: &str) -> Result<()> {
    let digest = hex::encode(Sha256::digest(data));
    if !digest.eq_ignore_ascii_case(expected.trim()) {
        bail!("checksum mismatch: expected={} got={}", expected, digest);
    }
    info!("checksum verified");
    Ok(())
}

/// Restarts the systemd unit `unit` by running `sudo systemctl restart <unit>`
/// and waiting for the command to finish.
///
/// # Errors
/// Fails if the command cannot be spawned or exits with a non-success status.
async fn restart_unit(unit: &str) -> Result<()> {
    let mut systemctl = Command::new("sudo");
    systemctl.arg("systemctl").arg("restart").arg(unit);

    let exit = systemctl.status().await?;
    if exit.success() {
        Ok(())
    } else {
        bail!("systemctl restart {} failed: {}", unit, exit);
    }
}

/// Returns the architecture suffix used in release asset names
/// (`csf-agent-<arch>`, `csf-updater-<arch>`) for the compile-time target.
///
/// `aarch64` maps to `"arm64"` and `x86_64` maps to `"amd64"`. Any other
/// target architecture is returned under its own Rust name (for example
/// `"riscv64"`) instead of being reported as `amd64`, so that an unsupported
/// platform fails at download time rather than installing a binary built for
/// a different CPU.
fn detect_arch() -> &'static str {
    match std::env::consts::ARCH {
        "aarch64" => "arm64",
        "x86_64" => "amd64",
        other => other,
    }
}

async fn compose(cfg: &Config, version: &str, docker_config_dir: Option<&str>, args: &[&str]) -> Result<()> {
let mut cmd_args = vec!["compose", "-f", cfg.compose_file.as_str()];
cmd_args.extend_from_slice(args);
Expand Down
14 changes: 8 additions & 6 deletions control-plane/csf-updater/src/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,19 @@ async fn remote_digest(client: &reqwest::Client, image: &str, tag: &str, ghcr_au

/// Returns the content digest (the part after `@`) of a locally available image.
///
/// Runs `docker image inspect --format '{{json .RepoDigests}}' <image>`,
/// parses the output as a JSON array of strings, and returns the digest of the
/// first entry that contains an `@`.
///
/// # Errors
/// Fails if `docker` cannot be run or exits unsuccessfully, if its output is
/// not valid UTF-8 or not a JSON string array, or if no entry carries a digest.
fn local_digest(image: &str) -> Result<String> {
    let output = std::process::Command::new("docker")
        .args(["image", "inspect", "--format", "{{json .RepoDigests}}", image])
        .output()?;

    if !output.status.success() {
        bail!("docker inspect failed for {}", image);
    }

    let raw = String::from_utf8(output.stdout)?;
    let digests: Vec<String> = serde_json::from_str(raw.trim())
        .map_err(|e| anyhow::anyhow!("failed to parse RepoDigests for {}: {}", image, e))?;

    // Entries look like `<repo>@sha256:<hex>`; keep only the digest part.
    digests
        .into_iter()
        .find_map(|d| d.split('@').nth(1).map(|s| s.to_string()))
        .ok_or_else(|| anyhow::anyhow!("no repo digest found for {}", image))
}
9 changes: 8 additions & 1 deletion nixos-node/modules/csf-daemon.nix
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ in
description = "The csf-agent package to use.";
};

binaryPath = lib.mkOption {
type = lib.types.str;
default = "/usr/local/bin/csf-agent";
description = "Path to the csf-agent binary. Can be overwritten by the updater.";
};

apiGateway = lib.mkOption {
type = lib.types.str;
example = "https://gateway.csf.example:8000";
Expand Down Expand Up @@ -42,6 +48,7 @@ in
users.users.csf-daemon = {
isSystemUser = true;
group = "csf-daemon";
extraGroups = [ "csf-updater" ];
home = "/var/lib/csf-daemon";
shell = pkgs.shadow;
description = "CSF daemon service user";
Expand Down Expand Up @@ -69,7 +76,7 @@ in
};

serviceConfig = {
ExecStart = "${cfg.package}/bin/csf-agent";
ExecStart = cfg.binaryPath;
Restart = "always";
RestartSec = "5s";
User = "csf-daemon";
Expand Down
Loading
Loading