Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
4961ce0
initial impl with incoming/outgoing message impl
noot Jul 7, 2025
f5365d9
request-response protocol working
noot Jul 7, 2025
565ed95
clippy
noot Jul 7, 2025
a532121
clippy
noot Jul 7, 2025
921724d
merge w main
noot Jul 8, 2025
a548ce4
begin implementation of libp2p node in worker; working on msg handling
noot Jul 8, 2025
d780aae
implement more request handlers
noot Jul 8, 2025
bcaa444
impl hardware challenge, add new p2p to worker cli
noot Jul 8, 2025
0f386af
implement invite request handling, finish cli changes
noot Jul 8, 2025
7bd1009
add full hardware challenge message
noot Jul 9, 2025
d6c1a4a
move messages to their own dir
noot Jul 9, 2025
ea46820
add general request-response protocol
noot Jul 9, 2025
977ab4e
merge
noot Jul 9, 2025
7288261
update SystemState to store libp2p keypair
noot Jul 9, 2025
304f8a8
organize and remove unused deps
noot Jul 9, 2025
46ecca7
add libp2p_port to cli
noot Jul 9, 2025
4358e32
serde for PersistedSystemState
noot Jul 9, 2025
577d843
spawn message handler
noot Jul 9, 2025
4285eaa
add dial channel to p2p node; impl validator libp2p node
noot Jul 9, 2025
7503885
fully implement hardware challenge flow
noot Jul 9, 2025
d32f540
upddate validator main to use libp2p node
noot Jul 9, 2025
56d6b1d
clean up deps
noot Jul 9, 2025
c6183d6
add authorized peer to map
noot Jul 9, 2025
94e9e4d
implement dialing peers
noot Jul 10, 2025
d05ad87
merge
noot Jul 10, 2025
91c0e5b
Merge branch 'noot/libp2p' of https://github.com/PrimeIntellect-ai/pr…
noot Jul 10, 2025
a8af706
use tracing
noot Jul 10, 2025
0952b30
Merge branch 'noot/libp2p' of https://github.com/PrimeIntellect-ai/pr…
noot Jul 10, 2025
15dc2c4
move shared authentication service to shared
noot Jul 10, 2025
0046fac
implement orchestrator p2p service
noot Jul 10, 2025
08a10ec
update orchestrator to use libp2p node
noot Jul 10, 2025
ac923ca
deps cleanup
noot Jul 10, 2025
de3afb1
merge
noot Jul 10, 2025
6a3b04b
Merge branch 'noot/validator-libp2p' into noot/libp2p-unified
noot Jul 10, 2025
f35b001
delete unused code
noot Jul 10, 2025
2475059
no port conflict
noot Jul 10, 2025
73300be
rename messages to be more correct
noot Jul 11, 2025
e135ad4
add logging
noot Jul 11, 2025
ecb5b66
fix tests
noot Jul 11, 2025
4798692
remove explicit dialing, messaging now working
noot Jul 11, 2025
f87d5d3
remove println
noot Jul 11, 2025
d77ef04
fix unit tests
noot Jul 11, 2025
8663553
address some comments
noot Jul 11, 2025
4f66957
store outbound request when already authenticated
noot Jul 11, 2025
d712de6
use debug log
noot Jul 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 25 additions & 173 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ members = [
"crates/validator",
"crates/shared",
"crates/orchestrator",
"crates/p2p",
"crates/dev-utils",
]
resolver = "2"

[workspace.dependencies]
shared = { path = "crates/shared" }
p2p = { path = "crates/p2p" }

actix-web = "4.9.0"
clap = { version = "4.5.27", features = ["derive"] }
serde = { version = "1.0.219", features = ["derive"] }
Expand Down Expand Up @@ -42,6 +45,7 @@ rand_core_v6 = { package = "rand_core", version = "0.6.4", features = ["std"] }
ipld-core = "0.4"
rust-ipfs = "0.14"
cid = "0.11"
tracing = "0.1.41"

[workspace.package]
version = "0.3.11"
Expand Down
10 changes: 3 additions & 7 deletions crates/discovery/src/chainsync/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ async fn sync_single_node(
})?;

