Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion crates/apollo_batcher/src/batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@ use apollo_storage::partial_block_hash::{
PartialBlockHashComponentsStorageWriter,
};
use apollo_storage::state::{StateStorageReader, StateStorageWriter};
use apollo_storage::storage_reader_server::ServerConfig;
use apollo_storage::storage_reader_server::{
DynamicConfigError,
DynamicConfigProvider,
ServerConfig,
SharedDynamicConfigProvider,
StorageReaderServerDynamicConfig,
};
use apollo_storage::storage_reader_types::{
GenericStorageReaderServerHandler,
StorageReaderRequest,
Expand Down Expand Up @@ -1390,6 +1396,24 @@ fn log_txs_execution_result(
}
}

struct BatcherDynamicConfigProvider {
config_manager_client: SharedConfigManagerClient,
}

#[async_trait]
impl DynamicConfigProvider for BatcherDynamicConfigProvider {
async fn get_storage_reader_dynamic_config(
&self,
) -> Result<StorageReaderServerDynamicConfig, DynamicConfigError> {
let config = self
.config_manager_client
.get_batcher_dynamic_config()
.await
.map_err(|e| DynamicConfigError(e.to_string()))?;
Ok(config.storage_reader_server_dynamic_config)
}
}

#[allow(clippy::too_many_arguments)]
pub async fn create_batcher(
config: BatcherConfig,
Expand All @@ -1405,6 +1429,10 @@ pub async fn create_batcher(
static_config: config.static_config.storage_reader_server_static_config.clone(),
dynamic_config: config.dynamic_config.storage_reader_server_dynamic_config.clone(),
};
let dynamic_config_provider: SharedDynamicConfigProvider =
Arc::new(BatcherDynamicConfigProvider {
config_manager_client: config_manager_client.clone(),
});
let (storage_reader, storage_writer, storage_reader_server) =
open_storage_with_metric_and_server::<
GenericStorageReaderServerHandler,
Expand All @@ -1414,6 +1442,7 @@ pub async fn create_batcher(
config.static_config.storage.clone(),
&BATCHER_STORAGE_OPEN_READ_TRANSACTIONS,
storage_reader_server_config,
dynamic_config_provider,
)
.expect("Failed to open batcher's storage");

Expand Down
30 changes: 30 additions & 0 deletions crates/apollo_class_manager/src/class_manager.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::sync::Arc;
use std::time::Instant;

use apollo_class_manager_config::config::{ClassManagerDynamicConfig, FsClassManagerConfig};
Expand All @@ -16,6 +17,12 @@ use apollo_compile_to_casm_types::{
};
use apollo_config_manager_types::communication::SharedConfigManagerClient;
use apollo_infra::component_definitions::{default_component_start_fn, ComponentStarter};
use apollo_storage::storage_reader_server::{
DynamicConfigError,
DynamicConfigProvider,
SharedDynamicConfigProvider,
StorageReaderServerDynamicConfig,
};
use async_trait::async_trait;
use starknet_api::state::{SierraContractClass, CONTRACT_CLASS_VERSION};
use tracing::{debug, instrument};
Expand Down Expand Up @@ -182,14 +189,37 @@ where
}
}

struct ClassManagerDynamicConfigProvider {
config_manager_client: SharedConfigManagerClient,
}

#[async_trait]
impl DynamicConfigProvider for ClassManagerDynamicConfigProvider {
async fn get_storage_reader_dynamic_config(
&self,
) -> Result<StorageReaderServerDynamicConfig, DynamicConfigError> {
let config = self
.config_manager_client
.get_class_manager_dynamic_config()
.await
.map_err(|e| DynamicConfigError(e.to_string()))?;
Ok(config.storage_reader_server_dynamic_config)
}
}

