Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
399 changes: 171 additions & 228 deletions .github/workflows/docker-build.yml

Large diffs are not rendered by default.

287 changes: 161 additions & 126 deletions .github/workflows/prerelease.yml

Large diffs are not rendered by default.

48 changes: 32 additions & 16 deletions agent/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use anyhow::{Context, Result};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;
use uuid::Uuid;

#[derive(Debug, Deserialize, Clone)]
Expand Down Expand Up @@ -124,23 +125,38 @@ impl ApiClient {
csr_pem,
};

let resp = self
.client
.post(&url)
.json(&body)
.send()
.await
.context("Failed to send registration request")?;

if !resp.status().is_success() {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
anyhow::bail!("Registration failed status={} body={}", status, body);
loop {
let resp = self
.client
.post(&url)
.json(&body)
.send()
.await
.context("Failed to send registration request")?;

if resp.status() == reqwest::StatusCode::TOO_MANY_REQUESTS {
let retry_after = resp
.headers()
.get("Retry-After")
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(60);
tracing::warn!(retry_after, "Registration rate-limited, retrying");
tokio::time::sleep(Duration::from_secs(retry_after)).await;
continue;
}

if !resp.status().is_success() {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
anyhow::bail!("Registration failed status={} body={}", status, body);
}

return resp
.json::<RegisterResponse>()
.await
.context("Failed to parse registration response");
}

resp.json::<RegisterResponse>()
.await
.context("Failed to parse registration response")
}

