diff --git a/Cargo.lock b/Cargo.lock index cdb0cddb..e5db17a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5451,6 +5451,7 @@ dependencies = [ "pluto-app", "pluto-cluster", "pluto-core", + "pluto-dkg", "pluto-eth2util", "pluto-k1util", "pluto-p2p", @@ -5468,6 +5469,7 @@ dependencies = [ "tokio", "tokio-util", "tracing", + "url", "wiremock", ] @@ -5561,12 +5563,14 @@ name = "pluto-dkg" version = "1.7.1" dependencies = [ "anyhow", + "bon", "clap", "either", "futures", "hex", "k256", "libp2p", + "pluto-app", "pluto-build-proto", "pluto-cluster", "pluto-core", @@ -5598,6 +5602,7 @@ version = "1.7.1" dependencies = [ "alloy", "thiserror 2.0.18", + "tokio", "url", ] diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 5a53e5d4..997a5082 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -19,6 +19,7 @@ humantime.workspace = true tokio.workspace = true pluto-app.workspace = true pluto-cluster.workspace = true +pluto-dkg.workspace = true pluto-relay-server.workspace = true pluto-tracing.workspace = true pluto-core.workspace = true @@ -35,6 +36,7 @@ serde_with = { workspace = true, features = ["base64"] } rand.workspace = true tempfile.workspace = true reqwest.workspace = true +url.workspace = true [dev-dependencies] tempfile.workspace = true diff --git a/crates/cli/src/cli.rs b/crates/cli/src/cli.rs index 5c4956f0..b2fc4022 100644 --- a/crates/cli/src/cli.rs +++ b/crates/cli/src/cli.rs @@ -4,6 +4,7 @@ use clap::{Parser, Subcommand}; use crate::commands::{ create_enr::CreateEnrArgs, + dkg::DkgArgs, enr::EnrArgs, relay::RelayArgs, test::{ @@ -51,6 +52,12 @@ pub enum Commands { )] Relay(Box), + #[command( + about = "Participate in a Distributed Key Generation ceremony", + long_about = "Participate in a distributed key generation ceremony for a specific cluster definition that creates distributed validator key shares and a final cluster lock configuration. Note that all other cluster operators should run this command at the same time." + )] + Dkg(Box), + #[command( about = "Alpha subcommands provide early access to in-development features", long_about = "Alpha subcommands represent features that are currently under development. They're not yet released for general use, but offer a glimpse into future functionalities planned for the distributed cluster system." diff --git a/crates/cli/src/commands/common.rs b/crates/cli/src/commands/common.rs new file mode 100644 index 00000000..f76b345b --- /dev/null +++ b/crates/cli/src/commands/common.rs @@ -0,0 +1,49 @@ +//! Shared helpers for CLI commands. + +use std::str::FromStr; + +use libp2p::{Multiaddr, multiaddr}; + +/// Shared license notice shown by long-running commands. +pub const LICENSE: &str = concat!( + "This software is licensed under the Maria DB Business Source License 1.1; ", + "you may not use this software except in compliance with this license. You may obtain a ", + "copy of this license at https://github.com/NethermindEth/pluto/blob/main/LICENSE" +); + +/// Console color selection for terminal logging. +#[derive(clap::ValueEnum, Clone, Copy, Debug, Default)] +pub enum ConsoleColor { + /// Automatically decide whether to use ANSI colors. + #[default] + Auto, + /// Always use ANSI colors. + Force, + /// Never use ANSI colors. + Disable, +} + +/// Builds a console tracing configuration for CLI commands. +pub fn build_console_tracing_config( + level: impl Into, + color: &ConsoleColor, +) -> pluto_tracing::TracingConfig { + let mut builder = pluto_tracing::TracingConfig::builder().with_default_console(); + + builder = match color { + ConsoleColor::Auto => builder.console_with_ansi(std::env::var("NO_COLOR").is_err()), + ConsoleColor::Force => builder.console_with_ansi(true), + ConsoleColor::Disable => builder.console_with_ansi(false), + }; + + // TODO: Handle loki config + + // TODO: Handle log output path + + builder.override_env_filter(level.into()).build() +} + +/// Parses a relay string as either a relay URL or a raw multiaddr. +pub fn parse_relay_addr(relay: &str) -> std::result::Result { + multiaddr::from_url(relay).or_else(|_| Multiaddr::from_str(relay)) +} diff --git a/crates/cli/src/commands/dkg.rs b/crates/cli/src/commands/dkg.rs new file mode 100644 index 00000000..7823bfcd --- /dev/null +++ b/crates/cli/src/commands/dkg.rs @@ -0,0 +1,489 @@ +//! DKG command implementation. + +use std::{future::Future, path::PathBuf}; + +use crate::{ + commands::common::{ConsoleColor, LICENSE, build_console_tracing_config, parse_relay_addr}, + duration::Duration, + error::{CliError, Result}, +}; +use libp2p::multiaddr::Protocol; +use tokio_util::sync::CancellationToken; +use tracing::{info, warn}; + +/// Arguments for the `dkg` command. +#[derive(clap::Args, Clone, Debug)] +pub struct DkgArgs { + #[arg( + long = "data-dir", + env = "CHARON_DATA_DIR", + default_value = ".charon", + help = "The directory where charon will store all its internal data." + )] + pub data_dir: PathBuf, + + #[arg( + long = "definition-file", + env = "CHARON_DEFINITION_FILE", + default_value = ".charon/cluster-definition.json", + help = "The path to the cluster definition file or an HTTP URL." + )] + pub definition_file: String, + + #[arg( + long = "no-verify", + env = "CHARON_NO_VERIFY", + default_value_t = false, + help = "Disables cluster definition and lock file verification." + )] + pub no_verify: bool, + + #[arg( + long = "keymanager-address", + env = "CHARON_KEYMANAGER_ADDRESS", + default_value = "", + help = "The keymanager URL to import validator keyshares." + )] + pub keymanager_address: String, + + #[arg( + long = "keymanager-auth-token", + env = "CHARON_KEYMANAGER_AUTH_TOKEN", + default_value = "", + help = "Authentication bearer token to interact with keymanager API. Don't include the \"Bearer\" symbol, only include the api-token." + )] + pub keymanager_auth_token: String, + + #[command(flatten)] + pub p2p: DkgP2PArgs, + + #[command(flatten)] + pub log: DkgLogArgs, + + #[arg( + long = "publish-address", + env = "CHARON_PUBLISH_ADDRESS", + default_value = "https://api.obol.tech/v1", + help = "The URL to publish the cluster to." + )] + pub publish_address: String, + + #[arg( + long = "publish-timeout", + env = "CHARON_PUBLISH_TIMEOUT", + default_value = "30s", + help = "Timeout for publishing a cluster, consider increasing if the cluster contains more than 200 validators." + )] + pub publish_timeout: Duration, + + #[arg( + long = "publish", + env = "CHARON_PUBLISH", + default_value_t = false, + help = "Publish the created cluster to a remote API." + )] + pub publish: bool, + + #[arg( + long = "shutdown-delay", + env = "CHARON_SHUTDOWN_DELAY", + default_value = "1s", + help = "Graceful shutdown delay." + )] + pub shutdown_delay: Duration, + + #[arg( + long = "execution-client-rpc-endpoint", + env = "CHARON_EXECUTION_CLIENT_RPC_ENDPOINT", + default_value = "", + help = "The address of the execution engine JSON-RPC API." + )] + pub execution_client_rpc_endpoint: String, + + #[arg( + long = "timeout", + env = "CHARON_TIMEOUT", + default_value = "1m0s", + help = "Timeout for the DKG process, should be increased if DKG times out." + )] + pub timeout: Duration, + + #[arg( + long = "zipped", + env = "CHARON_ZIPPED", + default_value_t = false, + help = "Create a tar archive compressed with gzip of the target directory after creation." + )] + pub zipped: bool, +} + +impl DkgArgs { + /// Converts CLI arguments into the DKG crate configuration. + pub fn into_config(self) -> Result { + validate_p2p_args(&self.p2p)?; + + let tracing_config = build_console_tracing_config(self.log.level.clone(), &self.log.color); + let p2p_config = { + let mut relays = Vec::new(); + + for relay in &self.p2p.relays { + let multiaddr = parse_relay_addr(relay)?; + + if multiaddr.iter().any(|protocol| protocol == Protocol::Http) { + warn!(address = %relay, "Insecure relay address provided, not HTTPS"); + } + + relays.push(multiaddr); + } + + pluto_p2p::config::P2PConfig { + relays, + external_ip: self.p2p.external_ip, + external_host: self.p2p.external_host, + tcp_addrs: self.p2p.tcp_addrs, + udp_addrs: self.p2p.udp_addrs, + disable_reuse_port: self.p2p.disable_reuseport, + } + }; + + Ok(pluto_dkg::dkg::Config::builder() + .def_file(self.definition_file) + .no_verify(self.no_verify) + .data_dir(self.data_dir) + .p2p(p2p_config) + .log(tracing_config) + .keymanager( + pluto_dkg::dkg::KeymanagerConfig::builder() + .address(self.keymanager_address) + .auth_token(self.keymanager_auth_token) + .build(), + ) + .publish( + pluto_dkg::dkg::PublishConfig::builder() + .address(self.publish_address) + .timeout(self.publish_timeout.into()) + .enabled(self.publish) + .build(), + ) + .shutdown_delay(self.shutdown_delay.into()) + .timeout(self.timeout.into()) + .execution_engine_addr(self.execution_client_rpc_endpoint) + .zipped(self.zipped) + .test_config(pluto_dkg::dkg::TestConfig::builder().build()) + .build()) + } +} + +/// P2P arguments for the `dkg` command. +#[derive(clap::Args, Clone, Debug)] +pub struct DkgP2PArgs { + #[arg( + long = "p2p-relays", + env = "CHARON_P2P_RELAYS", + value_delimiter = ',', + default_values_t = pluto_p2p::config::DEFAULT_RELAYS.map(String::from), + help = "Comma-separated list of libp2p relay URLs or multiaddrs." + )] + pub relays: Vec, + + #[arg( + long = "p2p-external-ip", + env = "CHARON_P2P_EXTERNAL_IP", + help = "The IP address advertised by libp2p. This may be used to advertise an external IP." + )] + pub external_ip: Option, + + #[arg( + long = "p2p-external-hostname", + env = "CHARON_P2P_EXTERNAL_HOSTNAME", + help = "The DNS hostname advertised by libp2p. This may be used to advertise an external DNS." + )] + pub external_host: Option, + + #[arg( + long = "p2p-tcp-address", + env = "CHARON_P2P_TCP_ADDRESS", + value_delimiter = ',', + help = "Comma-separated list of listening TCP addresses (ip and port) for libP2P traffic. Empty default doesn't bind to local port therefore only supports outgoing connections." + )] + pub tcp_addrs: Vec, + + #[arg( + long = "p2p-udp-address", + env = "CHARON_P2P_UDP_ADDRESS", + value_delimiter = ',', + help = "Comma-separated list of listening UDP addresses (ip and port) for libP2P traffic. Empty default doesn't bind to local port therefore only supports outgoing connections." + )] + pub udp_addrs: Vec, + + #[arg( + long = "p2p-disable-reuseport", + env = "CHARON_P2P_DISABLE_REUSEPORT", + default_value_t = false, + help = "Disables TCP port reuse for outgoing libp2p connections." + )] + pub disable_reuseport: bool, +} + +/// Logging arguments for the `dkg` command. +#[derive(clap::Args, Clone, Debug)] +pub struct DkgLogArgs { + #[arg( + long = "log-format", + env = "CHARON_LOG_FORMAT", + default_value = "console", + help = "Log format; console, logfmt or json" + )] + pub format: String, + + #[arg( + long = "log-level", + env = "CHARON_LOG_LEVEL", + default_value = "info", + help = "Log level; debug, info, warn or error" + )] + pub level: String, + + #[arg( + long = "log-color", + env = "CHARON_LOG_COLOR", + default_value = "auto", + help = "Log color; auto, force, disable." + )] + pub color: ConsoleColor, + + #[arg( + long = "log-output-path", + env = "CHARON_LOG_OUTPUT_PATH", + help = "Path in which to write on-disk logs." + )] + pub log_output_path: Option, +} + +/// Runs the `dkg` command from an already-built configuration. +pub async fn run(config: pluto_dkg::dkg::Config, ct: CancellationToken) -> Result<()> { + run_with_runner(config, ct, pluto_dkg::dkg::run).await +} + +async fn run_with_runner( + config: pluto_dkg::dkg::Config, + ct: CancellationToken, + runner: Runner, +) -> Result<()> +where + Runner: FnOnce(pluto_dkg::dkg::Config, CancellationToken) -> Fut, + Fut: Future>, +{ + info!("{LICENSE}"); + + runner(config, ct).await.map_err(Into::into) +} + +fn validate_p2p_args(args: &DkgP2PArgs) -> Result<()> { + if let Some(host) = &args.external_host { + url::Host::parse(host) + .map_err(|err| CliError::Other(format!("invalid hostname: {host}: {err}")))?; + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::cli::{Cli, Commands}; + use clap::Parser; + use libp2p::{Multiaddr, multiaddr}; + use std::{str::FromStr, sync::Arc, time::Duration as StdDuration}; + + #[test] + fn dkg_is_registered_as_top_level_subcommand() { + let cli = Cli::try_parse_from(["pluto", "dkg"]).expect("dkg command should parse"); + + match cli.command { + Commands::Dkg(_) => {} + _ => panic!("expected dkg command"), + } + } + + #[test] + fn dkg_defaults_match_go() { + let cli = Cli::try_parse_from(["pluto", "dkg"]).expect("dkg command should parse"); + + let Commands::Dkg(args) = cli.command else { + panic!("expected dkg command"); + }; + let args = *args; + + assert_eq!(args.data_dir, PathBuf::from(".charon")); + assert_eq!(args.definition_file, ".charon/cluster-definition.json"); + assert!(!args.no_verify); + assert_eq!(args.timeout, Duration::new(StdDuration::from_secs(60))); + assert_eq!( + args.publish_timeout, + Duration::new(StdDuration::from_secs(30)) + ); + assert_eq!( + args.shutdown_delay, + Duration::new(StdDuration::from_secs(1)) + ); + assert_eq!(args.publish_address, "https://api.obol.tech/v1"); + assert!(!args.publish); + assert!(!args.zipped); + assert_eq!( + args.p2p.relays, + pluto_p2p::config::DEFAULT_RELAYS.map(String::from).to_vec(), + ); + assert_eq!(args.log.level, "info"); + assert_eq!(args.log.format, "console"); + } + + #[test] + fn invalid_duration_fails_during_parse() { + let err = match Cli::try_parse_from(["pluto", "dkg", "--timeout=not-a-duration"]) { + Ok(_) => panic!("invalid duration should fail"), + Err(err) => err, + }; + + assert_eq!(err.kind(), clap::error::ErrorKind::ValueValidation); + } + + #[test] + fn dkg_args_expose_expected_env_bindings() { + use clap::CommandFactory; + + let command = Cli::command(); + let dkg = command + .get_subcommands() + .find(|subcommand| subcommand.get_name() == "dkg") + .expect("dkg subcommand should exist"); + + let expected = [ + ("data-dir", "CHARON_DATA_DIR"), + ("definition-file", "CHARON_DEFINITION_FILE"), + ("no-verify", "CHARON_NO_VERIFY"), + ("keymanager-address", "CHARON_KEYMANAGER_ADDRESS"), + ("keymanager-auth-token", "CHARON_KEYMANAGER_AUTH_TOKEN"), + ("p2p-relays", "CHARON_P2P_RELAYS"), + ("log-level", "CHARON_LOG_LEVEL"), + ("publish", "CHARON_PUBLISH"), + ("publish-timeout", "CHARON_PUBLISH_TIMEOUT"), + ("timeout", "CHARON_TIMEOUT"), + ]; + + for (arg_name, env_name) in expected { + let arg = dkg + .get_arguments() + .find(|arg| arg.get_long() == Some(arg_name)) + .unwrap_or_else(|| panic!("missing argument: {arg_name}")); + + let actual = arg + .get_env() + .map(|value| value.to_string_lossy().into_owned()); + assert_eq!(actual.as_deref(), Some(env_name)); + } + } + + #[test] + fn config_mapping_preserves_fields() { + let cli = Cli::try_parse_from([ + "pluto", + "dkg", + "--data-dir=/tmp/charon", + "--definition-file=/tmp/definition.json", + "--no-verify", + "--keymanager-address=https://keymanager.example", + "--keymanager-auth-token=token", + "--p2p-relays=https://relay.one,/ip4/127.0.0.1/tcp/9000", + "--p2p-external-ip=1.2.3.4", + "--p2p-external-hostname=example.com", + "--p2p-tcp-address=0.0.0.0:9000", + "--p2p-udp-address=0.0.0.0:9000", + "--p2p-disable-reuseport", + "--log-format=json", + "--log-level=debug", + "--log-color=force", + "--log-output-path=/tmp/pluto.log", + "--publish", + "--publish-address=https://api.example/v1", + "--publish-timeout=40s", + "--shutdown-delay=2s", + "--execution-client-rpc-endpoint=http://127.0.0.1:8545", + "--timeout=90s", + "--zipped", + ]) + .expect("dkg command should parse"); + + let Commands::Dkg(args) = cli.command else { + panic!("expected dkg command"); + }; + let args = *args; + + let config = args.into_config().expect("config should map"); + + assert_eq!(config.data_dir, PathBuf::from("/tmp/charon")); + assert_eq!(config.def_file, "/tmp/definition.json"); + assert!(config.no_verify); + assert_eq!(config.keymanager.address, "https://keymanager.example"); + assert_eq!(config.keymanager.auth_token, "token"); + assert_eq!( + config.p2p.relays, + vec![ + multiaddr::from_url("https://relay.one").expect("relay url"), + Multiaddr::from_str("/ip4/127.0.0.1/tcp/9000").expect("relay multiaddr") + ] + ); + assert_eq!(config.p2p.external_ip.as_deref(), Some("1.2.3.4")); + assert_eq!(config.p2p.external_host.as_deref(), Some("example.com")); + assert_eq!(config.p2p.tcp_addrs, vec!["0.0.0.0:9000".to_string()]); + assert_eq!(config.p2p.udp_addrs, vec!["0.0.0.0:9000".to_string()]); + assert!(config.p2p.disable_reuse_port); + assert_eq!(config.log.override_env_filter.as_deref(), Some("debug")); + let console = config.log.console.as_ref().expect("console config"); + assert!(console.with_ansi); + assert!(config.publish.enabled); + assert_eq!(config.publish.address, "https://api.example/v1"); + assert_eq!(config.publish.timeout, StdDuration::from_secs(40)); + assert_eq!(config.shutdown_delay, StdDuration::from_secs(2)); + assert_eq!(config.execution_engine_addr, "http://127.0.0.1:8545"); + assert_eq!(config.timeout, StdDuration::from_secs(90)); + assert!(config.zipped); + } + + #[tokio::test] + async fn run_passes_config_and_token_to_runner() { + let cli = Cli::try_parse_from([ + "pluto", + "dkg", + "--log-level=debug", + "--log-color=disable", + "--log-format=json", + "--log-output-path=/tmp/pluto.log", + ]) + .expect("dkg command should parse"); + let Commands::Dkg(args) = cli.command else { + panic!("expected dkg command"); + }; + let config = (*args).into_config().expect("config should map"); + + let events = Arc::new(std::sync::Mutex::new(Vec::new())); + let ct = CancellationToken::new(); + + run_with_runner(config, ct.clone(), { + let events = events.clone(); + move |config, token| async move { + assert!(!token.is_cancelled()); + assert_eq!(config.def_file, ".charon/cluster-definition.json"); + assert_eq!(config.log.override_env_filter.as_deref(), Some("debug")); + let console = config.log.console.as_ref().expect("console config"); + assert!(!console.with_ansi); + events.lock().expect("lock").push("runner"); + Ok(()) + } + }) + .await + .expect("dkg run should succeed"); + + assert_eq!(*events.lock().expect("lock"), vec!["runner"]); + } +} diff --git a/crates/cli/src/commands/mod.rs b/crates/cli/src/commands/mod.rs index e18c3e1f..50998b9a 100644 --- a/crates/cli/src/commands/mod.rs +++ b/crates/cli/src/commands/mod.rs @@ -1,4 +1,6 @@ +pub mod common; pub mod create_enr; +pub mod dkg; pub mod enr; pub mod relay; pub mod test; diff --git a/crates/cli/src/commands/relay.rs b/crates/cli/src/commands/relay.rs index a3b8825a..ab86bc54 100644 --- a/crates/cli/src/commands/relay.rs +++ b/crates/cli/src/commands/relay.rs @@ -1,19 +1,13 @@ -use crate::error::CliError; -use libp2p::{ - Multiaddr, - multiaddr::{self, Protocol}, +use crate::{ + commands::common::{ConsoleColor, LICENSE, build_console_tracing_config, parse_relay_addr}, + error::CliError, }; +use libp2p::multiaddr::Protocol; use pluto_p2p::k1; -use std::{path::PathBuf, str::FromStr}; +use std::path::PathBuf; use tokio_util::sync::CancellationToken; use tracing::{error, info}; -pub const LICENSE: &str = concat!( - "This software is licensed under the Maria DB Business Source License 1.1; ", - "you may not use this software except in compliance with this license. You may obtain a ", - "copy of this license at https://github.com/ObolNetwork/charon/blob/main/LICENSE" -); - /// Arguments for the relay command. #[derive(clap::Args, Clone)] pub struct RelayArgs { @@ -44,8 +38,7 @@ impl TryInto for RelayArgs { let mut relays = Vec::new(); for relay in &self.p2p.relays { - let multiaddr = - multiaddr::from_url(relay).or_else(|_| Multiaddr::from_str(relay))?; + let multiaddr = parse_relay_addr(relay)?; if multiaddr.iter().any(|protocol| protocol == Protocol::Http) { tracing::warn!( @@ -67,23 +60,7 @@ impl TryInto for RelayArgs { } }; - let log_config = { - let mut builder = pluto_tracing::TracingConfig::builder(); - - builder = builder.with_default_console(); - builder = match self.log.color { - ConsoleColor::Auto => builder.console_with_ansi(std::env::var("NO_COLOR").is_err()), - ConsoleColor::Force => builder.console_with_ansi(true), - ConsoleColor::Disable => builder.console_with_ansi(false), - }; - builder = builder.override_env_filter(self.log.level); - - // TODO: Handle loki config - - // TODO: Handle log output path - - builder.build() - }; + let log_config = build_console_tracing_config(self.log.level.clone(), &self.log.color); let builder = pluto_relay_server::config::Config::builder() .data_dir(self.data_dir.data_dir) @@ -186,19 +163,13 @@ pub struct RelayDebugMonitoringArgs { pub debug_addr: Option, } -const DEFAULT_RELAYS: [&str; 3] = [ - "https://0.relay.obol.tech", - "https://2.relay.obol.dev", - "https://1.relay.obol.tech", -]; - #[derive(clap::Args, Clone)] pub struct RelayP2PArgs { #[arg( long = "p2p-relays", env = "PLUTO_P2P_RELAYS", value_delimiter = ',', - default_values_t = DEFAULT_RELAYS.map(String::from), + default_values_t = pluto_p2p::config::DEFAULT_RELAYS.map(String::from), help = "Comma-separated list of libp2p relay URLs or multiaddrs." )] pub relays: Vec, @@ -271,14 +242,6 @@ pub struct RelayLogFlags { pub log_output_path: Option, } -#[derive(clap::ValueEnum, Clone, Default)] -pub enum ConsoleColor { - #[default] - Auto, - Force, - Disable, -} - #[derive(clap::Args, Clone)] pub struct RelayLokiArgs { #[arg( @@ -302,7 +265,7 @@ pub async fn run( config: pluto_relay_server::config::Config, ct: CancellationToken, ) -> Result<(), CliError> { - info!(LICENSE); + info!("{LICENSE}"); info!(config = ?config); let key = match pluto_p2p::k1::load_priv_key(&config.data_dir) { diff --git a/crates/cli/src/error.rs b/crates/cli/src/error.rs index f2ae55a4..b9c1fdba 100644 --- a/crates/cli/src/error.rs +++ b/crates/cli/src/error.rs @@ -72,6 +72,14 @@ pub enum CliError { #[error("Relay P2P error: {0}")] RelayP2PError(#[from] pluto_relay_server::error::RelayP2PError), + /// DKG command error. + #[error("DKG error: {0}")] + DkgError(#[from] pluto_dkg::dkg::DkgError), + + /// Tracing initialization error. + #[error("Tracing initialization error: {0}")] + TracingInit(#[from] pluto_tracing::init::Error), + /// Command parsing error. #[error("Command parsing error: {0}")] CommandParsingError(#[from] clap::Error), diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index b51d65de..271a9cb9 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -42,12 +42,21 @@ async fn run() -> std::result::Result<(), CliError> { } }); + dispatch(cli, ct).await +} + +async fn dispatch(cli: Cli, ct: CancellationToken) -> std::result::Result<(), CliError> { match cli.command { Commands::Create(args) => match args.command { CreateCommands::Enr(args) => commands::create_enr::run(args), }, Commands::Enr(args) => commands::enr::run(args), Commands::Version(args) => commands::version::run(args), + Commands::Dkg(args) => { + let config = (*args).into_config()?; + pluto_tracing::init(&config.log).expect("Failed to initialize tracing"); + commands::dkg::run(config, ct.clone()).await + } Commands::Relay(args) => { let config: pluto_relay_server::config::Config = (*args).clone().try_into()?; pluto_tracing::init(&config.log_config).expect("Failed to initialize tracing"); diff --git a/crates/dkg/Cargo.toml b/crates/dkg/Cargo.toml index 1d1a19b9..bd4af966 100644 --- a/crates/dkg/Cargo.toml +++ b/crates/dkg/Cargo.toml @@ -10,6 +10,7 @@ publish.workspace = true prost.workspace = true prost-types.workspace = true thiserror.workspace = true +bon.workspace = true libp2p.workspace = true futures.workspace = true tokio.workspace = true @@ -17,12 +18,15 @@ sha2.workspace = true tracing.workspace = true either.workspace = true k256.workspace = true +tokio-util.workspace = true pluto-k1util.workspace = true pluto-p2p.workspace = true pluto-cluster.workspace = true +pluto-app.workspace = true pluto-crypto.workspace = true pluto-eth1wrap.workspace = true pluto-eth2util.workspace = true +pluto-tracing.workspace = true hex.workspace = true rand.workspace = true serde.workspace = true diff --git a/crates/dkg/src/disk.rs b/crates/dkg/src/disk.rs index ab5505ec..93d943ad 100644 --- a/crates/dkg/src/disk.rs +++ b/crates/dkg/src/disk.rs @@ -338,11 +338,10 @@ mod tests { let json = serde_json::to_string(definition).unwrap(); tokio::fs::write(&definition_path, json).await.unwrap(); - let cfg = dkg::Config { - def_file: definition_path.to_string_lossy().into(), - no_verify: false, - ..Default::default() - }; + let cfg = dkg::Config::builder() + .def_file(definition_path.to_string_lossy().into_owned()) + .no_verify(false) + .build(); let client = noop_eth1_client().await; let actual = super::load_definition(&cfg, &client).await.unwrap(); @@ -352,11 +351,10 @@ mod tests { #[tokio::test] async fn load_definition_file_does_not_exist() { - let cfg = dkg::Config { - def_file: "".into(), - no_verify: false, - ..Default::default() - }; + let cfg = dkg::Config::builder() + .def_file(String::new()) + .no_verify(false) + .build(); let client = noop_eth1_client().await; let result = super::load_definition(&cfg, &client).await; @@ -369,11 +367,10 @@ mod tests { let tempfile = tempfile::NamedTempFile::new().unwrap(); tokio::fs::write(tempfile.path(), r#"{}"#).await.unwrap(); - let cfg = dkg::Config { - def_file: tempfile.path().to_string_lossy().into(), - no_verify: false, - ..Default::default() - }; + let cfg = dkg::Config::builder() + .def_file(tempfile.path().to_string_lossy().into_owned()) + .no_verify(false) + .build(); let client = noop_eth1_client().await; let result = super::load_definition(&cfg, &client).await; @@ -400,11 +397,10 @@ mod tests { }; tokio::fs::write(&definition_path, json).await.unwrap(); - let cfg = dkg::Config { - def_file: definition_path.to_string_lossy().into(), - no_verify: true, // Intentionally set to `true` to bypass verification - ..Default::default() - }; + let cfg = dkg::Config::builder() + .def_file(definition_path.to_string_lossy().into_owned()) + .no_verify(true) + .build(); let client = noop_eth1_client().await; let actual = super::load_definition(&cfg, &client).await.unwrap(); @@ -431,11 +427,10 @@ mod tests { }; tokio::fs::write(&definition_path, json).await.unwrap(); - let cfg = dkg::Config { - def_file: definition_path.to_string_lossy().into(), - no_verify: false, // Verify the definition - ..Default::default() - }; + let cfg = dkg::Config::builder() + .def_file(definition_path.to_string_lossy().into_owned()) + .no_verify(false) + .build(); let client = noop_eth1_client().await; let result = super::load_definition(&cfg, &client).await; @@ -551,8 +546,6 @@ mod tests { } async fn noop_eth1_client() -> pluto_eth1wrap::EthClient { - pluto_eth1wrap::EthClient::new("http://0.0.0.0:0") - .await - .unwrap() + pluto_eth1wrap::EthClient::new("").await.unwrap() } } diff --git a/crates/dkg/src/dkg.rs b/crates/dkg/src/dkg.rs index b3c751fa..358b09fa 100644 --- a/crates/dkg/src/dkg.rs +++ b/crates/dkg/src/dkg.rs @@ -1,23 +1,382 @@ -use std::path; +use std::{path, time::Duration}; + +use bon::Builder; +use tokio_util::sync::CancellationToken; +use tracing::warn; + +const DEFAULT_DATA_DIR: &str = ".charon"; +const DEFAULT_DEFINITION_FILE: &str = ".charon/cluster-definition.json"; +const DEFAULT_PUBLISH_ADDRESS: &str = "https://api.obol.tech/v1"; +const DEFAULT_PUBLISH_TIMEOUT: Duration = Duration::from_secs(30); +const DEFAULT_SHUTDOWN_DELAY: Duration = Duration::from_secs(1); +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60); + +/// Entry-point DKG error. +#[derive(Debug, thiserror::Error)] +pub enum DkgError { + /// Shutdown was requested before the DKG entrypoint started. + #[error("DKG shutdown requested before startup")] + ShutdownRequestedBeforeStartup, + + /// Keymanager address was provided without the auth token. + #[error( + "--keymanager-address provided but --keymanager-auth-token absent. Please fix configuration flags" + )] + MissingKeymanagerAuthToken, + + /// Keymanager auth token was provided without the address. + #[error( + "--keymanager-auth-token provided but --keymanager-address absent. Please fix configuration flags" + )] + MissingKeymanagerAddress, + + /// Failed to parse the keymanager address. + #[error("failed to parse keymanager addr: {addr}: {source}")] + InvalidKeymanagerAddress { + /// The address that failed to parse. + addr: String, + /// The parse error. + source: url::ParseError, + }, + + /// Failed to build the ETH1 client. + #[error("ETH1 client setup failed: {0}")] + Eth1Client(#[from] pluto_eth1wrap::EthClientError), + + /// Disk or definition preflight failed. + #[error("DKG preflight failed: {0}")] + Disk(#[from] crate::disk::DiskError), + + /// Failed to verify keymanager connectivity. + #[error("verify keymanager address: {0}")] + Keymanager(#[from] pluto_eth2util::keymanager::KeymanagerError), +} + +/// Keymanager configuration accepted by the entrypoint. +#[derive(Debug, Clone, Default, Builder)] +pub struct KeymanagerConfig { + /// The keymanager URL. + pub address: String, + /// Bearer token used for authentication. + pub auth_token: String, +} + +/// Publish configuration accepted by the entrypoint. +#[derive(Debug, Clone, Builder)] +pub struct PublishConfig { + /// Publish API base address. + pub address: String, + /// Publish timeout. + pub timeout: Duration, + /// Whether publishing is enabled. + pub enabled: bool, +} + +impl Default for PublishConfig { + fn default() -> Self { + Self { + address: DEFAULT_PUBLISH_ADDRESS.to_string(), + timeout: DEFAULT_PUBLISH_TIMEOUT, + enabled: false, + } + } +} /// DKG configuration -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Builder)] pub struct Config { /// Path to the definition file. Can be an URL or an absolute path on disk. + #[builder(default = DEFAULT_DEFINITION_FILE.to_string())] pub def_file: String, /// Skip cluster definition verification. + #[builder(default)] pub no_verify: bool, /// Data directory to store generated keys and other DKG artifacts. + #[builder(default = path::PathBuf::from(DEFAULT_DATA_DIR))] pub data_dir: path::PathBuf, + /// P2P entrypoint configuration. + #[builder(default = default_p2p_config())] + pub p2p: pluto_p2p::config::P2PConfig, + + /// Shared tracing configuration for the DKG entrypoint. + #[builder(default = default_tracing_config())] + pub log: pluto_tracing::TracingConfig, + + /// Keymanager configuration. + #[builder(default)] + pub keymanager: KeymanagerConfig, + + /// Publish configuration. + #[builder(default)] + pub publish: PublishConfig, + + /// Graceful shutdown delay after completion. + #[builder(default = DEFAULT_SHUTDOWN_DELAY)] + pub shutdown_delay: Duration, + + /// Overall DKG timeout. + #[builder(default = DEFAULT_TIMEOUT)] + pub timeout: Duration, + + /// Execution engine JSON-RPC endpoint. + #[builder(default)] + pub execution_engine_addr: String, + + /// Whether to bundle the output directory as a tarball. + #[builder(default)] + pub zipped: bool, + /// Test configuration, used for testing purposes. + #[builder(default)] pub test_config: TestConfig, } +impl Config { + /// Returns `true` if any test-only configuration is active. + pub fn has_test_config(&self) -> bool { + // TODO: Extend this when more test-only hooks are added to TestConfig, + // so preflight skips stay aligned with the full test configuration. + self.test_config.def.is_some() + } +} + /// Additional test-only config for DKG. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, Default, Builder)] pub struct TestConfig { /// Provides the cluster definition explicitly, skips loading from disk. pub def: Option, } + +fn default_p2p_config() -> pluto_p2p::config::P2PConfig { + pluto_p2p::config::P2PConfig { + relays: pluto_p2p::config::default_relay_multiaddrs(), + ..Default::default() + } +} + +fn default_tracing_config() -> pluto_tracing::TracingConfig { + pluto_tracing::TracingConfig::builder() + .with_default_console() + .override_env_filter("info") + .build() +} + +/// Runs the DKG entrypoint until the unported backend boundary. +pub async fn run(conf: Config, shutdown: CancellationToken) -> Result<(), DkgError> { + if shutdown.is_cancelled() { + return Err(DkgError::ShutdownRequestedBeforeStartup); + } + + let eth1 = pluto_eth1wrap::EthClient::new(&conf.execution_engine_addr).await?; + + let _definition = crate::disk::load_definition(&conf, ð1).await?; + + validate_keymanager_flags(&conf)?; + verify_keymanager_connection(&conf).await?; + + if !conf.has_test_config() { + crate::disk::check_clear_data_dir(&conf.data_dir).await?; + } + crate::disk::check_writes(&conf.data_dir).await?; + + unimplemented!("DKG ceremony backend is not implemented yet"); +} + +fn validate_keymanager_flags(conf: &Config) -> Result<(), DkgError> { + let addr = conf.keymanager.address.as_str(); + let auth_token = conf.keymanager.auth_token.as_str(); + + if !addr.is_empty() && auth_token.is_empty() { + return Err(DkgError::MissingKeymanagerAuthToken); + } + + if addr.is_empty() && !auth_token.is_empty() { + return Err(DkgError::MissingKeymanagerAddress); + } + + if addr.is_empty() { + return Ok(()); + } + + let parsed = url::Url::parse(addr).map_err(|source| DkgError::InvalidKeymanagerAddress { + addr: addr.to_string(), + source, + })?; + + if parsed.scheme() == "http" { + warn!(addr = addr, "Keymanager URL does not use https protocol"); + } + + Ok(()) +} + +async fn verify_keymanager_connection(conf: &Config) -> Result<(), DkgError> { + let addr = conf.keymanager.address.as_str(); + + if addr.is_empty() { + return Ok(()); + } + + let client = pluto_eth2util::keymanager::Client::new(addr, &conf.keymanager.auth_token)?; + client.verify_connection().await?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn config_builder_defaults_match_charon() { + let config = Config::builder().build(); + + assert_eq!(config.def_file, DEFAULT_DEFINITION_FILE); + assert!(!config.no_verify); + assert_eq!(config.data_dir, path::PathBuf::from(DEFAULT_DATA_DIR)); + assert_eq!( + config.p2p.relays, + pluto_p2p::config::default_relay_multiaddrs() + ); + assert_eq!(config.log.override_env_filter.as_deref(), Some("info")); + assert!(config.log.console.is_some()); + assert_eq!(config.publish.address, DEFAULT_PUBLISH_ADDRESS); + assert_eq!(config.publish.timeout, DEFAULT_PUBLISH_TIMEOUT); + assert!(!config.publish.enabled); + assert_eq!(config.shutdown_delay, DEFAULT_SHUTDOWN_DELAY); + assert_eq!(config.timeout, DEFAULT_TIMEOUT); + assert_eq!(config.execution_engine_addr, ""); + assert!(!config.zipped); + assert!(config.test_config.def.is_none()); + } + + #[tokio::test] + async fn run_rejects_mismatched_keymanager_flags() { + let (lock, ..) = pluto_cluster::test_cluster::new_for_test(1, 3, 4, 0); + + let err = run( + Config::builder() + .test_config(TestConfig::builder().def(lock.definition.clone()).build()) + .keymanager( + KeymanagerConfig::builder() + .address("https://keymanager.example".to_string()) + .auth_token(String::new()) + .build(), + ) + .build(), + CancellationToken::new(), + ) + .await + .expect_err("mismatched keymanager flags should fail"); + + assert!(matches!(err, DkgError::MissingKeymanagerAuthToken)); + } + + #[tokio::test] + async fn verify_keymanager_connection_succeeds_for_reachable_address() { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .expect("listener should bind"); + let addr = format!("http://{}", listener.local_addr().expect("local addr")); + + let config = Config::builder() + .keymanager( + KeymanagerConfig::builder() + .address(addr) + .auth_token("token".to_string()) + .build(), + ) + .build(); + + verify_keymanager_connection(&config) + .await + .expect("reachable keymanager should verify"); + } + + #[tokio::test] + async fn verify_keymanager_connection_fails_for_unreachable_address() { + let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .expect("listener should bind"); + let addr = format!("http://{}", listener.local_addr().expect("local addr")); + drop(listener); + + let config = Config::builder() + .keymanager( + KeymanagerConfig::builder() + .address(addr) + .auth_token("token".to_string()) + .build(), + ) + .build(); + + let err = verify_keymanager_connection(&config) + .await + .expect_err("unreachable keymanager should fail"); + + assert!(matches!(err, DkgError::Keymanager(_))); + } + + #[tokio::test] + async fn run_executes_preflight_before_reaching_backend_boundary() { + let tempdir = tempfile::tempdir().expect("tempdir"); + let definition_path = tempdir.path().join("cluster-definition.json"); + let private_key_path = tempdir.path().join("charon-enr-private-key"); + + tokio::fs::write(&private_key_path, b"dummy") + .await + .expect("private key"); + + let (lock, ..) = pluto_cluster::test_cluster::new_for_test(1, 3, 4, 0); + let definition = serde_json::to_string(&lock.definition).expect("definition json"); + tokio::fs::write(&definition_path, definition) + .await + .expect("definition file"); + + let join_err = tokio::spawn(async move { + run( + Config::builder() + .data_dir(tempdir.path().to_path_buf()) + .def_file(definition_path.to_string_lossy().into_owned()) + .no_verify(true) + .build(), + CancellationToken::new(), + ) + .await + }) + .await + .expect_err("backend handoff should panic until implemented"); + + assert!(join_err.is_panic()); + } + + #[tokio::test] + async fn run_surfaces_data_dir_preflight_errors() { + let tempdir = tempfile::tempdir().expect("tempdir"); + let definition_path = tempdir.path().join("cluster-definition.json"); + + let (lock, ..) = pluto_cluster::test_cluster::new_for_test(1, 3, 4, 0); + let definition = serde_json::to_string(&lock.definition).expect("definition json"); + tokio::fs::write(&definition_path, definition) + .await + .expect("definition file"); + + let err = run( + Config::builder() + .data_dir(tempdir.path().to_path_buf()) + .def_file(definition_path.to_string_lossy().into_owned()) + .no_verify(true) + .build(), + CancellationToken::new(), + ) + .await + .expect_err("missing private key should fail preflight"); + + assert!(matches!( + err, + DkgError::Disk(crate::disk::DiskError::MissingRequiredFiles { .. }) + )); + } +} diff --git a/crates/eth1wrap/Cargo.toml b/crates/eth1wrap/Cargo.toml index 2949c0be..625295bc 100644 --- a/crates/eth1wrap/Cargo.toml +++ b/crates/eth1wrap/Cargo.toml @@ -11,5 +11,8 @@ alloy.workspace = true url.workspace = true thiserror.workspace = true +[dev-dependencies] +tokio.workspace = true + [lints] workspace = true diff --git a/crates/eth1wrap/src/lib.rs b/crates/eth1wrap/src/lib.rs index b09db412..006f64ab 100644 --- a/crates/eth1wrap/src/lib.rs +++ b/crates/eth1wrap/src/lib.rs @@ -18,6 +18,10 @@ type Result = std::result::Result; /// Defines errors that can occur when interacting with the Ethereum client. #[derive(Debug, thiserror::Error)] pub enum EthClientError { + /// No execution engine endpoint was configured. + #[error("execution engine endpoint is not set")] + NoExecutionEngineAddr, + /// An RPC error. #[error("RPC error: {0}")] RpcTransportError(#[from] alloy::transports::RpcError), @@ -36,20 +40,20 @@ pub enum EthClientError { } /// Defines the interface for the Ethereum EL RPC client. -pub struct EthClient(DynProvider); - -impl std::ops::Deref for EthClient { - type Target = DynProvider; - - fn deref(&self) -> &DynProvider { - &self.0 - } -} +/// +/// `None` means no execution engine endpoint was configured, so the client +/// behaves as a no-op until a contract-signature verification path requires a +/// real EL provider. +pub struct EthClient(Option); impl EthClient { /// Create a new `EthClient` connected to the given address using defaults - /// for retry. + /// for retry, or a no-op client if no address is configured. pub async fn new(address: impl AsRef) -> Result { + if address.as_ref().trim().is_empty() { + return Ok(EthClient(None)); + } + // The maximum number of retries for rate limit errors. const MAX_RETRY: u32 = 10; // The initial backoff in milliseconds. @@ -66,7 +70,7 @@ impl EthClient { let provider = ProviderBuilder::new().connect_client(client); - Ok(EthClient(provider.erased())) + Ok(EthClient(Some(provider.erased()))) } /// Check if `sig` is a valid signature of `hash` according to ERC-1271. @@ -78,10 +82,13 @@ impl EthClient { ) -> Result { // Magic value defined in [ERC-1271](https://eips.ethereum.org/EIPS/eip-1271). const MAGIC_VALUE: [u8; 4] = [0x16, 0x26, 0xba, 0x7e]; + let Some(provider) = &self.0 else { + return Err(EthClientError::NoExecutionEngineAddr); + }; let address = alloy::primitives::Address::parse_checksummed(contract_address, None)?; - let instance = IERC1271::new(address, &self.0); + let instance = IERC1271::new(address, provider); let call = instance .isValidSignature(hash.into(), sig.to_vec().into()) @@ -91,3 +98,23 @@ impl EthClient { Ok(call == MAGIC_VALUE) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn empty_address_returns_noop_client() { + let client = EthClient::new("").await.expect("noop eth client"); + let err = client + .verify_smart_contract_based_signature( + "0x0000000000000000000000000000000000000000", + [0u8; 32], + &[], + ) + .await + .expect_err("empty address should not verify contract signatures"); + + assert!(matches!(err, EthClientError::NoExecutionEngineAddr)); + } +} diff --git a/crates/p2p/src/config.rs b/crates/p2p/src/config.rs index 56ac7b29..ac0da1f0 100644 --- a/crates/p2p/src/config.rs +++ b/crates/p2p/src/config.rs @@ -8,6 +8,13 @@ use std::{ use libp2p::{Multiaddr, multiaddr, ping}; +/// Shared default relay endpoints used by commands and P2P-facing configs. +pub const DEFAULT_RELAYS: [&str; 3] = [ + "https://0.relay.obol.tech", + "https://2.relay.obol.dev", + "https://1.relay.obol.tech", +]; + /// P2P configuration error. #[derive(Debug, thiserror::Error)] pub enum P2PConfigError { @@ -101,6 +108,14 @@ impl P2PConfig { } } +/// Returns the default relay endpoints parsed as [`Multiaddr`]s. +pub fn default_relay_multiaddrs() -> Vec { + DEFAULT_RELAYS + .iter() + .map(|relay| multiaddr::from_url(relay).expect("default relay should parse")) + .collect() +} + /// Builder for [`P2PConfig`]. #[derive(Default, Debug, Clone)] pub struct P2PConfigBuilder {