let balance = provider.get_balance(node_address).await.map_err(|e| {
error!("Error retrieving balance for node {}: {}", node_address, e);
error!("Error retrieving balance for node {node_address}: {e}");
anyhow::anyhow!("Failed to retrieve node balance")
})?;
n.latest_balance = Some(balance);
Expand All @@ -166,8 +166,7 @@ async fn sync_single_node(
.await
.map_err(|e| {
error!(
"Error retrieving node info for provider {} and node {}: {}",
provider_address, node_address, e
"Error retrieving node info for provider {provider_address} and node {node_address}: {e}"
);
anyhow::anyhow!("Failed to retrieve node info")
})?;
Expand All @@ -177,10 +176,7 @@ async fn sync_single_node(
.get_provider(provider_address)
.await
.map_err(|e| {
error!(
"Error retrieving provider info for {}: {}",
provider_address, e
);
error!("Error retrieving provider info for {provider_address}: {e}");
anyhow::anyhow!("Failed to retrieve provider info")
})?;

Expand Down
22 changes: 11 additions & 11 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,35 @@ edition.workspace = true
workspace = true

[dependencies]
p2p = { workspace = true}
shared = { workspace = true }

actix-web = { workspace = true }
actix-web-prometheus = "0.1.2"
alloy = { workspace = true }
anyhow = { workspace = true }
async-trait = "0.1.88"
base64 = "0.22.1"
chrono = { workspace = true, features = ["serde"] }
clap = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
google-cloud-auth = "0.18.0"
google-cloud-storage = "0.24.0"
hex = { workspace = true }
log = { workspace = true }
prometheus = "0.14.0"
rand = "0.9.0"
redis = { workspace = true, features = ["tokio-comp"] }
redis-test = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
shared = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }

actix-web-prometheus = "0.1.2"
google-cloud-auth = "0.18.0"
google-cloud-storage = "0.24.0"
prometheus = "0.14.0"
rand = "0.9.0"
utoipa = { version = "5.3.0", features = ["actix_extras", "chrono", "uuid"] }
utoipa-swagger-ui = { version = "9.0.2", features = ["actix-web", "debug-embed", "reqwest", "vendored"] }
uuid = { workspace = true }
iroh = { workspace = true }
rand_v8 = { workspace = true }

[dev-dependencies]
mockito = { workspace = true }
24 changes: 16 additions & 8 deletions crates/orchestrator/src/api/routes/groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,6 @@ async fn fetch_node_logs_p2p(

match node {
Some(node) => {
// Check if P2P client is available
let p2p_client = app_state.p2p_client.clone();

// Check if node has P2P information
let (worker_p2p_id, worker_p2p_addresses) =
match (&node.worker_p2p_id, &node.worker_p2p_addresses) {
Expand All @@ -254,11 +251,22 @@ async fn fetch_node_logs_p2p(
};

// Send P2P request for task logs
match tokio::time::timeout(
Duration::from_secs(NODE_REQUEST_TIMEOUT),
p2p_client.get_task_logs(node_address, worker_p2p_id, worker_p2p_addresses),
)
.await
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let get_task_logs_request = crate::p2p::GetTaskLogsRequest {
worker_wallet_address: node_address,
worker_p2p_id: worker_p2p_id.clone(),
worker_addresses: worker_p2p_addresses.clone(),
response_tx,
};
if let Err(e) = app_state.get_task_logs_tx.send(get_task_logs_request).await {
error!("Failed to send GetTaskLogsRequest for node {node_address}: {e}");
return json!({
"success": false,
"error": format!("Failed to send request: {}", e),
"status": node.status.to_string()
});
};
match tokio::time::timeout(Duration::from_secs(NODE_REQUEST_TIMEOUT), response_rx).await
{
Ok(Ok(log_lines)) => {
json!({
Expand Down
42 changes: 32 additions & 10 deletions crates/orchestrator/src/api/routes/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,22 @@ async fn restart_node_task(node_id: web::Path<String>, app_state: Data<AppState>
.as_ref()
.expect("worker_p2p_addresses should be present");

match app_state
.p2p_client
.restart_task(node_address, p2p_id, p2p_addresses)
.await
{
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let restart_task_request = crate::p2p::RestartTaskRequest {
worker_wallet_address: node.address,
worker_p2p_id: p2p_id.clone(),
worker_addresses: p2p_addresses.clone(),
response_tx,
};
if let Err(e) = app_state.restart_task_tx.send(restart_task_request).await {
error!("Failed to send restart task request: {e}");
return HttpResponse::InternalServerError().json(json!({
"success": false,
"error": "Failed to send restart task request"
}));
}

match response_rx.await {
Ok(_) => HttpResponse::Ok().json(json!({
"success": true,
"message": "Task restarted successfully"
Expand Down Expand Up @@ -240,11 +251,22 @@ async fn get_node_logs(node_id: web::Path<String>, app_state: Data<AppState>) ->
}));
};

match app_state
.p2p_client
.get_task_logs(node_address, p2p_id, p2p_addresses)
.await
{
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let get_task_logs_request = crate::p2p::GetTaskLogsRequest {
worker_wallet_address: node.address,
worker_p2p_id: p2p_id.clone(),
worker_addresses: p2p_addresses.clone(),
response_tx,
};
if let Err(e) = app_state.get_task_logs_tx.send(get_task_logs_request).await {
error!("Failed to send get task logs request: {e}");
return HttpResponse::InternalServerError().json(json!({
"success": false,
"error": "Failed to send get task logs request"
}));
}

match response_rx.await {
Ok(logs) => HttpResponse::Ok().json(json!({
"success": true,
"logs": logs
Expand Down
32 changes: 18 additions & 14 deletions crates/orchestrator/src/api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::api::routes::task::tasks_routes;
use crate::api::routes::{heartbeat::heartbeat_routes, metrics::metrics_routes};
use crate::metrics::MetricsContext;
use crate::models::node::NodeStatus;
use crate::p2p::client::P2PClient;
use crate::p2p::{GetTaskLogsRequest, RestartTaskRequest};
use crate::plugins::node_groups::NodeGroupsPlugin;
use crate::scheduler::Scheduler;
use crate::store::core::{RedisStore, StoreContext};
Expand All @@ -23,6 +23,7 @@ use shared::utils::StorageProvider;
use shared::web3::contracts::core::builder::Contracts;
use shared::web3::wallet::WalletProvider;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use utoipa::{
openapi::security::{ApiKey, ApiKeyValue, SecurityScheme},
Modify, OpenApi,
Expand Down Expand Up @@ -116,17 +117,18 @@ async fn health_check(data: web::Data<AppState>) -> HttpResponse {
}

pub(crate) struct AppState {
pub store_context: Arc<StoreContext>,
pub storage_provider: Option<Arc<dyn StorageProvider>>,
pub heartbeats: Arc<LoopHeartbeats>,
pub redis_store: Arc<RedisStore>,
pub hourly_upload_limit: i64,
pub contracts: Option<Contracts<WalletProvider>>,
pub pool_id: u32,
pub scheduler: Scheduler,
pub node_groups_plugin: Option<Arc<NodeGroupsPlugin>>,
pub metrics: Arc<MetricsContext>,
pub p2p_client: Arc<P2PClient>,
pub(crate) store_context: Arc<StoreContext>,
pub(crate) storage_provider: Option<Arc<dyn StorageProvider>>,
pub(crate) heartbeats: Arc<LoopHeartbeats>,
pub(crate) redis_store: Arc<RedisStore>,
pub(crate) hourly_upload_limit: i64,
pub(crate) contracts: Option<Contracts<WalletProvider>>,
pub(crate) pool_id: u32,
pub(crate) scheduler: Scheduler,
pub(crate) node_groups_plugin: Option<Arc<NodeGroupsPlugin>>,
pub(crate) metrics: Arc<MetricsContext>,
pub(crate) get_task_logs_tx: Sender<GetTaskLogsRequest>,
pub(crate) restart_task_tx: Sender<RestartTaskRequest>,
}

#[allow(clippy::too_many_arguments)]
Expand All @@ -145,7 +147,8 @@ pub async fn start_server(
scheduler: Scheduler,
node_groups_plugin: Option<Arc<NodeGroupsPlugin>>,
metrics: Arc<MetricsContext>,
p2p_client: Arc<P2PClient>,
get_task_logs_tx: Sender<GetTaskLogsRequest>,
restart_task_tx: Sender<RestartTaskRequest>,
) -> Result<(), Error> {
info!("Starting server at http://{host}:{port}");
let app_state = Data::new(AppState {
Expand All @@ -159,7 +162,8 @@ pub async fn start_server(
scheduler,
node_groups_plugin,
metrics,
p2p_client,
get_task_logs_tx,
restart_task_tx,
});
let node_store = app_state.store_context.node_store.clone();
let node_store_clone = node_store.clone();
Expand Down
50 changes: 20 additions & 30 deletions crates/orchestrator/src/api/tests/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ use std::sync::Arc;
use url::Url;

#[cfg(test)]
pub async fn create_test_app_state() -> Data<AppState> {
pub(crate) async fn create_test_app_state() -> Data<AppState> {
use shared::utils::MockStorageProvider;

use crate::{
metrics::MetricsContext, p2p::client::P2PClient, scheduler::Scheduler,
utils::loop_heartbeats::LoopHeartbeats, ServerMode,
metrics::MetricsContext, scheduler::Scheduler, utils::loop_heartbeats::LoopHeartbeats,
ServerMode,
};

let store = Arc::new(RedisStore::new_test());
Expand All @@ -46,12 +46,8 @@ pub async fn create_test_app_state() -> Data<AppState> {
let mock_storage = MockStorageProvider::new();
let storage_provider = Arc::new(mock_storage);
let metrics = Arc::new(MetricsContext::new(1.to_string()));
let wallet = Wallet::new(
"0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97",
Url::parse("http://localhost:8545").unwrap(),
)
.unwrap();
let p2p_client = Arc::new(P2PClient::new(wallet.clone()).await.unwrap());
let (get_task_logs_tx, _) = tokio::sync::mpsc::channel(1);
let (restart_task_tx, _) = tokio::sync::mpsc::channel(1);

Data::new(AppState {
store_context: store_context.clone(),
Expand All @@ -64,17 +60,17 @@ pub async fn create_test_app_state() -> Data<AppState> {
scheduler,
node_groups_plugin: None,
metrics,
p2p_client: p2p_client.clone(),
get_task_logs_tx,
restart_task_tx,
})
}

#[cfg(test)]
pub async fn create_test_app_state_with_nodegroups() -> Data<AppState> {
pub(crate) async fn create_test_app_state_with_nodegroups() -> Data<AppState> {
use shared::utils::MockStorageProvider;

use crate::{
metrics::MetricsContext,
p2p::client::P2PClient,
plugins::node_groups::{NodeGroupConfiguration, NodeGroupsPlugin},
scheduler::Scheduler,
utils::loop_heartbeats::LoopHeartbeats,
Expand Down Expand Up @@ -116,12 +112,8 @@ pub async fn create_test_app_state_with_nodegroups() -> Data<AppState> {
let mock_storage = MockStorageProvider::new();
let storage_provider = Arc::new(mock_storage);
let metrics = Arc::new(MetricsContext::new(1.to_string()));
let wallet = Wallet::new(
"0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97",
Url::parse("http://localhost:8545").unwrap(),
)
.unwrap();
let p2p_client = Arc::new(P2PClient::new(wallet.clone()).await.unwrap());
let (get_task_logs_tx, _) = tokio::sync::mpsc::channel(1);
let (restart_task_tx, _) = tokio::sync::mpsc::channel(1);

Data::new(AppState {
store_context: store_context.clone(),
Expand All @@ -134,12 +126,13 @@ pub async fn create_test_app_state_with_nodegroups() -> Data<AppState> {
scheduler,
node_groups_plugin,
metrics,
p2p_client: p2p_client.clone(),
get_task_logs_tx,
restart_task_tx,
})
}

#[cfg(test)]
pub fn setup_contract() -> Contracts<WalletProvider> {
pub(crate) fn setup_contract() -> Contracts<WalletProvider> {
let coordinator_key = "0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97";
let rpc_url: Url = Url::parse("http://localhost:8545").unwrap();
let wallet = Wallet::new(coordinator_key, rpc_url).unwrap();
Expand All @@ -154,12 +147,12 @@ pub fn setup_contract() -> Contracts<WalletProvider> {
}

#[cfg(test)]
pub async fn create_test_app_state_with_metrics() -> Data<AppState> {
pub(crate) async fn create_test_app_state_with_metrics() -> Data<AppState> {
use shared::utils::MockStorageProvider;

use crate::{
metrics::MetricsContext, p2p::client::P2PClient, scheduler::Scheduler,
utils::loop_heartbeats::LoopHeartbeats, ServerMode,
metrics::MetricsContext, scheduler::Scheduler, utils::loop_heartbeats::LoopHeartbeats,
ServerMode,
};

let store = Arc::new(RedisStore::new_test());
Expand All @@ -182,12 +175,8 @@ pub async fn create_test_app_state_with_metrics() -> Data<AppState> {
let mock_storage = MockStorageProvider::new();
let storage_provider = Arc::new(mock_storage);
let metrics = Arc::new(MetricsContext::new("0".to_string()));
let wallet = Wallet::new(
"0xdbda1821b80551c9d65939329250298aa3472ba22feea921c0cf5d620ea67b97",
Url::parse("http://localhost:8545").unwrap(),
)
.unwrap();
let p2p_client = Arc::new(P2PClient::new(wallet.clone()).await.unwrap());
let (get_task_logs_tx, _) = tokio::sync::mpsc::channel(1);
let (restart_task_tx, _) = tokio::sync::mpsc::channel(1);

Data::new(AppState {
store_context: store_context.clone(),
Expand All @@ -200,6 +189,7 @@ pub async fn create_test_app_state_with_metrics() -> Data<AppState> {
scheduler,
node_groups_plugin: None,
metrics,
p2p_client: p2p_client.clone(),
get_task_logs_tx,
restart_task_tx,
})
}
Loading