pub fn create_class_manager(
config: FsClassManagerConfig,
compiler_client: SharedSierraCompilerClient,
config_manager_client: SharedConfigManagerClient,
) -> FsClassManager {
let dynamic_config_provider: SharedDynamicConfigProvider =
Arc::new(ClassManagerDynamicConfigProvider {
config_manager_client: config_manager_client.clone(),
});
let fs_class_storage = FsClassStorage::new(
config.static_config.class_storage_config.clone(),
config.dynamic_config.storage_reader_server_dynamic_config.clone(),
dynamic_config_provider,
)
.expect("Failed to create class storage.");
let class_manager =
Expand Down
16 changes: 13 additions & 3 deletions crates/apollo_class_manager/src/class_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ use apollo_storage::class_hash::{ClassHashStorageReader, ClassHashStorageWriter}
use apollo_storage::metrics::CLASS_MANAGER_STORAGE_OPEN_READ_TRANSACTIONS;
#[cfg(any(feature = "testing", test))]
use apollo_storage::open_storage;
use apollo_storage::storage_reader_server::{ServerConfig, StorageReaderServerDynamicConfig};
use apollo_storage::storage_reader_server::{
ServerConfig,
SharedDynamicConfigProvider,
StorageReaderServerDynamicConfig,
};
use apollo_storage::storage_reader_types::{
GenericStorageReaderServerHandler,
StorageReaderRequest,
Expand Down Expand Up @@ -263,6 +267,7 @@ impl ClassHashStorage {
pub fn new(
storage_config: StorageConfig,
storage_reader_server_config: ServerConfig,
dynamic_config_provider: SharedDynamicConfigProvider,
) -> ClassHashStorageResult<Self> {
let (reader, writer, storage_reader_server) =
apollo_storage::open_storage_with_metric_and_server::<
Expand All @@ -273,6 +278,7 @@ impl ClassHashStorage {
storage_config,
&CLASS_MANAGER_STORAGE_OPEN_READ_TRANSACTIONS,
storage_reader_server_config,
dynamic_config_provider,
)?;

let storage_reader_server_handle = Arc::new(storage_reader_server.spawn());
Expand Down Expand Up @@ -346,13 +352,17 @@ impl FsClassStorage {
pub fn new(
config: FsClassStorageConfig,
storage_reader_server_dynamic_config: StorageReaderServerDynamicConfig,
dynamic_config_provider: SharedDynamicConfigProvider,
) -> FsClassStorageResult<Self> {
let storage_reader_server_config = ServerConfig {
static_config: config.storage_reader_server_static_config.clone(),
dynamic_config: storage_reader_server_dynamic_config,
};
let class_hash_storage =
ClassHashStorage::new(config.class_hash_storage_config, storage_reader_server_config)?;
Comment thread
cursor[bot] marked this conversation as resolved.
let class_hash_storage = ClassHashStorage::new(
config.class_hash_storage_config,
storage_reader_server_config,
dynamic_config_provider,
)?;
std::fs::create_dir_all(&config.persistent_root)?;
Ok(Self { persistent_root: config.persistent_root, class_hash_storage })
}
Expand Down
8 changes: 6 additions & 2 deletions crates/apollo_state_sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ pub fn create_state_sync_and_runner(
config_manager_client: SharedConfigManagerClient,
) -> (StateSync, StateSyncRunner) {
let (new_block_sender, new_block_receiver) = channel(BUFFER_SIZE);
let (state_sync_runner, storage_reader) =
StateSyncRunner::new(config.clone(), new_block_receiver, class_manager_client);
let (state_sync_runner, storage_reader) = StateSyncRunner::new(
config.clone(),
new_block_receiver,
class_manager_client,
config_manager_client.clone(),
);
(
StateSync::new(storage_reader, new_block_sender, config, config_manager_client),
state_sync_runner,
Expand Down
45 changes: 42 additions & 3 deletions crates/apollo_state_sync/src/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use apollo_central_sync::sources::central::{CentralError, CentralSource};
use apollo_central_sync::sources::pending::PendingSource;
use apollo_central_sync::{StateSync as CentralStateSync, StateSyncError as CentralStateSyncError};
use apollo_class_manager_types::SharedClassManagerClient;
use apollo_config_manager_types::communication::SharedConfigManagerClient;
use apollo_infra::component_definitions::ComponentStarter;
use apollo_infra::component_server::WrapperServer;
use apollo_network::metrics::{NetworkMetrics, SqmrNetworkMetrics};
Expand Down Expand Up @@ -41,7 +42,13 @@ use apollo_state_sync_types::state_sync_types::SyncBlock;
use apollo_storage::body::BodyStorageReader;
use apollo_storage::header::HeaderStorageReader;
use apollo_storage::metrics::SYNC_STORAGE_OPEN_READ_TRANSACTIONS;
use apollo_storage::storage_reader_server::ServerConfig;
use apollo_storage::storage_reader_server::{
DynamicConfigError,
DynamicConfigProvider,
ServerConfig,
SharedDynamicConfigProvider,
StorageReaderServerDynamicConfig,
};
use apollo_storage::storage_reader_types::{
GenericStorageReaderServerHandler,
StorageReaderRequest,
Expand All @@ -65,6 +72,24 @@ use tokio::task::AbortHandle;
use tracing::instrument::Instrument;
use tracing::{debug, info_span};

struct StateSyncDynamicConfigProvider {
config_manager_client: SharedConfigManagerClient,
}

#[async_trait]
impl DynamicConfigProvider for StateSyncDynamicConfigProvider {
async fn get_storage_reader_dynamic_config(
&self,
) -> Result<StorageReaderServerDynamicConfig, DynamicConfigError> {
let config = self
.config_manager_client
.get_state_sync_dynamic_config()
.await
.map_err(|e| DynamicConfigError(e.to_string()))?;
Ok(config.storage_reader_server_dynamic_config)
}
}

pub struct StateSyncRunner {
network_future: BoxFuture<'static, Result<(), NetworkError>>,
// TODO(Matan): change client and server to requester and responder respectively
Expand Down Expand Up @@ -116,7 +141,11 @@ pub struct StateSyncResources {
}

impl StateSyncResources {
pub fn new(storage_config: &StorageConfig, storage_reader_server_config: ServerConfig) -> Self {
pub fn new(
storage_config: &StorageConfig,
storage_reader_server_config: ServerConfig,
dynamic_config_provider: SharedDynamicConfigProvider,
) -> Self {
let (storage_reader, storage_writer, storage_reader_server) =
open_storage_with_metric_and_server::<
GenericStorageReaderServerHandler,
Expand All @@ -126,6 +155,7 @@ impl StateSyncResources {
storage_config.clone(),
&SYNC_STORAGE_OPEN_READ_TRANSACTIONS,
storage_reader_server_config,
dynamic_config_provider,
)
.expect("StateSyncRunner failed opening storage");
let storage_reader_server_handle = storage_reader_server.spawn();
Expand Down Expand Up @@ -156,21 +186,30 @@ impl StateSyncRunner {
config: StateSyncConfig,
new_block_receiver: Receiver<SyncBlock>,
class_manager_client: SharedClassManagerClient,
config_manager_client: SharedConfigManagerClient,
) -> (Self, StorageReader) {
let StateSyncConfig { static_config, dynamic_config } = config;

let storage_reader_server_config = ServerConfig {
static_config: static_config.storage_reader_server_static_config.clone(),
dynamic_config: dynamic_config.storage_reader_server_dynamic_config.clone(),
};
let dynamic_config_provider: SharedDynamicConfigProvider =
Arc::new(StateSyncDynamicConfigProvider {
config_manager_client: config_manager_client.clone(),
});
let StateSyncResources {
storage_reader,
mut storage_writer,
shared_highest_block,
pending_data,
pending_classes,
storage_reader_server_handle,
} = StateSyncResources::new(&static_config.storage_config, storage_reader_server_config);
} = StateSyncResources::new(
&static_config.storage_config,
storage_reader_server_config,
dynamic_config_provider,
);

let StateSyncStaticConfig {
storage_config: _,
Expand Down
9 changes: 7 additions & 2 deletions crates/apollo_storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ use crate::state::data::IndexedDeprecatedContractClass;
use crate::storage_reader_server::{
create_storage_reader_server,
ServerConfig,
SharedDynamicConfigProvider,
StorageReaderServer,
StorageReaderServerHandler,
};
Expand All @@ -201,15 +202,19 @@ pub fn open_storage_with_metric_and_server<RequestHandler, Request, Response>(
storage_config: StorageConfig,
open_readers_metric: &'static MetricGauge,
storage_reader_server_config: ServerConfig,
dynamic_config_provider: SharedDynamicConfigProvider,
) -> StorageResult<StorageWithServer<RequestHandler, Request, Response>>
where
RequestHandler: StorageReaderServerHandler<Request, Response>,
Request: Serialize + DeserializeOwned + Send + 'static,
Response: Serialize + DeserializeOwned + Send + 'static,
{
let (reader, writer) = open_storage_internal(storage_config, Some(open_readers_metric))?;
let storage_reader_server =
create_storage_reader_server(reader.clone(), storage_reader_server_config);
let storage_reader_server = create_storage_reader_server(
reader.clone(),
storage_reader_server_config,
dynamic_config_provider,
);
Ok((reader, writer, storage_reader_server))
}

Expand Down
Loading
Loading