Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
713 changes: 232 additions & 481 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ members = ["crates/*"]
resolver = "2"

[workspace.dependencies]
sqd-contract-client = { git = "https://github.com/subsquid/sqd-network.git", rev = "c57f520", version = "1.2.0" }
sqd-messages = { git = "https://github.com/subsquid/sqd-network.git", rev = "c57f520", version = "2.0.1", features = ["bitstring"] }
sqd-network-transport = { git = "https://github.com/subsquid/sqd-network.git", rev = "c57f520", version = "3.0.0" }
sqd-contract-client = { git = "https://github.com/subsquid/sqd-network.git", rev = "8a7e5bc", version = "1.2.0" }
sqd-messages = { git = "https://github.com/subsquid/sqd-network.git", rev = "8a7e5bc", version = "2.0.1", features = ["bitstring"] }
sqd-network-transport = { git = "https://github.com/subsquid/sqd-network.git", rev = "8a7e5bc", version = "3.0.0" }
6 changes: 3 additions & 3 deletions crates/collector-utils/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub trait Storage {
query_logs: T,
) -> anyhow::Result<()>;

async fn store_pings<T: Iterator<Item = PingRow> + Sized + Send>(
async fn store_heartbeats<T: Iterator<Item = PingRow> + Sized + Send>(
&self,
pings: T,
) -> anyhow::Result<()>;
Expand Down Expand Up @@ -354,7 +354,7 @@ impl Storage for ClickhouseStorage {
Ok(())
}

async fn store_pings<T: Iterator<Item = PingRow> + Sized + Send>(
async fn store_heartbeats<T: Iterator<Item = PingRow> + Sized + Send>(
&self,
pings: T,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -509,7 +509,7 @@ mod tests {
};
let ts = timestamp_now_ms();
storage
.store_pings(std::iter::once(
.store_heartbeats(std::iter::once(
PingRow::new(ping.clone(), worker_id.to_string()).unwrap(),
))
.await
Expand Down
2 changes: 1 addition & 1 deletion crates/logs-collector/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "logs-collector"
version = "2.1.3"
version = "2.1.4"
edition = "2021"

[dependencies]
Expand Down
8 changes: 1 addition & 7 deletions crates/pings-collector/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
[package]
name = "pings-collector"
version = "2.1.3"
version = "2.2.1"
edition = "2021"

[dependencies]
anyhow = "1"
async-trait = "0.1"
bincode = { version = "2.0.0-rc.3", features = ["serde"] }
clap = { version = "4", features = ["derive", "env"] }
clickhouse = "0.12"
env_logger = "0.11"
Expand All @@ -15,12 +14,7 @@ lazy_static = "1"
log = "0.4"
parking_lot = "0.12"
semver = "1"
serde = { version = "1.0.188", features = ["derive"] }
serde_bytes = "0.11"
serde_json = "1"
serde_repr = "0.1"
tokio = { version = "1", features = ["full"] }
yaque = "0.6"

sqd-contract-client = { workspace = true }
sqd-messages = { workspace = true }
Expand Down
22 changes: 5 additions & 17 deletions crates/pings-collector/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::path::PathBuf;

use clap::Parser;
use sqd_network_transport::TransportArgs;

Expand All @@ -14,14 +12,6 @@ pub struct Cli {
#[command(flatten)]
pub clickhouse: ClickhouseArgs,

#[arg(
long,
env,
help = "Interval at which logs are saved to persistent storage (seconds)",
default_value = "120"
)]
pub storage_sync_interval_sec: u32,

#[arg(
long,
env,
Expand All @@ -30,17 +20,15 @@ pub struct Cli {
)]
pub worker_update_interval_sec: u32,

#[arg(
long,
env,
help = "Path to store the local pings buffer",
default_value = "."
)]
pub buffer_dir: PathBuf,
#[arg(long, env, default_value_t = 10)]
pub connect_timeout_sec: u32,

#[arg(long, env, default_value_t = 15)]
pub request_timeout_sec: u32,

#[arg(long, env, default_value_t = 60)]
pub request_interval_sec: u32,

#[arg(long, env, default_value_t = 10)]
pub concurrent_requests: usize,
}
36 changes: 13 additions & 23 deletions crates/pings-collector/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ use std::time::Duration;

use clap::Parser;
use env_logger::Env;
use sqd_network_transport::{
get_agent_info, AgentInfo, BaseConfig, P2PTransportBuilder, PingsCollectorConfig,
};
use sqd_network_transport::{get_agent_info, AgentInfo, P2PTransportBuilder, PingsCollectorConfig};

use collector_utils::ClickhouseStorage;

Expand Down Expand Up @@ -33,31 +31,23 @@ async fn main() -> anyhow::Result<()> {

// Build P2P transport
let agent_info = get_agent_info!();
let transport_builder = P2PTransportBuilder::from_cli(args.transport, agent_info)
.await?
.with_base_config(|config| BaseConfig {
status_request_timeout: Duration::from_secs(args.request_timeout_sec as u64),
concurrent_status_requests: args.concurrent_requests,
..config
});
let transport_builder = P2PTransportBuilder::from_cli(args.transport, agent_info).await?;

let contract_client: Arc<dyn sqd_contract_client::Client> =
transport_builder.contract_client().into();
let (incoming_pings, transport_handle) =
transport_builder.build_pings_collector(PingsCollectorConfig {
..Default::default()
})?;
let transport_handle = transport_builder.build_pings_collector(PingsCollectorConfig {
request_timeout: Duration::from_secs(args.request_timeout_sec as u64),
connect_timeout: Duration::from_secs(args.connect_timeout_sec as u64),
..Default::default()
})?;

let storage = ClickhouseStorage::new(args.clickhouse).await?;
let storage_sync_interval = Duration::from_secs(args.storage_sync_interval_sec as u64);
let worker_update_interval = Duration::from_secs(args.worker_update_interval_sec as u64);
Server::new(incoming_pings, transport_handle)
.run(
contract_client,
storage_sync_interval,
worker_update_interval,
args.buffer_dir,
storage,
)

let request_interval = Duration::from_secs(args.request_interval_sec as u64);
let concurrency_limit = args.concurrent_requests;

Server::new(transport_handle, request_interval, concurrency_limit)
.run(contract_client, worker_update_interval, storage)
.await
}
Loading