Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/configuration/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use code0_flow::flow_config::{env_with_default, environment::Environment, mode::Mode};

pub mod state;

/// Struct for all relevant `Aquila` startup configurations
pub struct Config {
/// Aquila environment
Expand Down
27 changes: 27 additions & 0 deletions src/configuration/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

/// Tracks readiness of each external service.
#[derive(Clone)]
pub struct AppReadiness {
// Readiness state of Sagittarius service
pub sagittarius_ready: Arc<AtomicBool>,
}

impl Default for AppReadiness {
fn default() -> Self {
Self::new()
}
}

impl AppReadiness {
pub fn new() -> Self {
Self {
sagittarius_ready: Arc::new(AtomicBool::new(false)),
}
}

pub fn is_ready(&self) -> bool {
self.sagittarius_ready.load(Ordering::SeqCst)
}
}
56 changes: 47 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use crate::{configuration::Config as AquilaConfig, flow::get_flow_identifier};
use crate::{
configuration::{Config as AquilaConfig, state::AppReadiness},
flow::get_flow_identifier,
sagittarius::retry::create_channel_with_retry,
};
use async_nats::jetstream::kv::Config;
use code0_flow::flow_config::load_env_file;
use prost::Message;
use sagittarius::flow_service_client_impl::SagittariusFlowClient;
use serde_json::from_str;
use server::AquilaGRPCServer;
use std::{fs::File, io::Read, sync::Arc};
use std::{fs::File, io::Read, sync::Arc, time::Duration};
use tucana::shared::Flows;

pub mod authorization;
Expand All @@ -27,6 +31,7 @@ async fn main() {
// Load environment variables from .env file
load_env_file();
let config = AquilaConfig::new();
let app_readiness = AppReadiness::new();

//Create connection to JetStream
let client = match async_nats::connect(config.nats_url.clone()).await {
Expand Down Expand Up @@ -66,9 +71,14 @@ async fn main() {
return;
}

let server = AquilaGRPCServer::new(&config);
let backend_url_flow = config.backend_url.clone();
let runtime_token_flow = config.runtime_token.clone();
let sagittarius_channel = create_channel_with_retry(
"Sagittarius Endpoint",
backend_url_flow,
app_readiness.sagittarius_ready.clone(),
)
.await;
let server = AquilaGRPCServer::new(&config, app_readiness.clone(), sagittarius_channel.clone());
let kv_for_flow = kv_store.clone();

let mut server_task = tokio::spawn(async move {
Expand All @@ -80,17 +90,45 @@ async fn main() {
});

let env = match config.environment {
code0_flow::flow_config::environment::Environment::Development => String::from("DEVELOPMENT"),
code0_flow::flow_config::environment::Environment::Development => {
String::from("DEVELOPMENT")
}
code0_flow::flow_config::environment::Environment::Staging => String::from("STAGING"),
code0_flow::flow_config::environment::Environment::Production => String::from("PRODUCTION"),
};

let mut flow_task = tokio::spawn(async move {
let mut flow_client =
SagittariusFlowClient::new(backend_url_flow, kv_for_flow, env, runtime_token_flow).await;
let mut backoff = Duration::from_millis(200);
let max_backoff = Duration::from_secs(10);

loop {
let ch = create_channel_with_retry(
"Sagittarius Stream",
config.backend_url.clone(),
app_readiness.sagittarius_ready.clone(),
)
.await;

let mut flow_client = SagittariusFlowClient::new(
kv_for_flow.clone(),
env.clone(),
config.runtime_token.clone(),
ch,
app_readiness.sagittarius_ready.clone(),
);

flow_client.init_flow_stream().await;
log::warn!("Flow stream task exited");
match flow_client.init_flow_stream().await {
Ok(_) => {
log::warn!("Flow stream ended cleanly. Reconnecting...");
}
Err(e) => {
log::warn!("Flow stream dropped: {:?}. Reconnecting...", e);
}
}

tokio::time::sleep(backoff).await;
backoff = std::cmp::min(backoff * 2, max_backoff);
}
});

#[cfg(unix)]
Expand Down
32 changes: 10 additions & 22 deletions src/sagittarius/data_type_service_client_impl.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::authorization::authorization::get_authorization_metadata;
use std::sync::Arc;
use tokio::sync::Mutex;
use tonic::{Extensions, Request, transport::Channel};
use tonic::transport::Channel;
use tonic::{Extensions, Request};
use tucana::sagittarius::{
DataTypeUpdateRequest as SagittariusDataTypeUpdateRequest,
data_type_service_client::DataTypeServiceClient,
Expand All @@ -16,21 +15,8 @@ pub struct SagittariusDataTypeServiceClient {
}

impl SagittariusDataTypeServiceClient {
pub async fn new_arc(sagittarius_url: String, token: String) -> Arc<Mutex<Self>> {
Arc::new(Mutex::new(Self::new(sagittarius_url, token).await))
}

pub async fn new(sagittarius_url: String, token: String) -> Self {
let client = match DataTypeServiceClient::connect(sagittarius_url).await {
Ok(client) => {
log::info!("Successfully connected to Sagittarius DataType Endpoint!");
client
}
Err(err) => panic!(
"Failed to connect to Sagittarius (DataType Endpoint): {:?}",
err
),
};
pub fn new(channel: Channel, token: String) -> Self {
let client = DataTypeServiceClient::new(channel);

Self { client, token }
}
Expand All @@ -49,10 +35,7 @@ impl SagittariusDataTypeServiceClient {

let response = match self.client.update(request).await {
Ok(response) => {
log::info!(
"Successfully transferred data types. Did Sagittarius updated them? {:?}",
&response
);
log::info!("Successfully transferred data types.",);
response.into_inner()
}
Err(err) => {
Expand All @@ -61,6 +44,11 @@ impl SagittariusDataTypeServiceClient {
}
};

match response.success {
true => log::info!("Sagittarius successfully updated DataTypes."),
false => log::error!("Sagittarius didn't update any DataTypes."),
};

AquilaDataTypeUpdateResponse {
success: response.success,
}
Expand Down
75 changes: 38 additions & 37 deletions src/sagittarius/flow_service_client_impl.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,48 @@
use crate::{authorization::authorization::get_authorization_metadata, flow::get_flow_identifier};
use futures::{StreamExt, TryStreamExt};
use prost::Message;
use tokio::fs;
use std::{path::Path, sync::Arc};
use tokio::fs;
use tonic::{Extensions, Request, transport::Channel};
use tucana::{sagittarius::{
FlowLogonRequest, FlowResponse, flow_response::Data, flow_service_client::FlowServiceClient,
}, shared::{Flows, ValidationFlow}};
use tucana::{
sagittarius::{
FlowLogonRequest, FlowResponse, flow_response::Data, flow_service_client::FlowServiceClient,
},
shared::Flows,
};

use crate::{authorization::authorization::get_authorization_metadata, flow::get_flow_identifier};
use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Clone)]
pub struct SagittariusFlowClient {
store: Arc<async_nats::jetstream::kv::Store>,
client: FlowServiceClient<Channel>,
env: String,
token: String,
sagittarius_ready: Arc<AtomicBool>,
}

impl SagittariusFlowClient {
pub async fn new(
sagittarius_url: String,
pub fn new(
store: Arc<async_nats::jetstream::kv::Store>,
env: String,
token: String,
channel: Channel,
sagittarius_ready: Arc<AtomicBool>,
) -> SagittariusFlowClient {
let client = match FlowServiceClient::connect(sagittarius_url).await {
Ok(res) => {
log::info!("Successfully connected to Sagittarius Flow Endpoint!");
res
}
Err(err) => panic!(
"Failed to connect to Sagittarius (Flow Endpoint): {:?}",
err
),
};
let client = FlowServiceClient::new(channel);

SagittariusFlowClient {
store,
client,
env,
token,
sagittarius_ready,
}
}

fn is_development(&self) -> bool {
self.env == String::from("DEVELOPMENT")
self.env == "DEVELOPMENT"
}

async fn export_flows_json_overwrite(&self, flows: Flows) {
Expand Down Expand Up @@ -79,7 +77,11 @@ impl SagittariusFlowClient {
}
}

log::info!("Exported {} flows to {}", flows.flows.len(), final_path.display());
log::info!(
"Exported {} flows to {}",
flows.flows.len(),
final_path.display()
);
}

async fn handle_response(&mut self, response: FlowResponse) {
Expand All @@ -95,7 +97,6 @@ impl SagittariusFlowClient {
};

match data {
// Will delete the flow id it receives
Data::DeletedFlowId(id) => {
log::info!("Deleting the Flow with the id: {}", id);
let identifier = format!("{}::*", id);
Expand All @@ -104,7 +105,6 @@ impl SagittariusFlowClient {
Err(err) => log::error!("Failed to delete flow. Reason: {:?}", err),
};
}
//Will update the flow it receives
Data::UpdatedFlow(flow) => {
log::info!("Updating the Flow with the id: {}", &flow.flow_id);
let key = get_flow_identifier(&flow);
Expand All @@ -114,11 +114,9 @@ impl SagittariusFlowClient {
Err(err) => log::error!("Failed to update flow. Reason: {:?}", err),
};
}
//WIll drop all flows that it holds and insert all new ones
Data::Flows(flows) => {
log::info!("Dropping all Flows & inserting the new ones!");

// Writing all flows into an output if its in `DEVELOPMENT`

self.export_flows_json_overwrite(flows.clone()).await;

let mut keys = match self.store.keys().await {
Expand All @@ -132,9 +130,7 @@ impl SagittariusFlowClient {
let mut purged_count = 0;
while let Ok(Some(key)) = keys.try_next().await {
match self.store.purge(&key).await {
Ok(_) => {
purged_count += 1;
}
Ok(_) => purged_count += 1,
Err(e) => log::error!("Failed to purge key {}: {}", key, e),
}
}
Expand All @@ -154,7 +150,9 @@ impl SagittariusFlowClient {
}
}

pub async fn init_flow_stream(&mut self) {
pub async fn init_flow_stream(&mut self) -> Result<(), tonic::Status> {
self.sagittarius_ready.store(false, Ordering::SeqCst);

let request = Request::from_parts(
get_authorization_metadata(&self.token),
Extensions::new(),
Expand All @@ -164,14 +162,13 @@ impl SagittariusFlowClient {
let response = match self.client.update(request).await {
Ok(res) => {
log::info!("Successfully established a Stream (for Flows)");
self.sagittarius_ready.store(true, Ordering::SeqCst);
res
}
Err(status) => {
log::error!(
"Received a {:?}, can't retrieve flows from Sagittarius",
status
);
return;
self.sagittarius_ready.store(false, Ordering::SeqCst);
log::warn!("Failed to establish Flow stream: {:?}", status);
return Err(status);
}
};

Expand All @@ -183,12 +180,16 @@ impl SagittariusFlowClient {
self.handle_response(res).await;
}
Err(status) => {
log::error!(
"Received a {:?}, can't retrieve flows from Sagittarius",
status
);
self.sagittarius_ready.store(false, Ordering::SeqCst);
log::warn!("Flow stream error (will reconnect): {:?}", status);
return Err(status);
}
};
}

// Stream ended without an explicit error
self.sagittarius_ready.store(false, Ordering::SeqCst);
log::warn!("Flow stream ended (server closed). Will reconnect.");
Err(tonic::Status::unavailable("flow stream ended"))
}
}
Loading