diff --git a/Cargo.lock b/Cargo.lock index cdb0cddb..33def73b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -373,7 +373,7 @@ dependencies = [ "derive_more", "foldhash 0.2.0", "hashbrown 0.16.1", - "indexmap 2.13.0", + "indexmap 2.13.1", "itoa", "k256", "keccak-asm", @@ -428,9 +428,9 @@ dependencies = [ [[package]] name = "alloy-rlp" -version = "0.3.13" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e93e50f64a77ad9c5470bf2ad0ca02f228da70c792a8f06634801e202579f35e" +checksum = "dc90b1e703d3c03f4ff7f48e82dd0bc1c8211ab7d079cd836a06fcfeb06651cb" dependencies = [ "alloy-rlp-derive", "arrayvec", @@ -439,9 +439,9 @@ dependencies = [ [[package]] name = "alloy-rlp-derive" -version = "0.3.13" +version = "0.3.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce8849c74c9ca0f5a03da1c865e3eb6f768df816e67dd3721a398a8a7e398011" +checksum = "f36834a5c0a2fa56e171bf256c34d70fca07d0c0031583edea1c4946b7889c9e" dependencies = [ "proc-macro2", "quote", @@ -581,7 +581,7 @@ dependencies = [ "alloy-sol-macro-input", "const-hex", "heck", - "indexmap 2.13.0", + "indexmap 2.13.1", "proc-macro-error2", "proc-macro2", "quote", @@ -1569,7 +1569,7 @@ version = "10.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06acb4f71407ba205a07cb453211e0e6a67b21904e47f6ba1f9589e38f2e454" dependencies = [ - "semver 1.0.27", + "semver 1.0.28", "serde", "toml", "url", @@ -1592,9 +1592,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.58" +version = "1.2.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1e928d4b69e3077709075a938a05ffbedfa53a84c8f766efbf8220bb1ff60e1" +checksum = "b7a4d3ec6524d28a329fc53654bbadc9bdd7b0431f5d65f1a56ffb28a1ee5283" dependencies = [ "find-msvc-tools", "jobserver", @@ -2574,9 +2574,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.3.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +checksum = "a043dc74da1e37d6afe657061213aa6f425f855399a11d3463c6ecccc4dfda1f" [[package]] name = "fastrlp" @@ -2999,7 +2999,7 @@ dependencies = [ "futures-core", "futures-sink", "http", - "indexmap 2.13.0", + "indexmap 2.13.1", "slab", "tokio", "tokio-util", @@ -3072,6 +3072,15 @@ dependencies = [ "hashbrown 0.15.5", ] +[[package]] +name = "hashlink" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea0b22561a9c04a7cb1a302c013e0259cd3b4bb619f145b32f72b8b4bcbed230" +dependencies = [ + "hashbrown 0.16.1", +] + [[package]] name = "heck" version = "0.5.0" @@ -3238,9 +3247,9 @@ checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" [[package]] name = "hyper" -version = "1.8.1" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11" +checksum = "6299f016b246a94207e63da54dbe807655bf9e00044f73ded42c3ac5305fbcca" dependencies = [ "atomic-waker", "bytes", @@ -3253,7 +3262,6 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "pin-utils", "smallvec", "tokio", "want", @@ -3385,12 +3393,13 @@ dependencies = [ [[package]] name = "icu_collections" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c6b649701667bbe825c3b7e6388cb521c23d88644678e83c0c4d0a621a34b43" +checksum = "2984d1cd16c883d7935b9e07e44071dca8d917fd52ecc02c04d5fa0b5a3f191c" dependencies = [ "displaydoc", "potential_utf", + "utf8_iter", "yoke", "zerofrom", "zerovec", @@ -3398,9 +3407,9 @@ dependencies = [ [[package]] name = "icu_locale_core" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edba7861004dd3714265b4db54a3c390e880ab658fec5f7db895fae2046b5bb6" +checksum = "92219b62b3e2b4d88ac5119f8904c10f8f61bf7e95b640d25ba3075e6cac2c29" dependencies = [ "displaydoc", "litemap", @@ -3411,9 +3420,9 @@ dependencies = [ [[package]] name = "icu_normalizer" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f6c8828b67bf8908d82127b2054ea1b4427ff0230ee9141c54251934ab1b599" +checksum = "c56e5ee99d6e3d33bd91c5d85458b6005a22140021cc324cea84dd0e72cff3b4" dependencies = [ "icu_collections", "icu_normalizer_data", @@ -3425,15 +3434,15 @@ dependencies = [ [[package]] name = "icu_normalizer_data" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a" +checksum = "da3be0ae77ea334f4da67c12f149704f19f81d1adf7c51cf482943e84a2bad38" [[package]] name = "icu_properties" -version = "2.1.2" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "020bfc02fe870ec3a66d93e677ccca0562506e5872c650f893269e08615d74ec" +checksum = "bee3b67d0ea5c2cca5003417989af8996f8604e34fb9ddf96208a033901e70de" dependencies = [ "icu_collections", "icu_locale_core", @@ -3445,15 +3454,15 @@ dependencies = [ [[package]] name = "icu_properties_data" -version = "2.1.2" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "616c294cf8d725c6afcd8f55abc17c56464ef6211f9ed59cccffe534129c77af" +checksum = "8e2bbb201e0c04f7b4b3e14382af113e17ba4f63e2c9d2ee626b720cbce54a14" [[package]] name = "icu_provider" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85962cf0ce02e1e0a629cc34e7ca3e373ce20dda4c4d7294bbd0bf1fdb59e614" +checksum = "139c4cf31c8b5f33d7e199446eff9c1e02decfc2f0eec2c8d71f65befa45b421" dependencies = [ "displaydoc", "icu_locale_core", @@ -3584,9 +3593,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.13.0" +version = "2.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" +checksum = "45a8a2b9cb3e0b0c1803dbb0758ffac5de2f425b23c28f518faabd9d805342ff" dependencies = [ "equivalent", "hashbrown 0.16.1", @@ -3727,9 +3736,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.93" +version = "0.3.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "797146bb2677299a1eb6b7b50a890f4c361b29ef967addf5b2fa45dae1bb6d7d" +checksum = "2e04e2ef80ce82e13552136fabeef8a5ed1f985a96805761cbb9a2c34e7664d9" dependencies = [ "cfg-if", "futures-util", @@ -3785,9 +3794,9 @@ checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" [[package]] name = "libc" -version = "0.2.183" +version = "0.2.184" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5b646652bf6661599e1da8901b3b9522896f01e736bad5f723fe7a3a27f899d" +checksum = "48f5d2a454e16a5ea0f4ced81bd44e4cfc7bd3a507b61887c99fd3538b28e4af" [[package]] name = "libgit2-sys" @@ -4263,15 +4272,16 @@ dependencies = [ [[package]] name = "libp2p-rendezvous" -version = "0.17.0" +version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15285d828c2b4a34cb660c2e74cd6938116daceab1f4357bae933d5b08cca933" +checksum = "31114bab295403e9934ae2e4415c45d681353829ea218390eed8f5bcc82dd1fb" dependencies = [ "async-trait", "asynchronous-codec", "bimap", "futures", "futures-timer", + "hashlink 0.11.0", "libp2p-core", "libp2p-identity", "libp2p-request-response", @@ -4532,9 +4542,9 @@ dependencies = [ [[package]] name = "libz-sys" -version = "1.1.25" +version = "1.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d52f4c29e2a68ac30c9087e1b772dc9f44a2b66ed44edf2266cf2be9b03dafc1" +checksum = "fc3a226e576f50782b3305c5ccf458698f92798987f551c6a02efe8276721e22" dependencies = [ "cc", "libc", @@ -4550,9 +4560,9 @@ checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" [[package]] name = "litemap" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77" +checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0" [[package]] name = "lock_api" @@ -5016,7 +5026,7 @@ dependencies = [ "log", "once_cell", "regex", - "semver 1.0.27", + "semver 1.0.28", "serde", "serde_json", "serde_yaml", @@ -5036,7 +5046,7 @@ dependencies = [ "eventsource-stream", "futures-core", "http", - "indexmap 2.13.0", + "indexmap 2.13.1", "oas3", "prettyplease", "proc-macro2", @@ -5309,7 +5319,7 @@ checksum = "8701b58ea97060d5e5b155d383a69952a60943f0e6dfe30b04c287beb0b27455" dependencies = [ "fixedbitset", "hashbrown 0.15.5", - "indexmap 2.13.0", + "indexmap 2.13.1", ] [[package]] @@ -5509,22 +5519,29 @@ name = "pluto-core" version = "1.7.1" dependencies = [ "alloy", + "anyhow", "async-trait", "base64 0.22.1", "built", "cancellation", "chrono", + "clap", "crossbeam", "dyn-clone", "dyn-eq", "futures", + "futures-timer", "hex", + "k256", "libp2p", "pluto-build-proto", + "pluto-cluster", "pluto-eth2api", "pluto-eth2util", + "pluto-p2p", "pluto-ssz", "pluto-testutil", + "pluto-tracing", "prost 0.14.3", "prost-types 0.14.3", "rand 0.8.5", @@ -5538,6 +5555,7 @@ dependencies = [ "tokio-util", "tracing", "tree_hash", + "unsigned-varint 0.8.0", "vise", "wiremock", ] @@ -5717,6 +5735,29 @@ dependencies = [ "vise-exporter", ] +[[package]] +name = "pluto-parsigex" +version = "1.7.1" +dependencies = [ + "anyhow", + "clap", + "either", + "futures", + "hex", + "libp2p", + "pluto-cluster", + "pluto-core", + "pluto-p2p", + "pluto-tracing", + "prost 0.14.3", + "serde_json", + "thiserror 2.0.18", + "tokio", + "tokio-util", + "tracing", + "unsigned-varint 0.8.0", +] + [[package]] name = "pluto-peerinfo" version = "1.7.1" @@ -5855,9 +5896,9 @@ checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" [[package]] name = "potential_utf" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b73949432f5e2a09657003c25bca5e19a0e9c84f8058ca374f49e0ebe605af77" +checksum = "0103b1cef7ec0cf76490e969665504990193874ea05c85ff9bab8b911d0a0564" dependencies = [ "zerovec", ] @@ -5913,7 +5954,7 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e67ba7e9b2b56446f1d419b1d807906278ffa1a658a8a5d8a39dcb1f5a78614f" dependencies = [ - "toml_edit 0.25.8+spec-1.1.0", + "toml_edit 0.25.10+spec-1.1.0", ] [[package]] @@ -6615,7 +6656,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" dependencies = [ - "semver 1.0.27", + "semver 1.0.28", ] [[package]] @@ -6891,9 +6932,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.27" +version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" +checksum = "8a7852d02fc848982e0c167ef163aaff9cd91dc640ba85e263cb1ce46fae51cd" dependencies = [ "serde", "serde_core", @@ -6959,7 +7000,7 @@ version = "1.0.149" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" dependencies = [ - "indexmap 2.13.0", + "indexmap 2.13.1", "itoa", "memchr", "serde", @@ -7020,7 +7061,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.13.0", + "indexmap 2.13.1", "schemars 0.9.0", "schemars 1.2.1", "serde_core", @@ -7047,7 +7088,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap 2.13.0", + "indexmap 2.13.1", "itoa", "ryu", "serde", @@ -7578,9 +7619,9 @@ dependencies = [ [[package]] name = "tinystr" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42d3e9c45c09de15d06dd8acf5f4e0e399e85927b7f00711024eb7ae10fa4869" +checksum = "c8323304221c2a851516f22236c5722a72eaa19749016521d6dff0824447d96d" dependencies = [ "displaydoc", "zerovec", @@ -7613,9 +7654,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.50.0" +version = "1.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27ad5e34374e03cfffefc301becb44e9dc3c17584f414349ebe29ed26661822d" +checksum = "2bd1c4c0fc4a7ab90fc15ef6daaa3ec3b893f004f915f2392557ed23237820cd" dependencies = [ "bytes", "libc", @@ -7630,9 +7671,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.6.1" +version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c55a2eff8b69ce66c84f85e1da1c233edc36ceb85a2058d11b0d6a3c7e7569c" +checksum = "385a6cb71ab9ab790c5fe8d67f1645e6c450a7ce006a33de03daa956cf70a496" dependencies = [ "proc-macro2", "quote", @@ -7707,9 +7748,9 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "1.1.0+spec-1.1.0" +version = "1.1.1+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97251a7c317e03ad83774a8752a7e81fb6067740609f75ea2b585b569a59198f" +checksum = "3165f65f62e28e0115a00b2ebdd37eb6f3b641855f9d636d3cd4103767159ad7" dependencies = [ "serde_core", ] @@ -7720,7 +7761,7 @@ version = "0.22.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" dependencies = [ - "indexmap 2.13.0", + "indexmap 2.13.1", "serde", "serde_spanned", "toml_datetime 0.6.11", @@ -7730,21 +7771,21 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.25.8+spec-1.1.0" +version = "0.25.10+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16bff38f1d86c47f9ff0647e6838d7bb362522bdf44006c7068c2b1e606f1f3c" +checksum = "a82418ca169e235e6c399a84e395ab6debeb3bc90edc959bf0f48647c6a32d1b" dependencies = [ - "indexmap 2.13.0", - "toml_datetime 1.1.0+spec-1.1.0", + "indexmap 2.13.1", + "toml_datetime 1.1.1+spec-1.1.0", "toml_parser", "winnow 1.0.1", ] [[package]] name = "toml_parser" -version = "1.1.0+spec-1.1.0" +version = "1.1.2+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2334f11ee363607eb04df9b8fc8a13ca1715a72ba8662a26ac285c98aabb4011" +checksum = "a2abe9b86193656635d2411dc43050282ca48aa31c2451210f4202550afb7526" dependencies = [ "winnow 1.0.1", ] @@ -7803,7 +7844,7 @@ checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4" dependencies = [ "futures-core", "futures-util", - "indexmap 2.13.0", + "indexmap 2.13.1", "pin-project-lite", "slab", "sync_wrapper", @@ -8296,9 +8337,9 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.116" +version = "0.2.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dc0882f7b5bb01ae8c5215a1230832694481c1a4be062fd410e12ea3da5b631" +checksum = "0551fc1bb415591e3372d0bc4780db7e587d84e2a7e79da121051c5c4b89d0b0" dependencies = [ "cfg-if", "once_cell", @@ -8309,9 +8350,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.66" +version = "0.4.67" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19280959e2844181895ef62f065c63e0ca07ece4771b53d89bfdb967d97cbf05" +checksum = "03623de6905b7206edd0a75f69f747f134b7f0a2323392d664448bf2d3c5d87e" dependencies = [ "js-sys", "wasm-bindgen", @@ -8319,9 +8360,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.116" +version = "0.2.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75973d3066e01d035dbedaad2864c398df42f8dd7b1ea057c35b8407c015b537" +checksum = "7fbdf9a35adf44786aecd5ff89b4563a90325f9da0923236f6104e603c7e86be" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -8329,9 +8370,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.116" +version = "0.2.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91af5e4be765819e0bcfee7322c14374dc821e35e72fa663a830bbc7dc199eac" +checksum = "dca9693ef2bab6d4e6707234500350d8dad079eb508dca05530c85dc3a529ff2" dependencies = [ "bumpalo", "proc-macro2", @@ -8342,9 +8383,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.116" +version = "0.2.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9bf0406a78f02f336bf1e451799cca198e8acde4ffa278f0fb20487b150a633" +checksum = "39129a682a6d2d841b6c429d0c51e5cb0ed1a03829d8b3d1e69a011e62cb3d3b" dependencies = [ "unicode-ident", ] @@ -8366,7 +8407,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" dependencies = [ "anyhow", - "indexmap 2.13.0", + "indexmap 2.13.1", "wasm-encoder", "wasmparser", ] @@ -8392,8 +8433,8 @@ checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ "bitflags", "hashbrown 0.15.5", - "indexmap 2.13.0", - "semver 1.0.27", + "indexmap 2.13.1", + "semver 1.0.28", ] [[package]] @@ -8412,9 +8453,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.93" +version = "0.3.94" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "749466a37ee189057f54748b200186b59a03417a117267baf3fd89cecc9fb837" +checksum = "cd70027e39b12f0849461e08ffc50b9cd7688d942c1c8e3c7b22273236b4dd0a" dependencies = [ "js-sys", "wasm-bindgen", @@ -8959,7 +9000,7 @@ checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" dependencies = [ "anyhow", "heck", - "indexmap 2.13.0", + "indexmap 2.13.1", "prettyplease", "syn 2.0.117", "wasm-metadata", @@ -8990,7 +9031,7 @@ checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" dependencies = [ "anyhow", "bitflags", - "indexmap 2.13.0", + "indexmap 2.13.1", "log", "serde", "serde_derive", @@ -9009,9 +9050,9 @@ checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" dependencies = [ "anyhow", "id-arena", - "indexmap 2.13.0", + "indexmap 2.13.1", "log", - "semver 1.0.27", + "semver 1.0.28", "serde", "serde_derive", "serde_json", @@ -9021,9 +9062,9 @@ dependencies = [ [[package]] name = "writeable" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" +checksum = "1ffae5123b2d3fc086436f8834ae3ab053a283cfac8fe0a0b8eaae044768a4c4" [[package]] name = "wyz" @@ -9130,9 +9171,9 @@ dependencies = [ [[package]] name = "yoke" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72d6e5c6afb84d73944e5cedb052c4680d5657337201555f9f2a16b7406d4954" +checksum = "abe8c5fda708d9ca3df187cae8bfb9ceda00dd96231bed36e445a1a48e66f9ca" dependencies = [ "stable_deref_trait", "yoke-derive", @@ -9141,9 +9182,9 @@ dependencies = [ [[package]] name = "yoke-derive" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" +checksum = "de844c262c8848816172cef550288e7dc6c7b7814b4ee56b3e1553f275f1858e" dependencies = [ "proc-macro2", "quote", @@ -9173,18 +9214,18 @@ dependencies = [ [[package]] name = "zerofrom" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5" +checksum = "69faa1f2a1ea75661980b013019ed6687ed0e83d069bc1114e2cc74c6c04c4df" dependencies = [ "zerofrom-derive", ] [[package]] name = "zerofrom-derive" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" +checksum = "11532158c46691caf0f2593ea8358fed6bbf68a0315e80aae9bd41fbade684a1" dependencies = [ "proc-macro2", "quote", @@ -9214,9 +9255,9 @@ dependencies = [ [[package]] name = "zerotrie" -version = "0.2.3" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a59c17a5562d507e4b54960e8569ebee33bee890c70aa3fe7b97e85a9fd7851" +checksum = "0f9152d31db0792fa83f70fb2f83148effb5c1f5b8c7686c3459e361d9bc20bf" dependencies = [ "displaydoc", "yoke", @@ -9225,9 +9266,9 @@ dependencies = [ [[package]] name = "zerovec" -version = "0.11.5" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c28719294829477f525be0186d13efa9a3c602f7ec202ca9e353d310fb9a002" +checksum = "90f911cbc359ab6af17377d242225f4d75119aec87ea711a880987b18cd7b239" dependencies = [ "yoke", "zerofrom", @@ -9236,9 +9277,9 @@ dependencies = [ [[package]] name = "zerovec-derive" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" +checksum = "625dc425cab0dca6dc3c3319506e6593dcb08a9f387ea3b284dbd52a92c40555" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index f0a08095..a230786f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "crates/app", + "crates/parsigex", "crates/build-proto", "crates/cli", "crates/cluster", @@ -99,6 +100,7 @@ wiremock = "0.6" # Crates in the workspace pluto-app = { path = "crates/app" } +pluto-parsigex = { path = "crates/parsigex" } pluto-build-proto = { path = "crates/build-proto" } pluto-cli = { path = "crates/cli" } pluto-cluster = { path = "crates/cluster" } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index a387ba92..ec4407c4 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -12,6 +12,7 @@ cancellation.workspace = true chrono.workspace = true crossbeam.workspace = true futures.workspace = true +futures-timer.workspace = true dyn-clone.workspace = true dyn-eq.workspace = true hex.workspace = true @@ -31,18 +32,25 @@ tokio-util.workspace = true tracing.workspace = true pluto-eth2util.workspace = true tree_hash.workspace = true +unsigned-varint.workspace = true [dev-dependencies] +anyhow.workspace = true alloy.workspace = true +clap.workspace = true rand.workspace = true libp2p.workspace = true +k256.workspace = true prost.workspace = true prost-types.workspace = true hex.workspace = true chrono.workspace = true test-case.workspace = true pluto-eth2util.workspace = true +pluto-cluster.workspace = true +pluto-p2p.workspace = true pluto-testutil.workspace = true +pluto-tracing.workspace = true tokio = { workspace = true, features = ["test-util"] } wiremock.workspace = true pluto-ssz.workspace = true diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index ac709968..2e356963 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -26,6 +26,10 @@ pub mod deadline; /// parsigdb pub mod parsigdb; +mod parsigex_codec; + +pub use parsigex_codec::ParSigExCodecError; + /// Test utilities. #[cfg(test)] pub mod testutils; diff --git a/crates/core/src/parsigex_codec.rs b/crates/core/src/parsigex_codec.rs new file mode 100644 index 00000000..e3963dc5 --- /dev/null +++ b/crates/core/src/parsigex_codec.rs @@ -0,0 +1,118 @@ +//! Partial signature exchange codec helpers used by core types. + +use std::any::Any; + +use crate::{ + signeddata::{ + Attestation, BeaconCommitteeSelection, SignedAggregateAndProof, SignedRandao, + SignedSyncContributionAndProof, SignedSyncMessage, SignedVoluntaryExit, + SyncCommitteeSelection, VersionedAttestation, VersionedSignedAggregateAndProof, + VersionedSignedProposal, VersionedSignedValidatorRegistration, + }, + types::{DutyType, Signature, SignedData}, +}; + +/// Error type for partial signature exchange codec operations. +#[derive(Debug, thiserror::Error)] +pub enum ParSigExCodecError { + /// Missing duty or data set fields. + #[error("invalid parsigex msg fields")] + InvalidMessageFields, + + /// Invalid partial signed data set proto. + #[error("invalid partial signed data set proto fields")] + InvalidParSignedDataSetFields, + + /// Invalid partial signed proto. + #[error("invalid partial signed proto")] + InvalidParSignedProto, + + /// Invalid duty type. + #[error("invalid duty")] + InvalidDuty, + + /// Unsupported duty type. + #[error("unsupported duty type")] + UnsupportedDutyType, + + /// Deprecated builder proposer duty. + #[error("deprecated duty builder proposer")] + DeprecatedBuilderProposer, + + /// Failed to parse a public key. + #[error("invalid public key: {0}")] + InvalidPubKey(String), + + /// Invalid share index. + #[error("invalid share index")] + InvalidShareIndex, + + /// Serialization failed. + #[error("marshal signed data: {0}")] + Serialize(#[from] serde_json::Error), +} + +pub(crate) fn serialize_signed_data(data: &dyn SignedData) -> Result, ParSigExCodecError> { + let any = data as &dyn Any; + + macro_rules! serialize_as { + ($ty:ty) => { + if let Some(value) = any.downcast_ref::<$ty>() { + return Ok(serde_json::to_vec(value)?); + } + }; + } + + serialize_as!(Attestation); + serialize_as!(VersionedAttestation); + serialize_as!(VersionedSignedProposal); + serialize_as!(VersionedSignedValidatorRegistration); + serialize_as!(SignedVoluntaryExit); + serialize_as!(SignedRandao); + serialize_as!(Signature); + serialize_as!(BeaconCommitteeSelection); + serialize_as!(SignedAggregateAndProof); + serialize_as!(VersionedSignedAggregateAndProof); + serialize_as!(SignedSyncMessage); + serialize_as!(SyncCommitteeSelection); + serialize_as!(SignedSyncContributionAndProof); + + Err(ParSigExCodecError::UnsupportedDutyType) +} + +pub(crate) fn deserialize_signed_data( + duty_type: &DutyType, + bytes: &[u8], +) -> Result, ParSigExCodecError> { + macro_rules! deserialize_json { + ($ty:ty) => { + serde_json::from_slice::<$ty>(bytes) + .map(|value| Box::new(value) as Box) + .map_err(ParSigExCodecError::from) + }; + } + + match duty_type { + // Match Go order: old Attestation format first, then VersionedAttestation. + DutyType::Attester => deserialize_json!(Attestation) + .or_else(|_| deserialize_json!(VersionedAttestation)) + .map_err(|_| ParSigExCodecError::UnsupportedDutyType), + DutyType::Proposer => deserialize_json!(VersionedSignedProposal), + DutyType::BuilderProposer => Err(ParSigExCodecError::DeprecatedBuilderProposer), + DutyType::BuilderRegistration => deserialize_json!(VersionedSignedValidatorRegistration), + DutyType::Exit => deserialize_json!(SignedVoluntaryExit), + DutyType::Randao => deserialize_json!(SignedRandao), + DutyType::Signature => deserialize_json!(Signature), + DutyType::PrepareAggregator => deserialize_json!(BeaconCommitteeSelection), + // Match Go order: old SignedAggregateAndProof format first, then versioned. + DutyType::Aggregator => deserialize_json!(SignedAggregateAndProof) + .or_else(|_| deserialize_json!(VersionedSignedAggregateAndProof)) + .map_err(|_| ParSigExCodecError::UnsupportedDutyType), + DutyType::SyncMessage => deserialize_json!(SignedSyncMessage), + DutyType::PrepareSyncContribution => deserialize_json!(SyncCommitteeSelection), + DutyType::SyncContribution => deserialize_json!(SignedSyncContributionAndProof), + DutyType::Unknown | DutyType::InfoSync | DutyType::DutySentinel(_) => { + Err(ParSigExCodecError::UnsupportedDutyType) + } + } +} diff --git a/crates/core/src/signeddata.rs b/crates/core/src/signeddata.rs index 8e1d1bba..56ef8622 100644 --- a/crates/core/src/signeddata.rs +++ b/crates/core/src/signeddata.rs @@ -48,6 +48,9 @@ pub enum SignedDataError { /// Invalid attestation wrapper JSON. #[error("unmarshal attestation")] AttestationJson, + /// Custom error. + #[error("{0}")] + Custom(Box), } fn hash_root(value: &T) -> [u8; 32] { diff --git a/crates/core/src/types.rs b/crates/core/src/types.rs index 78e2bc62..05a73f03 100644 --- a/crates/core/src/types.rs +++ b/crates/core/src/types.rs @@ -1,6 +1,6 @@ //! Types for the Charon core. -use std::{collections::HashMap, fmt::Display, iter}; +use std::{any::Any, collections::HashMap, fmt::Display, iter}; use chrono::{DateTime, Duration, Utc}; use dyn_clone::DynClone; @@ -8,7 +8,12 @@ use dyn_eq::DynEq; use serde::{Deserialize, Serialize}; use std::fmt::Debug as StdDebug; -use crate::signeddata::SignedDataError; +use crate::{ + ParSigExCodecError, + corepb::v1::core as pbcore, + parsigex_codec::{deserialize_signed_data, serialize_signed_data}, + signeddata::SignedDataError, +}; /// The type of duty. #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -66,6 +71,52 @@ impl DutyType { } } +impl From<&DutyType> for i32 { + fn from(duty_type: &DutyType) -> Self { + match duty_type { + DutyType::Unknown => 0, + DutyType::Proposer => 1, + DutyType::Attester => 2, + DutyType::Signature => 3, + DutyType::Exit => 4, + DutyType::BuilderProposer => 5, + DutyType::BuilderRegistration => 6, + DutyType::Randao => 7, + DutyType::PrepareAggregator => 8, + DutyType::Aggregator => 9, + DutyType::SyncMessage => 10, + DutyType::PrepareSyncContribution => 11, + DutyType::SyncContribution => 12, + DutyType::InfoSync => 13, + DutyType::DutySentinel(_) => 14, + } + } +} + +impl TryFrom for DutyType { + type Error = ParSigExCodecError; + + fn try_from(value: i32) -> Result { + match value { + 0 => Ok(DutyType::Unknown), + 1 => Ok(DutyType::Proposer), + 2 => Ok(DutyType::Attester), + 3 => Ok(DutyType::Signature), + 4 => Ok(DutyType::Exit), + 5 => Ok(DutyType::BuilderProposer), + 6 => Ok(DutyType::BuilderRegistration), + 7 => Ok(DutyType::Randao), + 8 => Ok(DutyType::PrepareAggregator), + 9 => Ok(DutyType::Aggregator), + 10 => Ok(DutyType::SyncMessage), + 11 => Ok(DutyType::PrepareSyncContribution), + 12 => Ok(DutyType::SyncContribution), + 13 => Ok(DutyType::InfoSync), + _ => Err(ParSigExCodecError::InvalidDuty), + } + } +} + /// SlotNumber struct #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct SlotNumber(u64); @@ -192,6 +243,28 @@ impl Duty { } } +impl From<&Duty> for pbcore::Duty { + fn from(duty: &Duty) -> Self { + Self { + slot: duty.slot.inner(), + r#type: i32::from(&duty.duty_type), + } + } +} + +impl TryFrom<&pbcore::Duty> for Duty { + type Error = ParSigExCodecError; + + fn try_from(duty: &pbcore::Duty) -> Result { + let duty_type = DutyType::try_from(duty.r#type)?; + if !duty_type.is_valid() { + return Err(ParSigExCodecError::InvalidDuty); + } + + Ok(Self::new(duty.slot.into(), duty_type)) + } +} + /// The type of proposal. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] @@ -452,7 +525,7 @@ impl AsRef<[u8; SIG_LEN]> for Signature { } /// Signed data type -pub trait SignedData: DynClone + DynEq + StdDebug + Send + Sync { +pub trait SignedData: Any + DynClone + DynEq + StdDebug + Send + Sync { /// signature returns the signed duty data's signature. fn signature(&self) -> Result; @@ -517,6 +590,39 @@ impl ParSignedData { } } +impl TryFrom<&ParSignedData> for pbcore::ParSignedData { + type Error = ParSigExCodecError; + + fn try_from(data: &ParSignedData) -> Result { + let encoded = serialize_signed_data(data.signed_data.as_ref())?; + let share_idx = + i32::try_from(data.share_idx).map_err(|_| ParSigExCodecError::InvalidShareIndex)?; + let signature = data.signed_data.signature().map_err(|err| { + ParSigExCodecError::Serialize(serde_json::Error::io(std::io::Error::other( + err.to_string(), + ))) + })?; + + Ok(Self { + data: encoded.into(), + signature: signature.as_ref().to_vec().into(), + share_idx, + }) + } +} + +impl TryFrom<(&DutyType, &pbcore::ParSignedData)> for ParSignedData { + type Error = ParSigExCodecError; + + fn try_from(value: (&DutyType, &pbcore::ParSignedData)) -> Result { + let (duty_type, data) = value; + let share_idx = + u64::try_from(data.share_idx).map_err(|_| ParSigExCodecError::InvalidShareIndex)?; + let signed_data = deserialize_signed_data(duty_type, &data.data)?; + Ok(Self::new_boxed(signed_data, share_idx)) + } +} + /// ParSignedDataSet is a set of partially signed duty data only signed by a /// single threshold BLS share. #[derive(Debug, Clone, PartialEq, Eq, Default)] @@ -554,6 +660,39 @@ impl ParSignedDataSet { } } +impl TryFrom<&ParSignedDataSet> for pbcore::ParSignedDataSet { + type Error = ParSigExCodecError; + + fn try_from(set: &ParSignedDataSet) -> Result { + let mut out = std::collections::BTreeMap::new(); + for (pub_key, value) in set.inner() { + out.insert(pub_key.to_string(), pbcore::ParSignedData::try_from(value)?); + } + + Ok(Self { set: out }) + } +} + +impl TryFrom<(&DutyType, &pbcore::ParSignedDataSet)> for ParSignedDataSet { + type Error = ParSigExCodecError; + + fn try_from(value: (&DutyType, &pbcore::ParSignedDataSet)) -> Result { + let (duty_type, set) = value; + if set.set.is_empty() { + return Err(ParSigExCodecError::InvalidParSignedDataSetFields); + } + + let mut out = Self::new(); + for (pub_key, value) in &set.set { + let pub_key = PubKey::try_from(pub_key.as_str()) + .map_err(|_| ParSigExCodecError::InvalidPubKey(pub_key.clone()))?; + out.insert(pub_key, ParSignedData::try_from((duty_type, value))?); + } + + Ok(out) + } +} + /// SignedDataSet is a set of signed duty data. #[derive(Debug, Clone, PartialEq, Eq)] pub struct SignedDataSet(HashMap); diff --git a/crates/p2p/src/p2p.rs b/crates/p2p/src/p2p.rs index a35a2775..33ba5ab1 100644 --- a/crates/p2p/src/p2p.rs +++ b/crates/p2p/src/p2p.rs @@ -110,6 +110,14 @@ use crate::{ utils, }; +const YAMUX_MAX_NUM_STREAMS: usize = 2_048; + +fn yamux_config() -> yamux::Config { + let mut config = yamux::Config::default(); + config.set_max_num_streams(YAMUX_MAX_NUM_STREAMS); + config +} + /// P2P error. #[derive(Debug, thiserror::Error)] pub enum P2PError { @@ -323,20 +331,17 @@ impl Node { { let swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() - .with_tcp( - tcp::Config::default(), - noise::Config::new, - yamux::Config::default, - ) + .with_tcp(tcp::Config::default(), noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_quic() .with_dns() .map_err(P2PError::failed_to_build_swarm)? - .with_relay_client(noise::Config::new, yamux::Config::default) + .with_relay_client(noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_behaviour(|key, relay_client| { - let builder = - PlutoBehaviourBuilder::default().with_p2p_context(p2p_context.clone()); + let builder = PlutoBehaviourBuilder::default() + .with_p2p_context(p2p_context.clone()) + .with_quic_enabled(true); behaviour_fn(builder, key, relay_client).build(key) }) .map_err(P2PError::failed_to_build_swarm)? @@ -364,15 +369,11 @@ impl Node { { let swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() - .with_tcp( - tcp::Config::default(), - noise::Config::new, - yamux::Config::default, - ) + .with_tcp(tcp::Config::default(), noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_dns() .map_err(P2PError::failed_to_build_swarm)? - .with_relay_client(noise::Config::new, yamux::Config::default) + .with_relay_client(noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_behaviour(|key, relay_client| { let builder = @@ -400,11 +401,7 @@ impl Node { { let swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() - .with_tcp( - tcp::Config::default(), - noise::Config::new, - yamux::Config::default, - ) + .with_tcp(tcp::Config::default(), noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_quic() .with_dns() @@ -435,11 +432,7 @@ impl Node { { let swarm = SwarmBuilder::with_existing_identity(keypair) .with_tokio() - .with_tcp( - tcp::Config::default(), - noise::Config::new, - yamux::Config::default, - ) + .with_tcp(tcp::Config::default(), noise::Config::new, yamux_config) .map_err(P2PError::failed_to_build_swarm)? .with_quic() .with_dns() diff --git a/crates/p2p/src/proto.rs b/crates/p2p/src/proto.rs index 664d6eb3..ed81e0cd 100644 --- a/crates/p2p/src/proto.rs +++ b/crates/p2p/src/proto.rs @@ -16,7 +16,6 @@ pub async fn write_length_delimited( ) -> io::Result<()> { let mut len_buf = unsigned_varint::encode::usize_buffer(); let encoded_len = unsigned_varint::encode::usize(payload.len(), &mut len_buf); - stream.write_all(encoded_len).await?; stream.write_all(payload).await?; stream.flush().await diff --git a/crates/parsigex/Cargo.toml b/crates/parsigex/Cargo.toml new file mode 100644 index 00000000..67335ade --- /dev/null +++ b/crates/parsigex/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "pluto-parsigex" +version.workspace = true +edition.workspace = true +repository.workspace = true +license.workspace = true +publish.workspace = true + +[dependencies] +either.workspace = true +futures.workspace = true +libp2p.workspace = true +prost.workspace = true +thiserror.workspace = true +tokio.workspace = true +tracing.workspace = true +unsigned-varint.workspace = true +pluto-core.workspace = true +pluto-p2p.workspace = true + +[dev-dependencies] +anyhow.workspace = true +clap.workspace = true +hex.workspace = true +pluto-cluster.workspace = true +pluto-tracing.workspace = true +tokio-util.workspace = true +serde_json.workspace = true + +[lints] +workspace = true diff --git a/crates/parsigex/examples/parsigex.rs b/crates/parsigex/examples/parsigex.rs new file mode 100644 index 00000000..b32f956d --- /dev/null +++ b/crates/parsigex/examples/parsigex.rs @@ -0,0 +1,565 @@ +#![allow(missing_docs)] +//! Partial-signature exchange example. +//! +//! Each node periodically broadcasts a synthetic [`ParSignedDataSet`] to all +//! cluster peers over the relay-routed libp2p network and logs every dataset it +//! receives from others. +//! +//! # Running a multi-node setup +//! +//! ## 1. Create a cluster +//! +//! Use the Charon Go CLI to generate per-node data directories, each containing +//! a `charon-enr-private-key` and a shared `cluster-lock.json`: +//! +//! ```bash +//! charon create cluster --name parsigex-test --nodes 3 --threshold 2 --no-verify \ +//! --cluster-dir ./cluster +//! ``` +//! +//! This writes `./cluster/node{0,1,2}/` — each directory is ready to use as +//! `--data-dir`. +//! +//! ## 2. Run each node +//! +//! Obol operates public relay servers. Pass one or more via `--relays` and +//! point `--data-dir` at the corresponding node directory from Step 1: +//! +//! ```bash +//! # Terminal 1 +//! cargo run -p pluto-parsigex --example parsigex -- \ +//! --relays https://0.relay.obol.tech,https://1.relay.obol.tech \ +//! --data-dir ./cluster/node0 --share-idx 1 +//! +//! # Terminal 2 +//! cargo run -p pluto-parsigex --example parsigex -- \ +//! --relays https://0.relay.obol.tech,https://1.relay.obol.tech \ +//! --data-dir ./cluster/node1 --share-idx 2 +//! +//! # Terminal 3 +//! cargo run -p pluto-parsigex --example parsigex -- \ +//! --relays https://0.relay.obol.tech,https://1.relay.obol.tech \ +//! --data-dir ./cluster/node2 --share-idx 3 +//! ``` +//! +//! Nodes discover each other through the relay and exchange partial signatures +//! every `--broadcast-every` seconds (default: 10). Look for log lines: +//! +//! ```text +//! INFO received partial signature set peer=... duty=... entries=... +//! INFO broadcasted sample partial signature set request_id=... duty=... +//! ``` +//! +//! `--relays` also accepts raw libp2p multiaddrs +//! (`/ip4/IP/tcp/PORT/p2p/PEER_ID`) and multiple comma-separated values. + +use std::{ + collections::{HashMap, HashSet}, + path::PathBuf, + time::Duration, +}; + +use anyhow::{Context, Result, anyhow}; +use clap::Parser; +use futures::StreamExt; +use libp2p::{ + identify, ping, + relay::{self}, + swarm::{NetworkBehaviour, SwarmEvent}, +}; +use pluto_cluster::lock::Lock; +use pluto_core::{ + signeddata::SignedRandao, + types::{Duty, DutyType, ParSignedDataSet, PubKey, SlotNumber}, +}; +use pluto_p2p::{ + behaviours::pluto::PlutoBehaviourEvent, + bootnode, + config::P2PConfig, + gater, k1, + p2p::{Node, NodeType}, + peer::peer_id_from_key, + relay::{MutableRelayReservation, RelayRouter}, +}; +use pluto_parsigex::{self as parsigex, DutyGater, Event, Handle, Verifier}; +use pluto_tracing::TracingConfig; +use tokio::fs; +use tokio_util::sync::CancellationToken; +use tracing::{info, warn}; + +#[derive(NetworkBehaviour)] +#[behaviour(to_swarm = "CombinedBehaviourEvent")] +struct CombinedBehaviour { + relay: relay::client::Behaviour, + relay_reservation: MutableRelayReservation, + relay_router: RelayRouter, + parsigex: parsigex::Behaviour, +} + +#[derive(Debug)] +enum CombinedBehaviourEvent { + ParSigEx(Event), + Relay(relay::client::Event), +} + +impl From for CombinedBehaviourEvent { + fn from(event: Event) -> Self { + Self::ParSigEx(event) + } +} + +impl From for CombinedBehaviourEvent { + fn from(event: relay::client::Event) -> Self { + Self::Relay(event) + } +} + +impl From for CombinedBehaviourEvent { + fn from(value: std::convert::Infallible) -> Self { + match value {} + } +} + +#[derive(Debug, Parser)] +#[command(name = "parsigex-example")] +#[command(about = "Demonstrates partial signature exchange over the bootnode/relay P2P path")] +struct Args { + /// Relay URLs or multiaddrs. + #[arg(long, value_delimiter = ',')] + relays: Vec, + + /// Directory holding the p2p private key and cluster lock. + #[arg(long)] + data_dir: PathBuf, + + /// TCP listen addresses. + #[arg(long, value_delimiter = ',', default_value = "0.0.0.0:0")] + tcp_addrs: Vec, + + /// UDP listen addresses used for QUIC. + #[arg(long, value_delimiter = ',', default_value = "0.0.0.0:0")] + udp_addrs: Vec, + + /// Whether to filter private addresses from advertisements. + #[arg(long, default_value_t = false)] + filter_private_addrs: bool, + + /// External IP address to advertise. + #[arg(long)] + external_ip: Option, + + /// External hostname to advertise. + #[arg(long)] + external_host: Option, + + /// Whether to disable socket reuse-port. + #[arg(long, default_value_t = false)] + disable_reuse_port: bool, + + /// Emit a sample partial signature every N seconds. + #[arg(long, default_value_t = 10)] + broadcast_every: u64, + + /// Share index to use in the sample partial signature. + #[arg(long, default_value_t = 1)] + share_idx: u64, + + /// Log level. + #[arg(long, default_value = "info")] + log_level: String, +} + +fn make_sample_set(slot: u64, share_idx: u64) -> ParSignedDataSet { + let share_byte = u8::try_from(share_idx % 255).unwrap_or(1); + let pub_key = PubKey::new([share_byte; 48]); + + let mut set = ParSignedDataSet::new(); + set.insert( + pub_key, + SignedRandao::new_partial(slot / 32, [share_byte; 96], share_idx), + ); + set +} + +fn log_received(duty: &Duty, set: &ParSignedDataSet, peer: &libp2p::PeerId) { + let entries = set + .inner() + .iter() + .map(|(pub_key, data)| format!("{pub_key}:share_idx={}", data.share_idx)) + .collect::>() + .join(", "); + + info!(peer = %peer, duty = %duty, entries = %entries, "received partial signature set"); +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + pluto_tracing::init( + &TracingConfig::builder() + .with_default_console() + .override_env_filter(&args.log_level) + .build(), + )?; + + let key = k1::load_priv_key(&args.data_dir).with_context(|| { + format!( + "failed to load private key from {}", + args.data_dir.display() + ) + })?; + let local_peer_id = peer_id_from_key(key.public_key()) + .context("failed to derive local peer ID from private key")?; + + let lock_path = args.data_dir.join("cluster-lock.json"); + let lock_str = fs::read_to_string(&lock_path) + .await + .with_context(|| format!("failed to read {}", lock_path.display()))?; + let lock: Lock = serde_json::from_str(&lock_str) + .with_context(|| format!("failed to parse {}", lock_path.display()))?; + + let cancel = CancellationToken::new(); + let lock_hash_hex = hex::encode(&lock.lock_hash); + let relays = bootnode::new_relays(cancel.child_token(), &args.relays, &lock_hash_hex) + .await + .context("failed to resolve relays")?; + + let known_peers = lock + .peer_ids() + .context("failed to derive peer IDs from lock")?; + if !known_peers.contains(&local_peer_id) { + return Err(anyhow!( + "local peer ID {local_peer_id} not found in cluster lock" + )); + } + let conn_gater = gater::ConnGater::new( + gater::Config::closed() + .with_relays(relays.clone()) + .with_peer_ids(known_peers.clone()), + ); + + let verifier: Verifier = + std::sync::Arc::new(|_duty, _pubkey, _data| Box::pin(async { Ok(()) })); + let duty_gater: DutyGater = std::sync::Arc::new(|duty| duty.duty_type != DutyType::Unknown); + let handle_slot = std::sync::Arc::new(tokio::sync::Mutex::new(1_u64)); + + let p2p_config = P2PConfig { + relays: vec![], + external_ip: args.external_ip.clone(), + external_host: args.external_host.clone(), + tcp_addrs: args.tcp_addrs.clone(), + udp_addrs: args.udp_addrs.clone(), + disable_reuse_port: args.disable_reuse_port, + }; + + let relay_peer_ids: HashSet<_> = relays + .iter() + .filter_map(|relay| relay.peer().ok().flatten().map(|peer| peer.id)) + .collect(); + + let mut parsigex_handle: Option = None; + let mut node: Node = Node::new( + p2p_config, + key, + NodeType::QUIC, + args.filter_private_addrs, + known_peers.clone(), + |builder, keypair, relay_client| { + let p2p_context = builder.p2p_context(); + let local_peer_id = keypair.public().to_peer_id(); + let config = parsigex::Config::new( + local_peer_id, + p2p_context.clone(), + verifier.clone(), + duty_gater.clone(), + ) + .with_timeout(Duration::from_secs(10)); + let (parsigex, handle) = parsigex::Behaviour::new(config, local_peer_id); + parsigex_handle = Some(handle); + + builder + .with_gater(conn_gater) + .with_inner(CombinedBehaviour { + parsigex, + relay: relay_client, + relay_reservation: MutableRelayReservation::new(relays.clone()), + relay_router: RelayRouter::new(relays.clone(), p2p_context, local_peer_id), + }) + }, + )?; + + let parsigex_handle = + parsigex_handle.ok_or_else(|| anyhow!("parsigex handle should be created"))?; + + info!( + peer_id = %node.local_peer_id(), + data_dir = %args.data_dir.display(), + known_peers = ?known_peers, + relays = ?args.relays, + "parsigex example started" + ); + + let mut ticker = tokio::time::interval(Duration::from_secs(args.broadcast_every)); + let mut pending_broadcasts: HashMap = HashMap::new(); + + loop { + tokio::select! { + _ = tokio::signal::ctrl_c() => { + info!("ctrl+c received, shutting down"); + break; + } + _ = ticker.tick() => { + info!("broadcasting sample partial signature set"); + let mut slot = handle_slot.lock().await; + let duty = Duty::new(SlotNumber::new(*slot), DutyType::Randao); + let data_set = make_sample_set(*slot, args.share_idx); + + match parsigex_handle.broadcast(duty.clone(), data_set.clone()).await { + Ok(request_id) => { + pending_broadcasts.insert(request_id, (duty.clone(), args.share_idx)); + info!( + request_id, + duty = %duty, + share_idx = args.share_idx, + "queued sample partial signature set for broadcast" + ); + *slot = slot.saturating_add(1); + } + Err(error) => { + warn!(%error, "broadcast failed"); + } + } + } + event = node.select_next_some() => { + let peer_type = |peer_id: &libp2p::PeerId| { + if relay_peer_ids.contains(peer_id) { + "RELAY" + } else if known_peers.contains(peer_id) { + "PEER" + } else { + "UNKNOWN" + } + }; + + match event { + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::Relay(relay::client::Event::ReservationReqAccepted { + relay_peer_id, + renewal, + limit, + }), + )) => { + info!( + relay_peer_id = %relay_peer_id, + peer_type = peer_type(&relay_peer_id), + renewal, + limit = ?limit, + "relay reservation accepted" + ); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::Relay(relay::client::Event::OutboundCircuitEstablished { + relay_peer_id, + limit, + }), + )) => { + info!( + relay_peer_id = %relay_peer_id, + peer_type = peer_type(&relay_peer_id), + limit = ?limit, + "outbound relay circuit established" + ); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::Relay(relay::client::Event::InboundCircuitEstablished { + src_peer_id, + limit, + }), + )) => { + info!( + src_peer_id = %src_peer_id, + peer_type = peer_type(&src_peer_id), + limit = ?limit, + "inbound relay circuit established" + ); + } + SwarmEvent::ConnectionEstablished { + peer_id, + endpoint, + num_established, + .. + } => { + let address = match &endpoint { + libp2p::core::ConnectedPoint::Dialer { address, .. } => address, + libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => { + send_back_addr + } + }; + info!( + peer_id = %peer_id, + peer_type = peer_type(&peer_id), + address = %address, + num_established, + "connection established" + ); + } + SwarmEvent::ConnectionClosed { + peer_id, + endpoint, + num_established, + cause, + .. + } => { + let address = match &endpoint { + libp2p::core::ConnectedPoint::Dialer { address, .. } => address, + libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => { + send_back_addr + } + }; + info!( + peer_id = %peer_id, + peer_type = peer_type(&peer_id), + address = %address, + num_established, + cause = ?cause, + "connection closed" + ); + } + SwarmEvent::OutgoingConnectionError { + peer_id, + error, + connection_id, + } => { + warn!( + peer_id = ?peer_id, + connection_id = ?connection_id, + error = %error, + "outgoing connection failed" + ); + } + SwarmEvent::IncomingConnectionError { + connection_id, + local_addr, + send_back_addr, + error, + .. + } => { + warn!( + connection_id = ?connection_id, + local_addr = %local_addr, + send_back_addr = %send_back_addr, + error = %error, + "incoming connection failed" + ); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Identify( + identify::Event::Received { peer_id, info, .. }, + )) => { + info!( + peer_id = %peer_id, + peer_type = peer_type(&peer_id), + agent_version = %info.agent_version, + protocol_version = %info.protocol_version, + listen_addrs = ?info.listen_addrs, + "identify received" + ); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Ping(ping::Event { + peer, + result, + .. + })) => match result { + Ok(rtt) => { + info!(peer_id = %peer, peer_type = peer_type(&peer), rtt = ?rtt, "ping succeeded"); + } + Err(error) => { + warn!(peer_id = %peer, peer_type = peer_type(&peer), error = %error, "ping failed"); + } + }, + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::ParSigEx(Event::Received { + peer, + duty, + data_set, + .. + }), + )) => { + log_received(&duty, &data_set, &peer); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::ParSigEx(Event::Error { peer, error, .. }), + )) => { + warn!(peer = %peer, error = %error, "parsigex protocol error"); + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::ParSigEx(Event::BroadcastError { + request_id, + peer, + error, + }), + )) => { + match pending_broadcasts.get(&request_id) { + Some((duty, share_idx)) => { + warn!( + request_id, + duty = %duty, + share_idx, + peer = ?peer, + error = %error, + "sample partial signature broadcast failed" + ); + } + None => { + warn!( + request_id, + peer = ?peer, + error = %error, + "partial signature broadcast failed" + ); + } + } + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::ParSigEx(Event::BroadcastComplete { + request_id, + }), + )) => { + if let Some((duty, share_idx)) = pending_broadcasts.remove(&request_id) { + info!( + request_id, + duty = %duty, + share_idx, + "broadcasted sample partial signature set" + ); + } else { + info!(request_id, "partial signature broadcast completed"); + } + } + SwarmEvent::Behaviour(PlutoBehaviourEvent::Inner( + CombinedBehaviourEvent::ParSigEx(Event::BroadcastFailed { + request_id, + }), + )) => { + if let Some((duty, share_idx)) = pending_broadcasts.remove(&request_id) { + warn!( + request_id, + duty = %duty, + share_idx, + "sample partial signature broadcast finished with failures" + ); + } else { + warn!(request_id, "partial signature broadcast finished with failures"); + } + } + SwarmEvent::NewListenAddr { address, .. } => { + info!(address = %address, "listening"); + } + _ => {} + } + } + } + } + + Ok(()) +} diff --git a/crates/parsigex/src/behaviour.rs b/crates/parsigex/src/behaviour.rs new file mode 100644 index 00000000..71212288 --- /dev/null +++ b/crates/parsigex/src/behaviour.rs @@ -0,0 +1,464 @@ +//! Network behaviour and control handle for partial signature exchange. + +use std::{ + collections::{HashMap, VecDeque}, + future::Future, + pin::Pin, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + task::{Context, Poll}, + time::Duration, +}; + +use either::Either; +use libp2p::{ + Multiaddr, PeerId, + swarm::{ + ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, NotifyHandler, THandler, + THandlerInEvent, THandlerOutEvent, ToSwarm, dummy, + }, +}; +use tokio::sync::{Mutex, mpsc}; + +use pluto_core::types::{Duty, ParSignedData, ParSignedDataSet, PubKey}; +use pluto_p2p::p2p_context::P2PContext; + +use super::{Handler, encode_message}; +use crate::{ + error::{Error, Failure, Result, VerifyError}, + handler::{FromHandler, ToHandler}, +}; + +/// Future returned by verifier callbacks. +pub type VerifyFuture = + Pin> + Send + 'static>>; + +/// Verifier callback type. +pub type Verifier = + Arc VerifyFuture + Send + Sync + 'static>; + +/// Duty gate callback type. +pub type DutyGater = Arc bool + Send + Sync + 'static>; + +/// Future returned by received subscriber callbacks. +pub type ReceivedSubFuture = Pin + Send + 'static>>; + +/// Subscriber callback for received partial signature sets. +/// +/// Called when a verified partial signature set is received from a peer. +pub type ReceivedSub = + Arc ReceivedSubFuture + Send + Sync + 'static>; + +/// Helper to create a received subscriber from a closure. +pub fn received_subscriber(f: F) -> ReceivedSub +where + F: Fn(Duty, ParSignedDataSet) -> Fut + Send + Sync + 'static, + Fut: Future + Send + 'static, +{ + Arc::new(move |duty, set| Box::pin(f(duty, set))) +} + +/// Event emitted by the partial signature exchange behaviour. +#[derive(Debug)] +pub enum Event { + /// A verified partial signature set was received from a peer. + Received { + /// The remote peer. + peer: PeerId, + /// Connection on which it was received. + connection: ConnectionId, + /// Duty associated with the data set. + duty: Duty, + /// Partial signature set. + data_set: ParSignedDataSet, + }, + /// A peer sent invalid data or verification failed. + Error { + /// The remote peer. + peer: PeerId, + /// Connection on which the error occurred. + connection: ConnectionId, + /// Failure reason. + error: Failure, + }, + /// Broadcast failed. + BroadcastError { + /// Request identifier. + request_id: u64, + /// Peer for which the broadcast failed, if known. + peer: Option, + /// Failure reason. + error: Failure, + }, + /// Broadcast completed successfully for all targeted peers. + BroadcastComplete { + /// Request identifier. + request_id: u64, + }, + /// Broadcast failed after one or more peer failures. + BroadcastFailed { + /// Request identifier. + request_id: u64, + }, +} + +#[derive(Debug)] +struct PendingBroadcast { + remaining: usize, + failed: bool, +} + +#[derive(Debug)] +enum Command { + Broadcast { + request_id: u64, + duty: Duty, + data_set: ParSignedDataSet, + }, +} + +/// Shared subscriber list between [`Handle`] and [`Behaviour`]. +#[derive(Default)] +struct SharedSubs { + subs: Mutex>, +} + +/// Async handle for outbound partial signature broadcasts. +#[derive(Clone)] +pub struct Handle { + tx: mpsc::UnboundedSender, + next_request_id: Arc, + shared_subs: Arc, +} + +impl std::fmt::Debug for Handle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Handle") + .field("next_request_id", &self.next_request_id) + .finish_non_exhaustive() + } +} + +impl Handle { + /// Broadcasts a partial signature set to all peers except self. + pub async fn broadcast(&self, duty: Duty, data_set: ParSignedDataSet) -> Result { + let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed); + self.tx + .send(Command::Broadcast { + request_id, + duty, + data_set, + }) + .map_err(|_| Error::Closed)?; + Ok(request_id) + } + + /// Registers a callback when a verified partial signature set is received + /// from a peer. + /// + /// This is not thread safe with respect to message delivery ordering — it + /// must be called before the swarm starts processing events. + pub async fn subscribe(&self, sub: ReceivedSub) { + self.shared_subs.subs.lock().await.push(sub); + } +} + +/// Configuration for the partial signature exchange behaviour. +#[derive(Clone)] +pub struct Config { + peer_id: PeerId, + p2p_context: P2PContext, + verifier: Verifier, + duty_gater: DutyGater, + timeout: Duration, +} + +impl Config { + /// Creates a new configuration. + pub fn new( + peer_id: PeerId, + p2p_context: P2PContext, + verifier: Verifier, + duty_gater: DutyGater, + ) -> Self { + Self { + peer_id, + p2p_context, + verifier, + duty_gater, + timeout: Duration::from_secs(20), + } + } + + /// Sets the send/receive timeout. + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } +} + +/// Behaviour for partial signature exchange. +pub struct Behaviour { + config: Config, + rx: mpsc::UnboundedReceiver, + pending_events: VecDeque>, + pending_broadcasts: HashMap, + shared_subs: Arc, +} + +impl Behaviour { + /// Creates a behaviour and a clonable broadcast handle. + pub fn new(config: Config, peer_id: PeerId) -> (Self, Handle) { + debug_assert_eq!(config.peer_id, peer_id); + let (tx, rx) = mpsc::unbounded_channel(); + let shared_subs = Arc::new(SharedSubs::default()); + let handle = Handle { + tx, + next_request_id: Arc::new(AtomicU64::new(0)), + shared_subs: shared_subs.clone(), + }; + ( + Self { + config, + rx, + pending_events: VecDeque::new(), + pending_broadcasts: HashMap::new(), + shared_subs, + }, + handle, + ) + } + + fn connection_handler_for_peer(&self, peer: PeerId) -> THandler { + if !self.config.p2p_context.is_known_peer(&peer) { + return Either::Right(dummy::ConnectionHandler); + } + Either::Left(Handler::new( + self.config.timeout, + self.config.verifier.clone(), + self.config.duty_gater.clone(), + peer, + )) + } + + fn handle_command(&mut self, command: Command) { + match command { + Command::Broadcast { + request_id, + duty, + data_set, + } => { + let message = match encode_message(&duty, &data_set) { + Ok(message) => message, + Err(err) => { + self.emit_broadcast_error( + request_id, + None, + Failure::Codec(err.to_string()), + ); + return; + } + }; + + let peers: Vec<_> = self + .config + .p2p_context + .known_peers() + .iter() + .copied() + .collect(); + let mut targeted = 0usize; + for peer in peers { + if peer == self.config.peer_id { + continue; + } + + if self + .config + .p2p_context + .peer_store_lock() + .connections_to_peer(&peer) + .is_empty() + { + self.emit_broadcast_error( + request_id, + Some(peer), + Failure::io(format!("peer {peer} is not connected")), + ); + continue; + } + + self.pending_events.push_back(ToSwarm::NotifyHandler { + peer_id: peer, + handler: NotifyHandler::Any, + event: ToHandler::Send { + request_id, + payload: message.clone(), + }, + }); + targeted = targeted.saturating_add(1); + } + + if targeted == 0 { + return; + } + + self.pending_broadcasts.insert( + request_id, + PendingBroadcast { + remaining: targeted, + failed: false, + }, + ); + } + } + } + + fn finish_broadcast_result(&mut self, request_id: u64, failed: bool) { + let Some(entry) = self.pending_broadcasts.get_mut(&request_id) else { + return; + }; + + entry.failed |= failed; + entry.remaining = entry.remaining.saturating_sub(1); + if entry.remaining == 0 { + let failed = self + .pending_broadcasts + .remove(&request_id) + .map(|entry| entry.failed) + .unwrap_or(failed); + if failed { + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::BroadcastFailed { + request_id, + })); + } else { + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::BroadcastComplete { + request_id, + })); + } + } + } + + fn emit_broadcast_error(&mut self, request_id: u64, peer: Option, error: Failure) { + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::BroadcastError { + request_id, + peer, + error, + })); + } + + fn handle_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: FromHandler, + ) { + match event { + FromHandler::Received { duty, data_set } => { + self.notify_subscribers(duty.clone(), data_set.clone()); + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::Received { + peer: peer_id, + connection: connection_id, + duty, + data_set, + })); + } + FromHandler::InboundError(error) => { + self.pending_events + .push_back(ToSwarm::GenerateEvent(Event::Error { + peer: peer_id, + connection: connection_id, + error, + })); + } + FromHandler::OutboundSuccess { request_id } => { + self.finish_broadcast_result(request_id, false); + } + FromHandler::OutboundError { request_id, error } => { + self.finish_broadcast_result(request_id, true); + self.emit_broadcast_error(request_id, Some(peer_id), error); + } + } + } + + /// Notifies all registered subscribers of a received partial signature set. + /// + /// Each subscriber is invoked in a spawned task since `poll()` is + /// synchronous. This matches Go's intended behaviour (see Go TODO to call + /// subscribers async). + fn notify_subscribers(&self, duty: Duty, data_set: ParSignedDataSet) { + let shared_subs = self.shared_subs.clone(); + tokio::spawn(async move { + let subs = shared_subs.subs.lock().await.clone(); + for sub in &subs { + sub(duty.clone(), data_set.clone()).await; + } + }); + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = Either; + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + _local_addr: &Multiaddr, + _remote_addr: &Multiaddr, + ) -> std::result::Result, ConnectionDenied> { + Ok(self.connection_handler_for_peer(peer)) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + peer: PeerId, + _addr: &Multiaddr, + _role_override: libp2p::core::Endpoint, + _port_use: libp2p::core::transport::PortUse, + ) -> std::result::Result, ConnectionDenied> { + Ok(self.connection_handler_for_peer(peer)) + } + + fn on_swarm_event(&mut self, _event: FromSwarm) {} + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: THandlerOutEvent, + ) { + let event = match event { + Either::Left(event) => event, + Either::Right(unreachable) => match unreachable {}, + }; + self.handle_handler_event(peer_id, connection_id, event); + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(event.map_in(Either::Left)); + } + + while let Poll::Ready(Some(command)) = self.rx.poll_recv(cx) { + self.handle_command(command); + } + + if let Some(event) = self.pending_events.pop_front() { + return Poll::Ready(event.map_in(Either::Left)); + } + + Poll::Pending + } +} diff --git a/crates/parsigex/src/error.rs b/crates/parsigex/src/error.rs new file mode 100644 index 00000000..c9eb70e5 --- /dev/null +++ b/crates/parsigex/src/error.rs @@ -0,0 +1,76 @@ +//! Error types for the partial signature exchange protocol. + +use libp2p::PeerId; +use pluto_core::ParSigExCodecError; + +/// Result type for partial signature exchange. +pub type Result = std::result::Result; + +/// Handler-to-behaviour failure. +#[derive(Debug, Clone, thiserror::Error)] +pub enum Failure { + /// Stream negotiation or operation timed out. + #[error("parsigex timed out")] + Timeout, + /// Invalid payload received. + #[error("invalid parsigex payload")] + InvalidPayload, + /// Duty not accepted by the gater. + #[error("invalid duty")] + InvalidDuty, + /// Signature verification failed. + #[error("invalid partial signature")] + InvalidPartialSignature, + /// I/O error. + #[error("{0}")] + Io(String), + /// Codec error. + #[error("codec error: {0}")] + Codec(String), +} + +impl Failure { + pub(crate) fn io(error: impl std::fmt::Display) -> Self { + Self::Io(error.to_string()) + } +} + +/// Error type for signature verification callbacks. +#[derive(Debug, thiserror::Error)] +pub enum VerifyError { + /// Unknown validator public key. + #[error("unknown pubkey, not part of cluster lock")] + UnknownPubKey, + /// Invalid share index for the validator. + #[error("invalid shareIdx")] + InvalidShareIndex, + /// Invalid signed-data family for the duty. + #[error("invalid eth2 signed data")] + InvalidSignedDataFamily, + /// Generic verification error. + #[error("{0}")] + Other(String), +} + +/// Error type for partial signature exchange operations. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Message conversion failed. + #[error(transparent)] + Codec(#[from] ParSigExCodecError), + /// Handle channel closed. + #[error("parsigex handle closed")] + Closed, + /// Broadcast failed for a peer. + #[error("broadcast to peer {peer} failed: {source}")] + BroadcastPeer { + /// Peer for which the broadcast failed. + peer: PeerId, + /// Source failure. + #[source] + source: Failure, + }, + /// Peer is not currently connected. + #[error("peer {0} is not connected")] + PeerNotConnected(PeerId), +} diff --git a/crates/parsigex/src/handler.rs b/crates/parsigex/src/handler.rs new file mode 100644 index 00000000..f3babc2e --- /dev/null +++ b/crates/parsigex/src/handler.rs @@ -0,0 +1,272 @@ +//! Connection handler for the partial signature exchange protocol. + +use std::{ + collections::VecDeque, + task::{Context, Poll}, + time::Duration, +}; + +use futures::{FutureExt, StreamExt, future::BoxFuture, stream::FuturesUnordered}; +use libp2p::{ + PeerId, + core::upgrade::ReadyUpgrade, + swarm::{ + ConnectionHandler, ConnectionHandlerEvent, StreamProtocol, StreamUpgradeError, + SubstreamProtocol, + handler::{ + ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + }, + }, +}; +use tokio::time::timeout; + +use pluto_core::types::{Duty, ParSignedDataSet}; + +use super::{DutyGater, PROTOCOL_NAME, Verifier, protocol}; +use crate::error::Failure; + +/// Command sent from the behaviour to a handler. +#[derive(Debug)] +pub enum ToHandler { + /// Send the encoded payload to the remote peer. + Send { + /// Request identifier used to correlate broadcast completions. + request_id: u64, + /// Encoded protobuf payload. + payload: Vec, + }, +} + +/// Event sent from the handler back to the behaviour. +#[derive(Debug)] +pub enum FromHandler { + /// A verified message was received. + Received { + /// Duty from the message. + duty: Duty, + /// Verified partial signature set. + data_set: ParSignedDataSet, + }, + /// An inbound message failed decoding, gating, or verification. + InboundError(Failure), + /// Outbound send completed successfully. + OutboundSuccess { + /// Request identifier. + request_id: u64, + }, + /// Outbound send failed. + OutboundError { + /// Request identifier. + request_id: u64, + /// Failure reason. + error: Failure, + }, +} + +/// Outbound open info that carries the request context through stream +/// negotiation. +pub struct PendingOpen { + request_id: u64, + payload: Vec, +} + +type ActiveFuture = BoxFuture<'static, Option>; + +/// Connection handler for parsigex. +pub struct Handler { + timeout: Duration, + verifier: Verifier, + duty_gater: DutyGater, + pending_open: VecDeque, + active_futures: FuturesUnordered, +} + +impl Handler { + /// Creates a new handler for one connection. + pub fn new( + timeout: Duration, + verifier: Verifier, + duty_gater: DutyGater, + _peer: PeerId, + ) -> Self { + Self { + timeout, + verifier, + duty_gater, + pending_open: VecDeque::new(), + active_futures: FuturesUnordered::new(), + } + } + + fn handle_fully_negotiated_inbound(&mut self, mut stream: libp2p::swarm::Stream) { + stream.ignore_for_keep_alive(); + let verifier = self.verifier.clone(); + let duty_gater = self.duty_gater.clone(); + let t = self.timeout; + + self.active_futures.push( + async move { + Some( + match timeout(t, do_recv(stream, verifier, duty_gater)).await { + Ok(Ok((duty, data_set))) => FromHandler::Received { duty, data_set }, + Ok(Err(e)) => FromHandler::InboundError(e), + Err(_) => FromHandler::InboundError(Failure::Timeout), + }, + ) + } + .boxed(), + ); + } + + fn handle_fully_negotiated_outbound( + &mut self, + mut stream: libp2p::swarm::Stream, + info: PendingOpen, + ) { + stream.ignore_for_keep_alive(); + let PendingOpen { + request_id, + payload, + } = info; + let t = self.timeout; + + self.active_futures.push( + async move { + Some(match timeout(t, do_send(stream, payload)).await { + Ok(Ok(())) => FromHandler::OutboundSuccess { request_id }, + Ok(Err(e)) => FromHandler::OutboundError { + request_id, + error: e, + }, + Err(_) => FromHandler::OutboundError { + request_id, + error: Failure::Timeout, + }, + }) + } + .boxed(), + ); + } + + fn handle_dial_upgrade_error(&mut self, info: PendingOpen, error: StreamUpgradeError) + where + E: std::error::Error + Send + Sync + 'static, + { + let request_id = info.request_id; + let failure = match error { + StreamUpgradeError::Timeout => Failure::Timeout, + StreamUpgradeError::NegotiationFailed => Failure::io("protocol negotiation failed"), + StreamUpgradeError::Apply(e) => Failure::io(e), + StreamUpgradeError::Io(e) => Failure::io(e), + }; + self.active_futures.push( + async move { + Some(FromHandler::OutboundError { + request_id, + error: failure, + }) + } + .boxed(), + ); + } +} + +impl ConnectionHandler for Handler { + type FromBehaviour = ToHandler; + type InboundOpenInfo = (); + type InboundProtocol = ReadyUpgrade; + type OutboundOpenInfo = PendingOpen; + type OutboundProtocol = ReadyUpgrade; + type ToBehaviour = FromHandler; + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), ()) + } + + fn on_behaviour_event(&mut self, event: Self::FromBehaviour) { + match event { + ToHandler::Send { + request_id, + payload, + } => { + self.pending_open.push_back(PendingOpen { + request_id, + payload, + }); + } + } + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent, + > { + if let Some(pending) = self.pending_open.pop_front() { + return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(ReadyUpgrade::new(PROTOCOL_NAME), pending), + }); + } + + while let Poll::Ready(Some(event)) = self.active_futures.poll_next_unpin(cx) { + if let Some(event) = event { + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); + } + } + + Poll::Pending + } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { + protocol: stream, + .. + }) => self.handle_fully_negotiated_inbound(stream), + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { + protocol: stream, + info, + .. + }) => self.handle_fully_negotiated_outbound(stream, info), + ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) => { + self.handle_dial_upgrade_error(info, error); + } + _ => {} + } + } +} + +async fn do_recv( + mut stream: libp2p::swarm::Stream, + verifier: Verifier, + duty_gater: DutyGater, +) -> Result<(Duty, ParSignedDataSet), Failure> { + let bytes = protocol::recv_message(&mut stream) + .await + .map_err(Failure::io)?; + let (duty, data_set) = protocol::decode_message(&bytes).map_err(|_| Failure::InvalidPayload)?; + if !duty_gater(&duty) { + return Err(Failure::InvalidDuty); + } + for (pub_key, par_sig) in data_set.inner() { + verifier(duty.clone(), *pub_key, par_sig.clone()) + .await + .map_err(|_| Failure::InvalidPartialSignature)?; + } + Ok((duty, data_set)) +} + +async fn do_send(mut stream: libp2p::swarm::Stream, payload: Vec) -> Result<(), Failure> { + protocol::send_message(&mut stream, &payload) + .await + .map_err(Failure::io) +} diff --git a/crates/parsigex/src/lib.rs b/crates/parsigex/src/lib.rs new file mode 100644 index 00000000..e1abdfd3 --- /dev/null +++ b/crates/parsigex/src/lib.rs @@ -0,0 +1,24 @@ +//! Partial signature exchange protocol. + +mod behaviour; +mod error; +mod handler; +mod protocol; + +pub use behaviour::{ + Behaviour, Config, DutyGater, Event, Handle, ReceivedSub, ReceivedSubFuture, Verifier, + received_subscriber, +}; +pub use error::{Error, Failure, Result, VerifyError}; +pub use handler::Handler; +pub use protocol::{decode_message, encode_message}; + +use libp2p::swarm::StreamProtocol; + +/// The protocol name for partial signature exchange (version 2.0.0). +pub const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/charon/parsigex/2.0.0"); + +/// Returns the supported protocols in precedence order. +pub fn protocols() -> Vec { + vec![PROTOCOL_NAME] +} diff --git a/crates/parsigex/src/protocol.rs b/crates/parsigex/src/protocol.rs new file mode 100644 index 00000000..93ea0050 --- /dev/null +++ b/crates/parsigex/src/protocol.rs @@ -0,0 +1,70 @@ +//! Wire protocol helpers for partial signature exchange. + +use std::io; + +use futures::AsyncWriteExt; +use libp2p::swarm::Stream; +use prost::Message; + +use pluto_core::{ + corepb::v1::{core as pbcore, parsigex as pbparsigex}, + types::{Duty, ParSignedDataSet}, +}; +use pluto_p2p::proto; + +use super::{Error, Result as ParasigexResult}; + +/// Encodes a protobuf message to bytes. +pub fn encode_protobuf(message: &M) -> Vec { + let mut buf = Vec::with_capacity(message.encoded_len()); + message + .encode(&mut buf) + .expect("vec-backed protobuf encoding cannot fail"); + buf +} + +/// Decodes a protobuf message from bytes. +pub fn decode_protobuf( + bytes: &[u8], +) -> std::result::Result { + M::decode(bytes) +} + +/// Encodes a partial signature exchange message. +pub fn encode_message(duty: &Duty, data_set: &ParSignedDataSet) -> ParasigexResult> { + let pb = pbparsigex::ParSigExMsg { + duty: Some(pbcore::Duty::from(duty)), + data_set: Some(pbcore::ParSignedDataSet::try_from(data_set)?), + }; + + Ok(encode_protobuf(&pb)) +} + +/// Decodes a partial signature exchange message. +pub fn decode_message(bytes: &[u8]) -> ParasigexResult<(Duty, ParSignedDataSet)> { + let pb: pbparsigex::ParSigExMsg = decode_protobuf(bytes) + .map_err(|_| Error::from(pluto_core::ParSigExCodecError::InvalidMessageFields))?; + let duty_pb = pb + .duty + .ok_or(pluto_core::ParSigExCodecError::InvalidMessageFields)?; + let data_set_pb = pb + .data_set + .ok_or(pluto_core::ParSigExCodecError::InvalidMessageFields)?; + let duty = Duty::try_from(&duty_pb)?; + let data_set = ParSignedDataSet::try_from((&duty.duty_type, &data_set_pb))?; + Ok((duty, data_set)) +} + +/// Sends one protobuf message on the stream and closes the write side. +pub async fn send_message(stream: &mut Stream, payload: &[u8]) -> io::Result<()> { + proto::write_length_delimited(stream, payload).await?; + let _ = stream.close().await; + Ok(()) +} + +/// Receives one protobuf payload from the stream and closes the write side. +pub async fn recv_message(stream: &mut Stream) -> io::Result> { + let bytes = proto::read_length_delimited(stream, proto::MAX_MESSAGE_SIZE).await?; + let _ = stream.close().await; + Ok(bytes) +} diff --git a/crates/peerinfo/src/protocol.rs b/crates/peerinfo/src/protocol.rs index 150a0c57..e59fcbb2 100644 --- a/crates/peerinfo/src/protocol.rs +++ b/crates/peerinfo/src/protocol.rs @@ -19,6 +19,7 @@ use std::{ use chrono::{DateTime, Utc}; use libp2p::{PeerId, swarm::Stream}; use pluto_core::version::{self, SemVer, SemVerError}; +use pluto_p2p::proto::MAX_MESSAGE_SIZE; use regex::Regex; use tokio::sync::Mutex; use tracing::{info, warn}; @@ -29,9 +30,6 @@ use crate::{ peerinfopb::v1::peerinfo::PeerInfo, }; -/// Maximum message size (64KB should be plenty for peer info). -const MAX_MESSAGE_SIZE: usize = 64 * 1024; - static GIT_HASH_RE: LazyLock = LazyLock::new(|| Regex::new(r"^[0-9a-f]{7}$").expect("invalid regex")); @@ -585,8 +583,8 @@ mod tests { async fn test_read_protobuf_message_too_large() { // Create a buffer with a length prefix that exceeds MAX_MESSAGE_SIZE let mut buf = Vec::new(); - let large_len = MAX_MESSAGE_SIZE + 1; - let mut len_buf = unsigned_varint::encode::usize_buffer(); + let large_len = pluto_p2p::proto::MAX_MESSAGE_SIZE + 1; + let mut len_buf: [u8; 10] = unsigned_varint::encode::usize_buffer(); let encoded_len = unsigned_varint::encode::usize(large_len, &mut len_buf); buf.extend_from_slice(encoded_len);