pub async fn heartbeat(
Expand Down
6 changes: 3 additions & 3 deletions agent/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use serde::{Deserialize, Serialize};
use std::path::Path;
use uuid::Uuid;

const STATE_DIR: &str = "/var/lib/csfx-daemon";
const CREDENTIALS_FILE: &str = "/var/lib/csfx-daemon/credentials";
const CONFIG_FILE: &str = "/var/lib/csfx-daemon/config.json";
const STATE_DIR: &str = "/var/lib/csfx-agent";
const CREDENTIALS_FILE: &str = "/var/lib/csfx-agent/credentials";
const CONFIG_FILE: &str = "/var/lib/csfx-agent/config.json";

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DaemonConfig {
Expand Down
12 changes: 9 additions & 3 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ async fn perform_registration(
heartbeat_interval_secs: u64,
agent_pki: &pki::AgentPki,
) -> Result<(uuid::Uuid, String)> {
let token = match std::env::var("CSFX_REGISTRATION_TOKEN") {
Ok(t) => t,
Err(_) => {
let token = match std::env::var("CSFX_REGISTRATION_TOKEN").ok().filter(|t| !t.is_empty()) {
Some(t) => t,
None => {
info!("CSFX_REGISTRATION_TOKEN not set, fetching bootstrap token from gateway");
client
.fetch_bootstrap_token()
Expand Down Expand Up @@ -191,6 +191,12 @@ async fn run_heartbeat_loop(
failure_count = 0;
}

info!(
agent_id = %agent_id,
desired_flake_rev = ?resp.desired_flake_rev,
"heartbeat ok"
);

if let Some(count) = resp.post_update_heartbeats {
update_watch::write_heartbeat_counter(count).await;
}
Expand Down
8 changes: 4 additions & 4 deletions agent/src/pki.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use anyhow::{Context, Result};
use rcgen::{CertificateParams, DnType, KeyPair, PKCS_ECDSA_P256_SHA256};
use std::path::Path;

const KEY_FILE: &str = "/var/lib/csfx-daemon/agent.key";
const CSR_FILE: &str = "/var/lib/csfx-daemon/agent.csr";
const CERT_FILE: &str = "/var/lib/csfx-daemon/agent.crt";
const CA_FILE: &str = "/var/lib/csfx-daemon/ca.crt";
const KEY_FILE: &str = "/var/lib/csfx-agent/agent.key";
const CSR_FILE: &str = "/var/lib/csfx-agent/agent.csr";
const CERT_FILE: &str = "/var/lib/csfx-agent/agent.crt";
const CA_FILE: &str = "/var/lib/csfx-agent/ca.crt";

pub struct AgentPki {
key_pem: String,
Expand Down
39 changes: 37 additions & 2 deletions agent/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,46 @@ pub struct SystemMetrics {
pub uptime_seconds: u64,
}

/// Looks up a single `KEY=value` entry in os-release formatted text.
///
/// `content` is the full text of the file and `field` is the exact key to
/// look for (for example `"ID"` or `"VERSION_ID"`). The key is compared for
/// equality rather than by prefix, so a request for `ID` is never answered by
/// an `ID_LIKE=` line. Lines without an `=` are ignored instead of ending the
/// search.
///
/// Returns the value of the first line carrying that key, with surrounding
/// whitespace and enclosing single or double quotes removed, or `None` when no
/// line carries the key.
fn parse_os_release_field(content: &str, field: &str) -> Option<String> {
    content
        .lines()
        // Split each line at the first '=' only; the value may itself contain '='.
        .filter_map(|line| line.trim().split_once('='))
        .find(|(key, _)| *key == field)
        .map(|(_, value)| {
            value
                .trim()
                .trim_matches(|c: char| c == '"' || c == '\'')
                .to_string()
        })
}

/// Determines the `(os_type, os_version)` pair reported for this host.
///
/// Sources are tried in order, and the first one that yields an OS type wins:
/// 1. the `CSFX_OS_TYPE` environment variable, with `CSFX_OS_VERSION` as the
///    version;
/// 2. the `ID` field of `/etc/os-release`, with `VERSION_ID` and then
///    `BUILD_ID` as the version;
/// 3. `System::name()`, defaulting to `"linux"`.
///
/// Empty values are treated as absent, so an exported-but-blank variable or a
/// blank os-release field falls through to the next source instead of being
/// reported as an empty string. The OS type is always lowercased. Whenever the
/// chosen source has no version, `System::os_version()` is used, and
/// `"unknown"` if that is unavailable too.
fn detect_os() -> (String, String) {
    // Last-resort version shared by every branch below.
    fn fallback_version() -> String {
        System::os_version().unwrap_or_else(|| "unknown".to_string())
    }

    let env_os_type = std::env::var("CSFX_OS_TYPE")
        .ok()
        .filter(|v| !v.is_empty());
    if let Some(os_type) = env_os_type {
        let os_version = std::env::var("CSFX_OS_VERSION")
            .ok()
            .filter(|v| !v.is_empty())
            .unwrap_or_else(fallback_version);
        return (os_type.to_lowercase(), os_version);
    }

    if let Ok(content) = std::fs::read_to_string("/etc/os-release") {
        let field =
            |name: &str| parse_os_release_field(&content, name).filter(|v| !v.is_empty());

        if let Some(os_type) = field("ID") {
            let os_version = field("VERSION_ID")
                .or_else(|| field("BUILD_ID"))
                .unwrap_or_else(fallback_version);
            return (os_type.to_lowercase(), os_version);
        }
    }

    (
        System::name()
            .unwrap_or_else(|| "linux".to_string())
            .to_lowercase(),
        fallback_version(),
    )
}

/// Builds the `SystemInfo` record describing this host.
///
/// The hostname comes from `System::host_name()` (falling back to
/// `"unknown"`), the OS type and version come from `detect_os()`, and the
/// architecture is the compile-time `std::env::consts::ARCH`.
pub fn collect_info() -> SystemInfo {
let (os_type, os_version) = detect_os();
SystemInfo {
hostname: System::host_name().unwrap_or_else(|| "unknown".to_string()),
// NOTE(review): `os_type` and `os_version` are each initialised twice in this
// view. This looks like a diff rendering in which the next two lines are the
// removed side and the shorthand fields after them are the added side — confirm
// against the real file before relying on either form.
os_type: System::name().unwrap_or_else(|| "linux".to_string()).to_lowercase(),
os_version: System::os_version().unwrap_or_else(|| "unknown".to_string()),
os_type,
os_version,
architecture: std::env::consts::ARCH.to_string(),
}
}
Expand Down
9 changes: 7 additions & 2 deletions control-plane/api-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,13 @@ async fn main() {
.merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()))
.with_state(state);

let addr = SocketAddr::from(([0, 0, 0, 0], 8000));
tracing::info!(addr = %addr, "listening");
let port = std::env::var("GATEWAY_PORT")
.ok()
.and_then(|p| p.parse::<u16>().ok())
.unwrap_or(8000);
let listen_addr = std::env::var("LISTEN_ADDR").unwrap_or_else(|_| "0.0.0.0".to_string());
let addr: SocketAddr = format!("{}:{}", listen_addr, port).parse().unwrap();
tracing::info!(version = env!("CARGO_PKG_VERSION"), addr = %addr, "listening");
let listener = tokio::net::TcpListener::bind(addr).await.unwrap();
axum::serve(
listener,
Expand Down
14 changes: 8 additions & 6 deletions control-plane/api-gateway/src/routes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,25 +75,27 @@ pub fn create_router() -> Router<AppState> {
.allow_headers(vec![AUTHORIZATION, ACCEPT, CONTENT_TYPE])
.allow_credentials(true);

let internal_api_router = Router::new()
.merge(registry::registry_routes());

let api_router = Router::new()
.merge(agents::agents_routes())
.merge(networks::networks_routes())
.merge(organizations::routes())
.merge(registry::registry_routes())
.merge(system::routes())
.merge(update::routes())
.merge(users::users_routes())
.merge(volumes::volumes_routes())
.merge(workloads::workloads_routes())
.merge(events::events_routes());
.merge(events::events_routes())
.layer(GovernorLayer {
config: governor_config,
});

Router::new()
.route("/metrics", get(metrics::metrics_handler))
// API routes
.logged_nest("/api", api_router)
.layer(GovernorLayer {
config: governor_config,
})
.logged_nest("/api", internal_api_router)
.layer(
TraceLayer::new_for_http()
.make_span_with(|request: &Request<Body>| {
Expand Down
3 changes: 2 additions & 1 deletion control-plane/failover-controller/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ async fn main() -> anyhow::Result<()> {
.and_then(|p| p.parse::<u16>().ok())
.unwrap_or(8004);

let addr = SocketAddr::from(([0, 0, 0, 0], port));
let listen_addr = std::env::var("LISTEN_ADDR").unwrap_or_else(|_| "127.0.0.1".to_string());
let addr: SocketAddr = format!("{}:{}", listen_addr, port).parse().unwrap();
log_info!(
"main",
&format!("Failover Controller listening port={}", port)
Expand Down
43 changes: 43 additions & 0 deletions control-plane/registry/src/db/agents.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,49 @@ use sea_orm::{
};
use uuid::Uuid;

/// Fetches the agent row whose `Hostname` column equals `hostname`.
///
/// Returns `Ok(None)` when the query yields no row; any error from the query
/// is propagated to the caller.
pub async fn get_by_hostname(
    db: &DatabaseConnection,
    hostname: &str,
) -> Result<Option<agents::Model>> {
    let by_hostname = agents::Column::Hostname.eq(hostname);
    let query = agents::Entity::find().filter(by_hostname);
    let found = query.one(db).await?;
    Ok(found)
}

/// Refreshes the stored record of an existing agent with new registration data.
///
/// Loads the row with id `agent_id`, overwrites its agent version, OS type,
/// OS version and architecture, sets the status to `"Online"`, and stamps both
/// `last_heartbeat` and `updated_at` with the current UTC time. `tags` and
/// `public_key_pem` are written only when the caller passes `Some`; passing
/// `None` leaves the stored value untouched.
///
/// # Errors
///
/// Fails with "Agent not found" when no row has that id, and propagates any
/// error raised by the lookup or the update.
pub async fn update_registration(
    db: &DatabaseConnection,
    agent_id: Uuid,
    agent_version: String,
    os_type: String,
    os_version: String,
    architecture: String,
    tags: Option<serde_json::Value>,
    public_key_pem: Option<String>,
) -> Result<agents::Model> {
    let existing = agents::Entity::find_by_id(agent_id).one(db).await?;
    let model = existing.ok_or_else(|| anyhow::anyhow!("Agent not found"))?;
    let mut record: agents::ActiveModel = model.into();

    // Fields that every registration overwrites.
    record.agent_version = Set(agent_version);
    record.os_type = Set(os_type);
    record.os_version = Set(os_version);
    record.architecture = Set(architecture);
    record.status = Set("Online".to_string());

    let heartbeat_at = chrono::Utc::now().naive_utc();
    record.last_heartbeat = Set(Some(heartbeat_at));
    let touched_at = chrono::Utc::now().naive_utc();
    record.updated_at = Set(Some(touched_at));

    // Optional fields: an absent value keeps whatever is already stored.
    if let Some(new_tags) = tags {
        record.tags = Set(Some(new_tags));
    }
    if let Some(new_key) = public_key_pem {
        record.public_key_pem = Set(Some(new_key));
    }

    let saved = record.update(db).await?;
    Ok(saved)
}

pub async fn create(
db: &DatabaseConnection,
id: Uuid,
Expand Down
Loading
Loading