diff --git a/src/configuration/mod.rs b/src/configuration/mod.rs index b54e82c..618b7e0 100644 --- a/src/configuration/mod.rs +++ b/src/configuration/mod.rs @@ -1,5 +1,7 @@ use code0_flow::flow_config::{env_with_default, environment::Environment, mode::Mode}; +pub mod state; + /// Struct for all relevant `Aquila` startup configurations pub struct Config { /// Aquila environment diff --git a/src/configuration/state.rs b/src/configuration/state.rs new file mode 100644 index 0000000..ed5e9b2 --- /dev/null +++ b/src/configuration/state.rs @@ -0,0 +1,27 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; + +/// Tracks readiness of each external service. +#[derive(Clone)] +pub struct AppReadiness { + // Readiness state of Sagittarius service + pub sagittarius_ready: Arc, +} + +impl Default for AppReadiness { + fn default() -> Self { + Self::new() + } +} + +impl AppReadiness { + pub fn new() -> Self { + Self { + sagittarius_ready: Arc::new(AtomicBool::new(false)), + } + } + + pub fn is_ready(&self) -> bool { + self.sagittarius_ready.load(Ordering::SeqCst) + } +} diff --git a/src/main.rs b/src/main.rs index 20784dc..ee4d0a7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,15 @@ -use crate::{configuration::Config as AquilaConfig, flow::get_flow_identifier}; +use crate::{ + configuration::{Config as AquilaConfig, state::AppReadiness}, + flow::get_flow_identifier, + sagittarius::retry::create_channel_with_retry, +}; use async_nats::jetstream::kv::Config; use code0_flow::flow_config::load_env_file; use prost::Message; use sagittarius::flow_service_client_impl::SagittariusFlowClient; use serde_json::from_str; use server::AquilaGRPCServer; -use std::{fs::File, io::Read, sync::Arc}; +use std::{fs::File, io::Read, sync::Arc, time::Duration}; use tucana::shared::Flows; pub mod authorization; @@ -27,6 +31,7 @@ async fn main() { // Load environment variables from .env file load_env_file(); let config = AquilaConfig::new(); + let app_readiness = AppReadiness::new(); //Create connection to JetStream let client = match async_nats::connect(config.nats_url.clone()).await { @@ -66,9 +71,14 @@ async fn main() { return; } - let server = AquilaGRPCServer::new(&config); let backend_url_flow = config.backend_url.clone(); - let runtime_token_flow = config.runtime_token.clone(); + let sagittarius_channel = create_channel_with_retry( + "Sagittarius Endpoint", + backend_url_flow, + app_readiness.sagittarius_ready.clone(), + ) + .await; + let server = AquilaGRPCServer::new(&config, app_readiness.clone(), sagittarius_channel.clone()); let kv_for_flow = kv_store.clone(); let mut server_task = tokio::spawn(async move { @@ -80,17 +90,45 @@ async fn main() { }); let env = match config.environment { - code0_flow::flow_config::environment::Environment::Development => String::from("DEVELOPMENT"), + code0_flow::flow_config::environment::Environment::Development => { + String::from("DEVELOPMENT") + } code0_flow::flow_config::environment::Environment::Staging => String::from("STAGING"), code0_flow::flow_config::environment::Environment::Production => String::from("PRODUCTION"), }; let mut flow_task = tokio::spawn(async move { - let mut flow_client = - SagittariusFlowClient::new(backend_url_flow, kv_for_flow, env, runtime_token_flow).await; + let mut backoff = Duration::from_millis(200); + let max_backoff = Duration::from_secs(10); + + loop { + let ch = create_channel_with_retry( + "Sagittarius Stream", + config.backend_url.clone(), + app_readiness.sagittarius_ready.clone(), + ) + .await; + + let mut flow_client = SagittariusFlowClient::new( + kv_for_flow.clone(), + env.clone(), + config.runtime_token.clone(), + ch, + app_readiness.sagittarius_ready.clone(), + ); - flow_client.init_flow_stream().await; - log::warn!("Flow stream task exited"); + match flow_client.init_flow_stream().await { + Ok(_) => { + log::warn!("Flow stream ended cleanly. Reconnecting..."); + } + Err(e) => { + log::warn!("Flow stream dropped: {:?}. Reconnecting...", e); + } + } + + tokio::time::sleep(backoff).await; + backoff = std::cmp::min(backoff * 2, max_backoff); + } }); #[cfg(unix)] diff --git a/src/sagittarius/data_type_service_client_impl.rs b/src/sagittarius/data_type_service_client_impl.rs index 7b2b109..e6520e4 100644 --- a/src/sagittarius/data_type_service_client_impl.rs +++ b/src/sagittarius/data_type_service_client_impl.rs @@ -1,7 +1,6 @@ use crate::authorization::authorization::get_authorization_metadata; -use std::sync::Arc; -use tokio::sync::Mutex; -use tonic::{Extensions, Request, transport::Channel}; +use tonic::transport::Channel; +use tonic::{Extensions, Request}; use tucana::sagittarius::{ DataTypeUpdateRequest as SagittariusDataTypeUpdateRequest, data_type_service_client::DataTypeServiceClient, @@ -16,21 +15,8 @@ pub struct SagittariusDataTypeServiceClient { } impl SagittariusDataTypeServiceClient { - pub async fn new_arc(sagittarius_url: String, token: String) -> Arc> { - Arc::new(Mutex::new(Self::new(sagittarius_url, token).await)) - } - - pub async fn new(sagittarius_url: String, token: String) -> Self { - let client = match DataTypeServiceClient::connect(sagittarius_url).await { - Ok(client) => { - log::info!("Successfully connected to Sagittarius DataType Endpoint!"); - client - } - Err(err) => panic!( - "Failed to connect to Sagittarius (DataType Endpoint): {:?}", - err - ), - }; + pub fn new(channel: Channel, token: String) -> Self { + let client = DataTypeServiceClient::new(channel); Self { client, token } } @@ -49,10 +35,7 @@ impl SagittariusDataTypeServiceClient { let response = match self.client.update(request).await { Ok(response) => { - log::info!( - "Successfully transferred data types. Did Sagittarius updated them? {:?}", - &response - ); + log::info!("Successfully transferred data types.",); response.into_inner() } Err(err) => { @@ -61,6 +44,11 @@ impl SagittariusDataTypeServiceClient { } }; + match response.success { + true => log::info!("Sagittarius successfully updated DataTypes."), + false => log::error!("Sagittarius didn't update any DataTypes."), + }; + AquilaDataTypeUpdateResponse { success: response.success, } diff --git a/src/sagittarius/flow_service_client_impl.rs b/src/sagittarius/flow_service_client_impl.rs index b17b097..f329d6b 100644 --- a/src/sagittarius/flow_service_client_impl.rs +++ b/src/sagittarius/flow_service_client_impl.rs @@ -1,13 +1,17 @@ +use crate::{authorization::authorization::get_authorization_metadata, flow::get_flow_identifier}; use futures::{StreamExt, TryStreamExt}; use prost::Message; -use tokio::fs; use std::{path::Path, sync::Arc}; +use tokio::fs; use tonic::{Extensions, Request, transport::Channel}; -use tucana::{sagittarius::{ - FlowLogonRequest, FlowResponse, flow_response::Data, flow_service_client::FlowServiceClient, -}, shared::{Flows, ValidationFlow}}; +use tucana::{ + sagittarius::{ + FlowLogonRequest, FlowResponse, flow_response::Data, flow_service_client::FlowServiceClient, + }, + shared::Flows, +}; -use crate::{authorization::authorization::get_authorization_metadata, flow::get_flow_identifier}; +use std::sync::atomic::{AtomicBool, Ordering}; #[derive(Clone)] pub struct SagittariusFlowClient { @@ -15,36 +19,30 @@ pub struct SagittariusFlowClient { client: FlowServiceClient, env: String, token: String, + sagittarius_ready: Arc, } impl SagittariusFlowClient { - pub async fn new( - sagittarius_url: String, + pub fn new( store: Arc, env: String, token: String, + channel: Channel, + sagittarius_ready: Arc, ) -> SagittariusFlowClient { - let client = match FlowServiceClient::connect(sagittarius_url).await { - Ok(res) => { - log::info!("Successfully connected to Sagittarius Flow Endpoint!"); - res - } - Err(err) => panic!( - "Failed to connect to Sagittarius (Flow Endpoint): {:?}", - err - ), - }; + let client = FlowServiceClient::new(channel); SagittariusFlowClient { store, client, env, token, + sagittarius_ready, } } fn is_development(&self) -> bool { - self.env == String::from("DEVELOPMENT") + self.env == "DEVELOPMENT" } async fn export_flows_json_overwrite(&self, flows: Flows) { @@ -79,7 +77,11 @@ impl SagittariusFlowClient { } } - log::info!("Exported {} flows to {}", flows.flows.len(), final_path.display()); + log::info!( + "Exported {} flows to {}", + flows.flows.len(), + final_path.display() + ); } async fn handle_response(&mut self, response: FlowResponse) { @@ -95,7 +97,6 @@ impl SagittariusFlowClient { }; match data { - // Will delete the flow id it receives Data::DeletedFlowId(id) => { log::info!("Deleting the Flow with the id: {}", id); let identifier = format!("{}::*", id); @@ -104,7 +105,6 @@ impl SagittariusFlowClient { Err(err) => log::error!("Failed to delete flow. Reason: {:?}", err), }; } - //Will update the flow it receives Data::UpdatedFlow(flow) => { log::info!("Updating the Flow with the id: {}", &flow.flow_id); let key = get_flow_identifier(&flow); @@ -114,11 +114,9 @@ impl SagittariusFlowClient { Err(err) => log::error!("Failed to update flow. Reason: {:?}", err), }; } - //WIll drop all flows that it holds and insert all new ones Data::Flows(flows) => { log::info!("Dropping all Flows & inserting the new ones!"); - - // Writing all flows into an output if its in `DEVELOPMENT` + self.export_flows_json_overwrite(flows.clone()).await; let mut keys = match self.store.keys().await { @@ -132,9 +130,7 @@ impl SagittariusFlowClient { let mut purged_count = 0; while let Ok(Some(key)) = keys.try_next().await { match self.store.purge(&key).await { - Ok(_) => { - purged_count += 1; - } + Ok(_) => purged_count += 1, Err(e) => log::error!("Failed to purge key {}: {}", key, e), } } @@ -154,7 +150,9 @@ impl SagittariusFlowClient { } } - pub async fn init_flow_stream(&mut self) { + pub async fn init_flow_stream(&mut self) -> Result<(), tonic::Status> { + self.sagittarius_ready.store(false, Ordering::SeqCst); + let request = Request::from_parts( get_authorization_metadata(&self.token), Extensions::new(), @@ -164,14 +162,13 @@ impl SagittariusFlowClient { let response = match self.client.update(request).await { Ok(res) => { log::info!("Successfully established a Stream (for Flows)"); + self.sagittarius_ready.store(true, Ordering::SeqCst); res } Err(status) => { - log::error!( - "Received a {:?}, can't retrieve flows from Sagittarius", - status - ); - return; + self.sagittarius_ready.store(false, Ordering::SeqCst); + log::warn!("Failed to establish Flow stream: {:?}", status); + return Err(status); } }; @@ -183,12 +180,16 @@ impl SagittariusFlowClient { self.handle_response(res).await; } Err(status) => { - log::error!( - "Received a {:?}, can't retrieve flows from Sagittarius", - status - ); + self.sagittarius_ready.store(false, Ordering::SeqCst); + log::warn!("Flow stream error (will reconnect): {:?}", status); + return Err(status); } }; } + + // Stream ended without an explicit error + self.sagittarius_ready.store(false, Ordering::SeqCst); + log::warn!("Flow stream ended (server closed). Will reconnect."); + Err(tonic::Status::unavailable("flow stream ended")) } } diff --git a/src/sagittarius/flow_type_service_client_impl.rs b/src/sagittarius/flow_type_service_client_impl.rs index 685c9c7..033d85b 100644 --- a/src/sagittarius/flow_type_service_client_impl.rs +++ b/src/sagittarius/flow_type_service_client_impl.rs @@ -1,6 +1,4 @@ use crate::authorization::authorization::get_authorization_metadata; -use std::sync::Arc; -use tokio::sync::Mutex; use tonic::Extensions; use tonic::Request; use tonic::transport::Channel; @@ -15,21 +13,8 @@ pub struct SagittariusFlowTypeServiceClient { } impl SagittariusFlowTypeServiceClient { - pub async fn new_arc(sagittarius_url: String, token: String) -> Arc> { - Arc::new(Mutex::new(Self::new(sagittarius_url, token).await)) - } - - pub async fn new(sagittarius_url: String, token: String) -> Self { - let client = match FlowTypeServiceClient::connect(sagittarius_url).await { - Ok(client) => { - log::info!("Successfully connected to Sagittarius FlowType Endpoint!"); - client - } - Err(err) => panic!( - "Failed to connect to Sagittarius (FlowType Endpoint): {:?}", - err - ), - }; + pub fn new(channel: Channel, token: String) -> Self { + let client = FlowTypeServiceClient::new(channel); Self { client, token } } @@ -48,10 +33,7 @@ impl SagittariusFlowTypeServiceClient { let response = match self.client.update(request).await { Ok(response) => { - log::info!( - "Successfully transferred FlowTypes. Did Sagittarius updated them? {:?}", - &response - ); + log::info!("Successfully transferred FlowTypes.",); response.into_inner() } Err(err) => { @@ -60,6 +42,11 @@ impl SagittariusFlowTypeServiceClient { } }; + match response.success { + true => log::info!("Sagittarius successfully updated FlowTypes."), + false => log::error!("Sagittarius didn't update any FlowTypes."), + }; + AquilaFlowTypeUpdateResponse { success: response.success, } diff --git a/src/sagittarius/mod.rs b/src/sagittarius/mod.rs index 7fe9d4f..33405d3 100644 --- a/src/sagittarius/mod.rs +++ b/src/sagittarius/mod.rs @@ -2,5 +2,6 @@ pub mod action_service_client_impl; pub mod data_type_service_client_impl; pub mod flow_service_client_impl; pub mod flow_type_service_client_impl; +pub mod retry; pub mod runtime_function_service_client_impl; pub mod test_execution_client_impl; diff --git a/src/sagittarius/retry.rs b/src/sagittarius/retry.rs new file mode 100644 index 0000000..c2613da --- /dev/null +++ b/src/sagittarius/retry.rs @@ -0,0 +1,61 @@ +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, +}; + +use tokio::time::{Duration, sleep}; +use tonic::transport::{Channel, Endpoint}; + +const MAX_BACKOFF: u64 = 2000 * 60; +const MAX_RETRIES: i8 = 10; + +// Will create a channel and retry if its not possible +pub async fn create_channel_with_retry( + channel_name: &str, + url: String, + ready: Arc, +) -> Channel { + let mut backoff = 100; + let mut retries = 0; + + loop { + ready.store(false, Ordering::SeqCst); + + let channel = match Endpoint::from_shared(url.clone()) { + Ok(c) => { + log::debug!("Creating a new endpoint for the: {} Service", channel_name); + c.connect_timeout(Duration::from_secs(2)) + .timeout(Duration::from_secs(10)) + } + Err(err) => { + panic!( + "Cannot create Endpoint for Service: `{}`. Reason: {:?}", + channel_name, err + ); + } + }; + + match channel.connect().await { + Ok(ch) => { + return ch; + } + Err(err) => { + log::warn!( + "Retry connect to `{}` using url: `{}` failed: {:?}, retrying in {}ms", + channel_name, + url, + err, + backoff + ); + sleep(Duration::from_millis(backoff)).await; + + backoff = (backoff * 2).min(MAX_BACKOFF); + retries += 1; + + if retries >= MAX_RETRIES { + panic!("Reached max retries to url {}", url) + } + } + } + } +} diff --git a/src/sagittarius/runtime_function_service_client_impl.rs b/src/sagittarius/runtime_function_service_client_impl.rs index bf0f7e9..6450a9b 100644 --- a/src/sagittarius/runtime_function_service_client_impl.rs +++ b/src/sagittarius/runtime_function_service_client_impl.rs @@ -15,23 +15,13 @@ pub struct SagittariusRuntimeFunctionServiceClient { } impl SagittariusRuntimeFunctionServiceClient { - pub async fn new(sagittarius_url: String, token: String) -> Self { - let client = match RuntimeFunctionDefinitionServiceClient::connect(sagittarius_url).await { - Ok(client) => { - log::info!("Successfully connected to Sagittarius RuntimeFunction Endpoint!"); - client - } - Err(err) => panic!( - "Failed to connect to Sagittarius (RuntimeFunction Endpoint): {:?}", - err - ), - }; - + pub fn new(channel: Channel, token: String) -> Self { + let client = RuntimeFunctionDefinitionServiceClient::new(channel); Self { client, token } } - pub async fn new_arc(sagittarius_url: String, token: String) -> Arc> { - Arc::new(Mutex::new(Self::new(sagittarius_url, token).await)) + pub fn new_arc(channel: Channel, token: String) -> Arc> { + Arc::new(Mutex::new(Self::new(channel, token))) } pub async fn update_runtime_function_definitions( @@ -48,10 +38,7 @@ impl SagittariusRuntimeFunctionServiceClient { let response = match self.client.update(request).await { Ok(response) => { - log::info!( - "Successfully transferred RuntimeFunctions. Did Sagittarius updated them? {:?}", - &response - ); + log::info!("Successfully transferred RuntimeFunctions.",); response.into_inner() } Err(err) => { @@ -60,6 +47,11 @@ impl SagittariusRuntimeFunctionServiceClient { } }; + match response.success { + true => log::info!("Sagittarius successfully updated RuntimeFunctions."), + false => log::error!("Sagittarius didn't update any RuntimeFunctionRuntimeFunctions."), + }; + AquilaRuntimeFunctionUpdateResponse { success: response.success, } diff --git a/src/server/data_type_service_server_impl.rs b/src/server/data_type_service_server_impl.rs index 757a1ba..b7a099a 100644 --- a/src/server/data_type_service_server_impl.rs +++ b/src/server/data_type_service_server_impl.rs @@ -21,9 +21,13 @@ impl DataTypeService for AquilaDataTypeServiceServer { ) -> Result, tonic::Status> { let data_type_update_request = request.into_inner(); - log::info!( + log::debug!( "Received DataTypes: {:?}", - data_type_update_request.data_types + data_type_update_request + .data_types + .iter() + .map(|d| d.identifier.clone()) + .collect::>() ); let mut client = self.client.lock().await; diff --git a/src/server/flow_type_service_server_impl.rs b/src/server/flow_type_service_server_impl.rs index 86185f5..eceab96 100644 --- a/src/server/flow_type_service_server_impl.rs +++ b/src/server/flow_type_service_server_impl.rs @@ -22,9 +22,13 @@ impl FlowTypeService for AquilaFlowTypeServiceServer { ) -> Result, tonic::Status> { let flow_type_update_request = request.into_inner(); - log::info!( + log::debug!( "Received FlowTypes: {:?}", - flow_type_update_request.flow_types + flow_type_update_request + .flow_types + .iter() + .map(|f| f.identifier.clone()) + .collect::>() ); let mut client = self.client.lock().await; diff --git a/src/server/mod.rs b/src/server/mod.rs index f61ff4f..2ba7f85 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,5 +1,5 @@ use crate::{ - configuration::Config, + configuration::{Config, state::AppReadiness}, sagittarius::{ data_type_service_client_impl::SagittariusDataTypeServiceClient, flow_type_service_client_impl::SagittariusFlowTypeServiceClient, @@ -10,8 +10,12 @@ use data_type_service_server_impl::AquilaDataTypeServiceServer; use flow_type_service_server_impl::AquilaFlowTypeServiceServer; use log::info; use runtime_function_service_server_impl::AquilaRuntimeFunctionServiceServer; -use std::net::SocketAddr; -use tonic::transport::Server; +use std::{net::SocketAddr, sync::Arc}; +use tokio::sync::Mutex; +use tonic::{ + Request, Status, + transport::{Channel, Server}, +}; use tucana::aquila::{ data_type_service_server::DataTypeServiceServer, flow_type_service_server::FlowTypeServiceServer, @@ -24,14 +28,15 @@ mod runtime_function_service_server_impl; pub struct AquilaGRPCServer { token: String, - sagittarius_url: String, nats_url: String, address: SocketAddr, with_health_service: bool, + app_readiness: AppReadiness, + channel: Channel, } impl AquilaGRPCServer { - pub fn new(config: &Config) -> Self { + pub fn new(config: &Config, app_readiness: AppReadiness, channel: Channel) -> Self { let address = match format!("{}:{}", config.grpc_host, config.grpc_port).parse() { Ok(addr) => { info!("Listening on {:?}", &addr); @@ -42,35 +47,31 @@ impl AquilaGRPCServer { AquilaGRPCServer { token: config.runtime_token.clone(), - sagittarius_url: config.backend_url.clone(), nats_url: config.nats_url.clone(), with_health_service: config.with_health_service, address, + app_readiness, + channel, } } pub async fn start(&self) -> Result<(), tonic::transport::Error> { - let data_type_service = SagittariusDataTypeServiceClient::new_arc( - self.sagittarius_url.clone(), + let data_type_service = Arc::new(Mutex::new(SagittariusDataTypeServiceClient::new( + self.channel.clone(), self.token.clone(), - ) - .await; + ))); info!("DataTypeService started"); - let flow_type_service = SagittariusFlowTypeServiceClient::new_arc( - self.sagittarius_url.clone(), + let flow_type_service = Arc::new(Mutex::new(SagittariusFlowTypeServiceClient::new( + self.channel.clone(), self.token.clone(), - ) - .await; - + ))); info!("FlowTypeService started"); - let runtime_function_service = SagittariusRuntimeFunctionServiceClient::new_arc( - self.sagittarius_url.clone(), - self.token.clone(), - ) - .await; + let runtime_function_service = Arc::new(Mutex::new( + SagittariusRuntimeFunctionServiceClient::new(self.channel.clone(), self.token.clone()), + )); info!("RuntimeFunctionService started"); @@ -81,6 +82,22 @@ impl AquilaGRPCServer { info!("Starting gRPC Server..."); + let readiness: Arc = Arc::new(self.app_readiness.clone()); + + let intercept = { + let readiness = readiness.clone(); + move |req: Request<()>| -> Result, Status> { + if !readiness.is_ready() { + log::error!("Rejected a request because Sagittarius is not ready."); + Err(Status::unavailable( + "Service not ready, waiting on Sagittarius. Please retry again later!", + )) + } else { + Ok(req) + } + } + }; + if self.with_health_service { info!("Starting with HealthService"); let health_service = code0_flow::flow_health::HealthService::new(self.nats_url.clone()); @@ -89,19 +106,33 @@ impl AquilaGRPCServer { .add_service(tonic_health::pb::health_server::HealthServer::new( health_service, )) - .add_service(DataTypeServiceServer::new(data_type_server)) - .add_service(FlowTypeServiceServer::new(flow_type_server)) - .add_service(RuntimeFunctionDefinitionServiceServer::new( + .add_service(DataTypeServiceServer::with_interceptor( + data_type_server, + intercept.clone(), + )) + .add_service(FlowTypeServiceServer::with_interceptor( + flow_type_server, + intercept.clone(), + )) + .add_service(RuntimeFunctionDefinitionServiceServer::with_interceptor( runtime_function_server, + intercept.clone(), )) .serve(self.address) .await } else { Server::builder() - .add_service(DataTypeServiceServer::new(data_type_server)) - .add_service(FlowTypeServiceServer::new(flow_type_server)) - .add_service(RuntimeFunctionDefinitionServiceServer::new( + .add_service(DataTypeServiceServer::with_interceptor( + data_type_server, + intercept.clone(), + )) + .add_service(FlowTypeServiceServer::with_interceptor( + flow_type_server, + intercept.clone(), + )) + .add_service(RuntimeFunctionDefinitionServiceServer::with_interceptor( runtime_function_server, + intercept.clone(), )) .serve(self.address) .await diff --git a/src/server/runtime_function_service_server_impl.rs b/src/server/runtime_function_service_server_impl.rs index 3f06a56..be37ae4 100644 --- a/src/server/runtime_function_service_server_impl.rs +++ b/src/server/runtime_function_service_server_impl.rs @@ -26,9 +26,13 @@ impl RuntimeFunctionDefinitionService for AquilaRuntimeFunctionServiceServer { > { let runtime_function_definition_update_request = request.into_inner(); - log::info!( + log::debug!( "Received RuntimeFunctions: {:?}", - runtime_function_definition_update_request.runtime_functions + runtime_function_definition_update_request + .runtime_functions + .iter() + .map(|f| f.runtime_name.clone()) + .collect::>() ); let mut client = self.client.lock().await;