Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ impl AmqpSettings {
}
}

/// Parses a boolean value from an environment variable string.
///
/// Recognizes common boolean representations: "1", "true", "TRUE"
/// Returns `true` if the value matches any of these, `false` otherwise.
pub fn parse_bool_env(value: &str) -> bool {
matches!(value, "1" | "true" | "TRUE")
}

pub fn get_configuration() -> Result<Settings, config::ConfigError> {
// Load environment variables from .env file
dotenvy::dotenv().ok();
Expand Down Expand Up @@ -262,7 +270,7 @@ pub fn get_configuration() -> Result<Settings, config::ConfigError> {
}

if let Ok(enabled) = std::env::var("STACKER_CASBIN_RELOAD_ENABLED") {
config.casbin_reload_enabled = matches!(enabled.as_str(), "1" | "true" | "TRUE");
config.casbin_reload_enabled = parse_bool_env(&enabled);
}

if let Ok(interval) = std::env::var("STACKER_CASBIN_RELOAD_INTERVAL_SECS") {
Expand Down Expand Up @@ -294,3 +302,27 @@ pub fn get_configuration() -> Result<Settings, config::ConfigError> {

Ok(config)
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_parse_bool_env_true_values() {
assert!(parse_bool_env("1"));
assert!(parse_bool_env("true"));
assert!(parse_bool_env("TRUE"));
}

#[test]
fn test_parse_bool_env_false_values() {
assert!(!parse_bool_env("0"));
assert!(!parse_bool_env("false"));
assert!(!parse_bool_env("FALSE"));
assert!(!parse_bool_env(""));
assert!(!parse_bool_env("yes"));
assert!(!parse_bool_env("no"));
assert!(!parse_bool_env("True")); // Case-sensitive
assert!(!parse_bool_env("invalid"));
}
}
5 changes: 1 addition & 4 deletions src/connectors/user_service/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,10 +433,7 @@ impl UserServiceConnector for UserServiceClient {

let url = format!("{}/api/1.0/products", self.base_url);

let mut req = self
.http_client
.get(&url)
.query(&[("where", &where_json)]);
let mut req = self.http_client.get(&url).query(&[("where", &where_json)]);
if let Some(auth) = self.auth_header() {
req = req.header("Authorization", auth);
}
Expand Down
8 changes: 2 additions & 6 deletions src/connectors/user_service/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,8 @@
pub(crate) fn is_plan_upgrade(user_plan: &str, required_plan: &str) -> bool {
let plan_hierarchy = vec!["basic", "professional", "enterprise"];

let user_level = plan_hierarchy
.iter()
.position(|&p| p == user_plan);
let required_level = plan_hierarchy
.iter()
.position(|&p| p == required_plan);
let user_level = plan_hierarchy.iter().position(|&p| p == user_plan);
let required_level = plan_hierarchy.iter().position(|&p| p == required_plan);

match (user_level, required_level) {
(Some(user_level), Some(required_level)) => user_level > required_level,
Expand Down
2 changes: 1 addition & 1 deletion src/db/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ pub async fn fetch_recent_by_deployment(
exclude_results: bool,
) -> Result<Vec<Command>, String> {
let query_span = tracing::info_span!("Fetching recent commands for deployment");

if exclude_results {
// Fetch commands without result/error fields to reduce payload size
sqlx::query_as::<_, Command>(
Expand Down
90 changes: 58 additions & 32 deletions src/mcp/tools/monitoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,27 @@ async fn wait_for_command_result(
command_id: &str,
) -> Result<Option<Command>, String> {
let wait_deadline = Instant::now() + Duration::from_secs(COMMAND_RESULT_TIMEOUT_SECS);

while Instant::now() < wait_deadline {
let fetched = db::command::fetch_by_command_id(pg_pool, command_id)
.await
.map_err(|e| format!("Failed to fetch command: {}", e))?
;

.map_err(|e| format!("Failed to fetch command: {}", e))?;

if let Some(cmd) = fetched {
let status = cmd.status.to_lowercase();
// Return if completed, failed, or has result/error
if status == "completed" || status == "failed" || cmd.result.is_some() || cmd.error.is_some() {
if status == "completed"
|| status == "failed"
|| cmd.result.is_some()
|| cmd.error.is_some()
{
return Ok(Some(cmd));
}
}

sleep(Duration::from_millis(COMMAND_POLL_INTERVAL_MS)).await;
}

Ok(None)
}

Expand Down Expand Up @@ -133,7 +136,9 @@ impl ToolHandler for GetContainerLogsTool {
.map_err(|e| format!("Failed to queue command: {}", e))?;

// Wait for result or timeout
let result = if let Some(cmd) = wait_for_command_result(&context.pg_pool, &command.command_id).await? {
let result = if let Some(cmd) =
wait_for_command_result(&context.pg_pool, &command.command_id).await?
{
let status = cmd.status.to_lowercase();
json!({
"status": status,
Expand Down Expand Up @@ -258,7 +263,9 @@ impl ToolHandler for GetContainerHealthTool {
.map_err(|e| format!("Failed to queue command: {}", e))?;

// Wait for result or timeout
let result = if let Some(cmd) = wait_for_command_result(&context.pg_pool, &command.command_id).await? {
let result = if let Some(cmd) =
wait_for_command_result(&context.pg_pool, &command.command_id).await?
{
let status = cmd.status.to_lowercase();
json!({
"status": status,
Expand Down Expand Up @@ -447,8 +454,10 @@ impl ToolHandler for DiagnoseDeploymentTool {
serde_json::from_value(args).map_err(|e| format!("Invalid arguments: {}", e))?;

// Create identifier and resolve with full info
let identifier =
DeploymentIdentifier::try_from_options(params.deployment_hash.clone(), params.deployment_id)?;
let identifier = DeploymentIdentifier::try_from_options(
params.deployment_hash.clone(),
params.deployment_id,
)?;
let resolver = create_resolver(context);
let info = resolver.resolve_with_info(&identifier).await?;

Expand All @@ -457,33 +466,48 @@ impl ToolHandler for DiagnoseDeploymentTool {
let mut domain = info.domain;
let mut server_ip = info.server_ip;
let mut apps_info: Option<Value> = info.apps.as_ref().map(|apps| {
json!(apps.iter().map(|a| json!({
"app_code": a.app_code,
"display_name": a.name,
"version": a.version,
"port": a.port
})).collect::<Vec<_>>())
json!(apps
.iter()
.map(|a| json!({
"app_code": a.app_code,
"display_name": a.name,
"version": a.version,
"port": a.port
}))
.collect::<Vec<_>>())
});

// For Stack Builder deployments (hash-based), fetch from Stacker's database
if params.deployment_hash.is_some() || (apps_info.is_none() && !deployment_hash.is_empty()) {
if params.deployment_hash.is_some() || (apps_info.is_none() && !deployment_hash.is_empty())
{
// Fetch deployment from Stacker DB
if let Ok(Some(deployment)) = db::deployment::fetch_by_deployment_hash(&context.pg_pool, &deployment_hash).await {
status = if deployment.status.is_empty() { "unknown".to_string() } else { deployment.status.clone() };

if let Ok(Some(deployment)) =
db::deployment::fetch_by_deployment_hash(&context.pg_pool, &deployment_hash).await
{
status = if deployment.status.is_empty() {
"unknown".to_string()
} else {
deployment.status.clone()
};

// Fetch apps from project
if let Ok(project_apps) = db::project_app::fetch_by_project(&context.pg_pool, deployment.project_id).await {
let apps_list: Vec<Value> = project_apps.iter().map(|app| {
json!({
"app_code": app.code,
"display_name": app.name,
"image": app.image,
"domain": app.domain,
"status": "configured"
if let Ok(project_apps) =
db::project_app::fetch_by_project(&context.pg_pool, deployment.project_id).await
{
let apps_list: Vec<Value> = project_apps
.iter()
.map(|app| {
json!({
"app_code": app.code,
"display_name": app.name,
"image": app.image,
"domain": app.domain,
"status": "configured"
})
})
}).collect();
.collect();
apps_info = Some(json!(apps_list));

// Try to get domain from first app if not set
if domain.is_none() {
domain = project_apps.iter().find_map(|a| a.domain.clone());
Expand Down Expand Up @@ -1201,7 +1225,9 @@ impl ToolHandler for GetServerResourcesTool {
.map_err(|e| format!("Failed to queue command: {}", e))?;

// Wait for result or timeout
let result = if let Some(cmd) = wait_for_command_result(&context.pg_pool, &command.command_id).await? {
let result = if let Some(cmd) =
wait_for_command_result(&context.pg_pool, &command.command_id).await?
{
let status = cmd.status.to_lowercase();
json!({
"status": status,
Expand Down
36 changes: 19 additions & 17 deletions src/mcp/tools/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,13 +264,11 @@ impl ToolHandler for CreateProjectAppTool {
}
project_id
} else if let Some(ref deployment_hash) = params.deployment_hash {
let deployment = db::deployment::fetch_by_deployment_hash(
&context.pg_pool,
deployment_hash,
)
.await
.map_err(|e| format!("Failed to lookup deployment: {}", e))?
.ok_or_else(|| "Deployment not found".to_string())?;
let deployment =
db::deployment::fetch_by_deployment_hash(&context.pg_pool, deployment_hash)
.await
.map_err(|e| format!("Failed to lookup deployment: {}", e))?
.ok_or_else(|| "Deployment not found".to_string())?;

if deployment.user_id != Some(context.user.id.clone()) {
return Err("Deployment not found".to_string());
Expand Down Expand Up @@ -302,19 +300,23 @@ impl ToolHandler for CreateProjectAppTool {
.map_err(|e| format!("Failed to search applications: {}", e))?;

let code_lower = code.to_lowercase();
let matched = apps.iter().find(|app| {
app.code
.as_deref()
.map(|c| c.to_lowercase() == code_lower)
.unwrap_or(false)
}).or_else(|| {
apps.iter().find(|app| {
app.name
let matched = apps
.iter()
.find(|app| {
app.code
.as_deref()
.map(|n| n.to_lowercase() == code_lower)
.map(|c| c.to_lowercase() == code_lower)
.unwrap_or(false)
})
}).or_else(|| apps.first());
.or_else(|| {
apps.iter().find(|app| {
app.name
.as_deref()
.map(|n| n.to_lowercase() == code_lower)
.unwrap_or(false)
})
})
.or_else(|| apps.first());

if let Some(app) = matched {
if resolved_image.is_empty() {
Expand Down
3 changes: 2 additions & 1 deletion src/middleware/authorization.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::configuration::parse_bool_env;
use actix_casbin_auth::{
casbin::{function_map::key_match2, CoreApi, DefaultModel},
CasbinService,
Expand Down Expand Up @@ -34,7 +35,7 @@ pub async fn try_new(db_connection_address: String) -> Result<CasbinService, Err
.matching_fn(Some(key_match2), None);

if std::env::var("STACKER_CASBIN_RELOAD_ENABLED")
.map(|value| matches!(value.as_str(), "1" | "true" | "TRUE"))
.map(|value| parse_bool_env(&value))
.unwrap_or(true)
{
let interval = std::env::var("STACKER_CASBIN_RELOAD_INTERVAL_SECS")
Expand Down
29 changes: 18 additions & 11 deletions src/routes/agent/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,23 @@ pub async fn snapshot_handler(
};

tracing::debug!("[SNAPSHOT HANDLER] Apps : {:?}", apps);

// Fetch recent health commands WITH results to populate container states
// (we always need health results for container status, even if include_command_results=false)
let health_commands = db::command::fetch_recent_by_deployment(
agent_pool.get_ref(),
&deployment_hash,
10, // Fetch last 10 health checks
10, // Fetch last 10 health checks
false, // Always include results for health commands
)
.await
.unwrap_or_default();

// Extract container states from recent health check commands
// Use a HashMap to keep only the most recent health check per app_code
let mut container_map: std::collections::HashMap<String, ContainerSnapshot> = std::collections::HashMap::new();

let mut container_map: std::collections::HashMap<String, ContainerSnapshot> =
std::collections::HashMap::new();

for cmd in health_commands.iter() {
if cmd.r#type == "health" && cmd.status == "completed" {
if let Some(result) = &cmd.result {
Expand All @@ -119,25 +120,31 @@ pub async fn snapshot_handler(
.ok()
.and_then(|v| v.as_str().map(String::from))
.map(|s| s.to_lowercase());

let container = ContainerSnapshot {
id: None,
app: Some(health.app_code.clone()),
state,
image: None,
name: None,
};

// Only insert if we don't have this app yet (keeps most recent due to DESC order)
container_map.entry(health.app_code.clone()).or_insert(container);
container_map
.entry(health.app_code.clone())
.or_insert(container);
}
}
}
}

let containers: Vec<ContainerSnapshot> = container_map.into_values().collect();

tracing::debug!("[SNAPSHOT HANDLER] Containers extracted from {} health checks: {:?}", health_commands.len(), containers);

tracing::debug!(
"[SNAPSHOT HANDLER] Containers extracted from {} health checks: {:?}",
health_commands.len(),
containers
);

let agent_snapshot = agent.map(|a| AgentSnapshot {
version: a.version,
Expand Down