From 4961ce008897182aaac6da758de6ae7de0aaa23c Mon Sep 17 00:00:00 2001 From: elizabeth Date: Mon, 7 Jul 2025 17:02:39 -0400 Subject: [PATCH 1/9] initial impl with incoming/outgoing message impl --- Cargo.lock | 1200 +++++++++++++++++++++++++++++++++-- Cargo.toml | 3 +- crates/p2p/Cargo.toml | 15 + crates/p2p/src/behaviour.rs | 165 +++++ crates/p2p/src/lib.rs | 299 +++++++++ crates/p2p/src/message.rs | 122 ++++ crates/p2p/src/protocol.rs | 73 +++ 7 files changed, 1821 insertions(+), 56 deletions(-) create mode 100644 crates/p2p/Cargo.toml create mode 100644 crates/p2p/src/behaviour.rs create mode 100644 crates/p2p/src/lib.rs create mode 100644 crates/p2p/src/message.rs create mode 100644 crates/p2p/src/protocol.rs diff --git a/Cargo.lock b/Cargo.lock index 6e8a731e..93983a60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -226,6 +226,31 @@ dependencies = [ "generic-array", ] +[[package]] +name = "aes" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + +[[package]] +name = "aes-gcm" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "831010a0f742e1209b3bcea8fab6a8e149051ba6099432c8cb2cc117dec3ead1" +dependencies = [ + "aead", + "aes", + "cipher", + "ctr", + "ghash", + "subtle", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -1235,6 +1260,12 @@ dependencies = [ "rand 0.8.5", ] +[[package]] +name = "arrayref" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76a2e8124351fda1ef8aaaa3bbd7ebbcb486bbcd4225aca0aa0d84bb2db8fecb" + [[package]] name = "arrayvec" version = "0.7.6" @@ -1293,6 +1324,36 @@ dependencies = [ "serde_json", ] +[[package]] +name = "async-io" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1237c0ae75a0f3765f58910ff9cdd0a12eeb39ab2f4c7de23262f337f0aacbb3" +dependencies = [ + "async-lock", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite 2.6.0", + "parking", + "polling", + "rustix 1.0.7", + "slab", + "tracing", + "windows-sys 0.59.0", +] + +[[package]] +name = "async-lock" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -1337,6 +1398,19 @@ dependencies = [ "rustc_version 0.4.1", ] +[[package]] +name = "asynchronous-codec" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a860072022177f903e59730004fb5dc13db9275b79bb2aef7ba8ce831956c233" +dependencies = [ + "bytes", + "futures-sink", + "futures-util", + "memchr", + "pin-project-lite", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -1420,6 +1494,12 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "base-x" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cbbc9d0964165b47557570cce6c952866c2678457aca742aafc9fb771d30270" + [[package]] name = "base16ct" version = "0.2.0" @@ -1528,6 +1608,15 @@ dependencies = [ "wyz", ] +[[package]] +name = "blake2" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe" +dependencies = [ + "digest 0.10.7", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -1563,7 +1652,7 @@ dependencies = [ "hex", "http 1.3.1", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-named-pipe", "hyper-util", "hyperlocal", @@ -1620,6 +1709,15 @@ dependencies = [ "alloc-stdlib", ] +[[package]] +name = "bs58" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf88ba1141d185c399bee5288d850d63b8369520c1eafc32a0430b5b6c287bf4" +dependencies = [ + "tinyvec", +] + [[package]] name = "bumpalo" version = "3.17.0" @@ -1678,6 +1776,15 @@ dependencies = [ "serde", ] +[[package]] +name = "cbor4ii" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "472931dd4dfcc785075b09be910147f9c6258883fc4591d0dac6116392b2daa6" +dependencies = [ + "serde", +] + [[package]] name = "cc" version = "1.2.20" @@ -1721,6 +1828,19 @@ dependencies = [ "cpufeatures", ] +[[package]] +name = "chacha20poly1305" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10cd79432192d1c0f4e1a0fef9527696cc039165d729fb41b3f4f4f354c2dc35" +dependencies = [ + "aead", + "chacha20", + "cipher", + "poly1305", + "zeroize", +] + [[package]] name = "chrono" version = "0.4.41" @@ -1941,6 +2061,15 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "core2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505" +dependencies = [ + "memchr", +] + [[package]] name = "cpufeatures" version = "0.2.17" @@ -2075,6 +2204,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "ctr" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835" +dependencies = [ + "cipher", +] + [[package]] name = "ctrlc" version = "3.4.6" @@ -2169,6 +2307,26 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" +[[package]] +name = "data-encoding-macro" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47ce6c96ea0102f01122a185683611bd5ac8d99e62bc59dd12e6bda344ee673d" +dependencies = [ + "data-encoding", + "data-encoding-macro-internal", +] + +[[package]] +name = "data-encoding-macro-internal" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d162beedaa69905488a8da94f5ac3edb4dd4788b732fadb7bd120b2625c1976" +dependencies = [ + "data-encoding", + "syn 2.0.101", +] + [[package]] name = "der" version = "0.7.10" @@ -2634,6 +2792,27 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "event-listener" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "eyre" version = "0.6.12" @@ -2735,7 +2914,7 @@ dependencies = [ "futures-core", "futures-sink", "nanorand", - "spin", + "spin 0.9.8", ] [[package]] @@ -2801,6 +2980,16 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-bounded" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91f328e7fb845fc832912fb6a34f40cf6d1888c92f974d1893a54e97b5ff542e" +dependencies = [ + "futures-timer", + "futures-util", +] + [[package]] name = "futures-buffered" version = "0.2.11" @@ -2811,7 +3000,7 @@ dependencies = [ "diatomic-waker", "futures-core", "pin-project-lite", - "spin", + "spin 0.9.8", ] [[package]] @@ -2839,6 +3028,7 @@ dependencies = [ "futures-core", "futures-task", "futures-util", + "num_cpus", ] [[package]] @@ -2886,6 +3076,17 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "futures-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f2f12607f92c69b12ed746fabf9ca4f5c482cba46679c1a75b874ed7c26adb" +dependencies = [ + "futures-io", + "rustls", + "rustls-pki-types", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -2898,6 +3099,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.31" @@ -2973,6 +3180,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "ghash" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0d8a4362ccb29cb0b265253fb0a2728f592895ee6854fd9bc13f2ffda266ff1" +dependencies = [ + "opaque-debug", + "polyval", +] + [[package]] name = "gimli" version = "0.31.1" @@ -3072,7 +3289,7 @@ dependencies = [ "regex", "reqwest", "reqwest-middleware", - "ring", + "ring 0.17.14", "serde", "serde_json", "sha2", @@ -3177,6 +3394,12 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "hex" version = "0.4.3" @@ -3195,6 +3418,31 @@ dependencies = [ "arrayvec", ] +[[package]] +name = "hickory-proto" +version = "0.24.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92652067c9ce6f66ce53cc38d1169daa36e6e7eb7dd3b63b5103bd9d97117248" +dependencies = [ + "async-trait", + "cfg-if", + "data-encoding", + "enum-as-inner", + "futures-channel", + "futures-io", + "futures-util", + "idna 1.0.3", + "ipnet", + "once_cell", + "rand 0.8.5", + "socket2", + "thiserror 1.0.69", + "tinyvec", + "tokio", + "tracing", + "url", +] + [[package]] name = "hickory-proto" version = "0.25.2" @@ -3212,7 +3460,7 @@ dependencies = [ "ipnet", "once_cell", "rand 0.9.1", - "ring", + "ring 0.17.14", "thiserror 2.0.12", "tinyvec", "tokio", @@ -3220,6 +3468,27 @@ dependencies = [ "url", ] +[[package]] +name = "hickory-resolver" +version = "0.24.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbb117a1ca520e111743ab2f6688eddee69db4e0ea242545a604dce8a66fd22e" +dependencies = [ + "cfg-if", + "futures-util", + "hickory-proto 0.24.4", + "ipconfig", + "lru-cache", + "once_cell", + "parking_lot 0.12.3", + "rand 0.8.5", + "resolv-conf", + "smallvec", + "thiserror 1.0.69", + "tokio", + "tracing", +] + [[package]] name = "hickory-resolver" version = "0.25.2" @@ -3228,7 +3497,7 @@ checksum = "dc62a9a99b0bfb44d2ab95a7208ac952d31060efc16241c87eaf36406fecf87a" dependencies = [ "cfg-if", "futures-util", - "hickory-proto", + "hickory-proto 0.25.2", "ipconfig", "moka", "once_cell", @@ -3241,6 +3510,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "hkdf" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7" +dependencies = [ + "hmac", +] + [[package]] name = "hmac" version = "0.12.1" @@ -3315,6 +3593,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", +] + [[package]] name = "http-body" version = "1.0.1" @@ -3334,7 +3623,7 @@ dependencies = [ "bytes", "futures-core", "http 1.3.1", - "http-body", + "http-body 1.0.1", "pin-project-lite", ] @@ -3350,6 +3639,30 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +[[package]] +name = "hyper" +version = "0.14.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2 0.3.26", + "http 0.2.12", + "http-body 0.4.6", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "hyper" version = "1.6.0" @@ -3361,7 +3674,7 @@ dependencies = [ "futures-util", "h2 0.4.9", "http 1.3.1", - "http-body", + "http-body 1.0.1", "httparse", "httpdate", "itoa", @@ -3378,7 +3691,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73b7d8abf35697b81a825e386fc151e0d503e8cb5fcb93cc8669c376dfd6f278" dependencies = [ "hex", - "hyper", + "hyper 1.6.0", "hyper-util", "pin-project-lite", "tokio", @@ -3394,7 +3707,7 @@ checksum = "2d191583f3da1305256f22463b9bb0471acad48a4e534a5218b9963e9c1f59b2" dependencies = [ "futures-util", "http 1.3.1", - "hyper", + "hyper 1.6.0", "hyper-util", "rustls", "rustls-pki-types", @@ -3412,7 +3725,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" dependencies = [ "bytes", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-util", "native-tls", "tokio", @@ -3430,8 +3743,8 @@ dependencies = [ "futures-channel", "futures-util", "http 1.3.1", - "http-body", - "hyper", + "http-body 1.0.1", + "hyper 1.6.0", "libc", "pin-project-lite", "socket2", @@ -3448,7 +3761,7 @@ checksum = "986c5ce3b994526b3cd75578e62554abd09f0899d6206de48b3e96ab34ccc8c7" dependencies = [ "hex", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-util", "pin-project-lite", "tokio", @@ -3634,12 +3947,64 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "if-addrs" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cabb0019d51a643781ff15c9c8a3e5dedc365c47211270f4e8f82812fedd8f0a" +dependencies = [ + "libc", + "windows-sys 0.48.0", +] + +[[package]] +name = "if-watch" +version = "3.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdf9d64cfcf380606e64f9a0bcf493616b65331199f984151a6fa11a7b3cde38" +dependencies = [ + "async-io", + "core-foundation", + "fnv", + "futures", + "if-addrs", + "ipnet", + "log", + "netlink-packet-core", + "netlink-packet-route 0.17.1", + "netlink-proto", + "netlink-sys", + "rtnetlink 0.13.1", + "system-configuration", + "tokio", + "windows 0.52.0", +] + [[package]] name = "if_chain" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb56e1aa765b4b4f3aadfab769793b7087bb03a4ea4920644a6d238e2df5b9ed" +[[package]] +name = "igd-next" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "064d90fec10d541084e7b39ead8875a5a80d9114a2b18791565253bae25f49e4" +dependencies = [ + "async-trait", + "attohttpc", + "bytes", + "futures", + "http 0.2.12", + "hyper 0.14.32", + "log", + "rand 0.8.5", + "tokio", + "url", + "xmltree", +] + [[package]] name = "igd-next" version = "0.15.1" @@ -3652,7 +4017,7 @@ dependencies = [ "futures", "http 1.3.1", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-util", "log", "rand 0.8.5", @@ -3801,9 +4166,9 @@ dependencies = [ "derive_more 1.0.0", "ed25519-dalek", "futures-util", - "hickory-resolver", + "hickory-resolver 0.25.2", "http 1.3.1", - "igd-next", + "igd-next 0.15.1", "instant", "iroh-base", "iroh-metrics", @@ -3818,9 +4183,9 @@ dependencies = [ "pkarr", "portmapper", "rand 0.8.5", - "rcgen", + "rcgen 0.13.2", "reqwest", - "ring", + "ring 0.17.14", "rustls", "rustls-webpki 0.102.8", "serde", @@ -3900,7 +4265,7 @@ dependencies = [ "bytes", "getrandom 0.2.16", "rand 0.8.5", - "ring", + "ring 0.17.14", "rustc-hash 2.1.1", "rustls", "rustls-pki-types", @@ -3936,10 +4301,10 @@ dependencies = [ "cfg_aliases", "data-encoding", "derive_more 1.0.0", - "hickory-resolver", + "hickory-resolver 0.25.2", "http 1.3.1", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-util", "iroh-base", "iroh-metrics", @@ -4061,7 +4426,7 @@ dependencies = [ "base64 0.22.1", "js-sys", "pem", - "ring", + "ring 0.17.14", "serde", "serde_json", "simple_asn1", @@ -4113,32 +4478,464 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] -name = "lazycell" -version = "1.3.0" +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + +[[package]] +name = "libc" +version = "0.2.172" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" + +[[package]] +name = "libloading" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" +dependencies = [ + "cfg-if", + "windows-targets 0.52.6", +] + +[[package]] +name = "libm" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9627da5196e5d8ed0b0495e61e518847578da83483c37288316d9b2e03a7f72" + +[[package]] +name = "libp2p" +version = "0.54.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbbe80f9c7e00526cd6b838075b9c171919404a4732cb2fa8ece0a093223bfc4" +dependencies = [ + "bytes", + "either", + "futures", + "futures-timer", + "getrandom 0.2.16", + "libp2p-allow-block-list", + "libp2p-autonat", + "libp2p-connection-limits", + "libp2p-core", + "libp2p-dns", + "libp2p-identify", + "libp2p-identity", + "libp2p-kad", + "libp2p-mdns", + "libp2p-metrics", + "libp2p-noise", + "libp2p-ping", + "libp2p-quic", + "libp2p-request-response", + "libp2p-swarm", + "libp2p-tcp", + "libp2p-upnp", + "libp2p-yamux", + "multiaddr", + "pin-project", + "rw-stream-sink", + "thiserror 1.0.69", +] + +[[package]] +name = "libp2p-allow-block-list" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1027ccf8d70320ed77e984f273bc8ce952f623762cb9bf2d126df73caef8041" +dependencies = [ + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "void", +] + +[[package]] +name = "libp2p-autonat" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a083675f189803d0682a2726131628e808144911dad076858bfbe30b13065499" +dependencies = [ + "async-trait", + "asynchronous-codec", + "bytes", + "either", + "futures", + "futures-bounded", + "futures-timer", + "libp2p-core", + "libp2p-identity", + "libp2p-request-response", + "libp2p-swarm", + "quick-protobuf", + "quick-protobuf-codec", + "rand 0.8.5", + "rand_core 0.6.4", + "thiserror 1.0.69", + "tracing", + "void", + "web-time", +] + +[[package]] +name = "libp2p-connection-limits" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d003540ee8baef0d254f7b6bfd79bac3ddf774662ca0abf69186d517ef82ad8" +dependencies = [ + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "void", +] + +[[package]] +name = "libp2p-core" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a61f26c83ed111104cd820fe9bc3aaabbac5f1652a1d213ed6e900b7918a1298" +dependencies = [ + "either", + "fnv", + "futures", + "futures-timer", + "libp2p-identity", + "multiaddr", + "multihash", + "multistream-select", + "once_cell", + "parking_lot 0.12.3", + "pin-project", + "quick-protobuf", + "rand 0.8.5", + "rw-stream-sink", + "smallvec", + "thiserror 1.0.69", + "tracing", + "unsigned-varint 0.8.0", + "void", + "web-time", +] + +[[package]] +name = "libp2p-dns" +version = "0.42.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97f37f30d5c7275db282ecd86e54f29dd2176bd3ac656f06abf43bedb21eb8bd" +dependencies = [ + "async-trait", + "futures", + "hickory-resolver 0.24.4", + "libp2p-core", + "libp2p-identity", + "parking_lot 0.12.3", + "smallvec", + "tracing", +] + +[[package]] +name = "libp2p-identify" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1711b004a273be4f30202778856368683bd9a83c4c7dcc8f848847606831a4e3" +dependencies = [ + "asynchronous-codec", + "either", + "futures", + "futures-bounded", + "futures-timer", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "lru 0.12.5", + "quick-protobuf", + "quick-protobuf-codec", + "smallvec", + "thiserror 1.0.69", + "tracing", + "void", +] + +[[package]] +name = "libp2p-identity" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3104e13b51e4711ff5738caa1fb54467c8604c2e94d607e27745bcf709068774" +dependencies = [ + "bs58", + "ed25519-dalek", + "hkdf", + "multihash", + "quick-protobuf", + "rand 0.8.5", + "sha2", + "thiserror 2.0.12", + "tracing", + "zeroize", +] + +[[package]] +name = "libp2p-kad" +version = "0.46.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ced237d0bd84bbebb7c2cad4c073160dacb4fe40534963c32ed6d4c6bb7702a3" +dependencies = [ + "arrayvec", + "asynchronous-codec", + "bytes", + "either", + "fnv", + "futures", + "futures-bounded", + "futures-timer", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "quick-protobuf", + "quick-protobuf-codec", + "rand 0.8.5", + "sha2", + "smallvec", + "thiserror 1.0.69", + "tracing", + "uint", + "void", + "web-time", +] + +[[package]] +name = "libp2p-mdns" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14b8546b6644032565eb29046b42744aee1e9f261ed99671b2c93fb140dba417" +dependencies = [ + "data-encoding", + "futures", + "hickory-proto 0.24.4", + "if-watch", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "rand 0.8.5", + "smallvec", + "socket2", + "tokio", + "tracing", + "void", +] + +[[package]] +name = "libp2p-metrics" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ebafa94a717c8442d8db8d3ae5d1c6a15e30f2d347e0cd31d057ca72e42566" +dependencies = [ + "futures", + "libp2p-core", + "libp2p-identify", + "libp2p-identity", + "libp2p-kad", + "libp2p-ping", + "libp2p-swarm", + "pin-project", + "prometheus-client", + "web-time", +] + +[[package]] +name = "libp2p-noise" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36b137cb1ae86ee39f8e5d6245a296518912014eaa87427d24e6ff58cfc1b28c" +dependencies = [ + "asynchronous-codec", + "bytes", + "curve25519-dalek", + "futures", + "libp2p-core", + "libp2p-identity", + "multiaddr", + "multihash", + "once_cell", + "quick-protobuf", + "rand 0.8.5", + "sha2", + "snow", + "static_assertions", + "thiserror 1.0.69", + "tracing", + "x25519-dalek", + "zeroize", +] + +[[package]] +name = "libp2p-ping" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "005a34420359223b974ee344457095f027e51346e992d1e0dcd35173f4cdd422" +dependencies = [ + "either", + "futures", + "futures-timer", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "rand 0.8.5", + "tracing", + "void", + "web-time", +] + +[[package]] +name = "libp2p-quic" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46352ac5cd040c70e88e7ff8257a2ae2f891a4076abad2c439584a31c15fd24e" +dependencies = [ + "bytes", + "futures", + "futures-timer", + "if-watch", + "libp2p-core", + "libp2p-identity", + "libp2p-tls", + "parking_lot 0.12.3", + "quinn", + "rand 0.8.5", + "ring 0.17.14", + "rustls", + "socket2", + "thiserror 1.0.69", + "tokio", + "tracing", +] + +[[package]] +name = "libp2p-request-response" +version = "0.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1356c9e376a94a75ae830c42cdaea3d4fe1290ba409a22c809033d1b7dcab0a6" +dependencies = [ + "async-trait", + "cbor4ii", + "futures", + "futures-bounded", + "futures-timer", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "rand 0.8.5", + "serde", + "smallvec", + "tracing", + "void", + "web-time", +] + +[[package]] +name = "libp2p-swarm" +version = "0.45.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7dd6741793d2c1fb2088f67f82cf07261f25272ebe3c0b0c311e0c6b50e851a" +dependencies = [ + "either", + "fnv", + "futures", + "futures-timer", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm-derive", + "lru 0.12.5", + "multistream-select", + "once_cell", + "rand 0.8.5", + "smallvec", + "tokio", + "tracing", + "void", + "web-time", +] + +[[package]] +name = "libp2p-swarm-derive" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206e0aa0ebe004d778d79fb0966aa0de996c19894e2c0605ba2f8524dd4443d8" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.101", +] + +[[package]] +name = "libp2p-tcp" +version = "0.42.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +checksum = "ad964f312c59dcfcac840acd8c555de8403e295d39edf96f5240048b5fcaa314" +dependencies = [ + "futures", + "futures-timer", + "if-watch", + "libc", + "libp2p-core", + "libp2p-identity", + "socket2", + "tokio", + "tracing", +] [[package]] -name = "libc" -version = "0.2.172" +name = "libp2p-tls" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" +checksum = "47b23dddc2b9c355f73c1e36eb0c3ae86f7dc964a3715f0731cfad352db4d847" +dependencies = [ + "futures", + "futures-rustls", + "libp2p-core", + "libp2p-identity", + "rcgen 0.11.3", + "ring 0.17.14", + "rustls", + "rustls-webpki 0.101.7", + "thiserror 1.0.69", + "x509-parser", + "yasna", +] [[package]] -name = "libloading" -version = "0.8.6" +name = "libp2p-upnp" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" +checksum = "01bf2d1b772bd3abca049214a3304615e6a36fa6ffc742bdd1ba774486200b8f" dependencies = [ - "cfg-if", - "windows-targets 0.52.6", + "futures", + "futures-timer", + "igd-next 0.14.3", + "libp2p-core", + "libp2p-swarm", + "tokio", + "tracing", + "void", ] [[package]] -name = "libm" -version = "0.2.13" +name = "libp2p-yamux" +version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9627da5196e5d8ed0b0495e61e518847578da83483c37288316d9b2e03a7f72" +checksum = "788b61c80789dba9760d8c669a5bedb642c8267555c803fabd8396e4ca5c5882" +dependencies = [ + "either", + "futures", + "libp2p-core", + "thiserror 1.0.69", + "tracing", + "yamux 0.12.1", + "yamux 0.13.5", +] [[package]] name = "libredox" @@ -4159,12 +4956,24 @@ dependencies = [ "zlib-rs", ] +[[package]] +name = "linked-hash-map" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" + [[package]] name = "linux-raw-sys" version = "0.4.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" +[[package]] +name = "linux-raw-sys" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" + [[package]] name = "litemap" version = "0.7.5" @@ -4251,6 +5060,15 @@ dependencies = [ "hashbrown 0.15.2", ] +[[package]] +name = "lru-cache" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c" +dependencies = [ + "linked-hash-map", +] + [[package]] name = "mach" version = "0.3.2" @@ -4375,9 +5193,9 @@ dependencies = [ "colored", "futures-util", "http 1.3.1", - "http-body", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-util", "log", "rand 0.9.1", @@ -4407,6 +5225,60 @@ dependencies = [ "uuid", ] +[[package]] +name = "multiaddr" +version = "0.18.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe6351f60b488e04c1d21bc69e56b89cb3f5e8f5d22557d6e8031bdfd79b6961" +dependencies = [ + "arrayref", + "byteorder", + "data-encoding", + "libp2p-identity", + "multibase", + "multihash", + "percent-encoding", + "serde", + "static_assertions", + "unsigned-varint 0.8.0", + "url", +] + +[[package]] +name = "multibase" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b3539ec3c1f04ac9748a260728e855f261b4977f5c3406612c884564f329404" +dependencies = [ + "base-x", + "data-encoding", + "data-encoding-macro", +] + +[[package]] +name = "multihash" +version = "0.19.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b430e7953c29dd6a09afc29ff0bb69c6e306329ee6794700aee27b76a1aea8d" +dependencies = [ + "core2", + "unsigned-varint 0.8.0", +] + +[[package]] +name = "multistream-select" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea0df8e5eec2298a62b326ee4f0d7fe1a6b90a09dfcf9df37b38f947a8c42f19" +dependencies = [ + "bytes", + "futures", + "log", + "pin-project", + "smallvec", + "unsigned-varint 0.7.2", +] + [[package]] name = "n0-future" version = "0.1.3" @@ -4651,6 +5523,12 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43794a0ace135be66a25d3ae77d41b91615fb68ae937f904090203e81f755b65" +[[package]] +name = "nohash-hasher" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451" + [[package]] name = "nom" version = "7.1.3" @@ -4741,7 +5619,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi", + "hermit-abi 0.3.9", "libc", ] @@ -4934,6 +5812,17 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "p2p" +version = "0.3.10" +dependencies = [ + "anyhow", + "libp2p", + "serde", + "tokio", + "void", +] + [[package]] name = "parity-scale-codec" version = "3.7.4" @@ -5216,6 +6105,21 @@ dependencies = [ "pnet_macros_support", ] +[[package]] +name = "polling" +version = "3.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b53a684391ad002dd6a596ceb6c74fd004fdce75f4be2e3f615068abbea5fd50" +dependencies = [ + "cfg-if", + "concurrent-queue", + "hermit-abi 0.5.2", + "pin-project-lite", + "rustix 1.0.7", + "tracing", + "windows-sys 0.59.0", +] + [[package]] name = "poly1305" version = "0.8.0" @@ -5227,6 +6131,18 @@ dependencies = [ "universal-hash", ] +[[package]] +name = "polyval" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d1fe60d06143b2430aa532c94cfe9e29783047f06c0d7fd359a9a51b729fa25" +dependencies = [ + "cfg-if", + "cpufeatures", + "opaque-debug", + "universal-hash", +] + [[package]] name = "portable-atomic" version = "1.11.0" @@ -5253,7 +6169,7 @@ dependencies = [ "derive_more 1.0.0", "futures-lite 2.6.0", "futures-util", - "igd-next", + "igd-next 0.15.1", "iroh-metrics", "libc", "netwatch", @@ -5574,6 +6490,28 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" +[[package]] +name = "quick-protobuf" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d6da84cc204722a989e01ba2f6e1e276e190f22263d0cb6ce8526fcdb0d2e1f" +dependencies = [ + "byteorder", +] + +[[package]] +name = "quick-protobuf-codec" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15a0580ab32b169745d7a39db2ba969226ca16738931be152a3209b409de2474" +dependencies = [ + "asynchronous-codec", + "bytes", + "quick-protobuf", + "thiserror 1.0.69", + "unsigned-varint 0.8.0", +] + [[package]] name = "quinn" version = "0.11.7" @@ -5582,6 +6520,7 @@ checksum = "c3bd15a6f2967aef83887dcb9fec0014580467e33720d073560cf015a5683012" dependencies = [ "bytes", "cfg_aliases", + "futures-io", "pin-project-lite", "quinn-proto", "quinn-udp", @@ -5603,7 +6542,7 @@ dependencies = [ "bytes", "getrandom 0.3.2", "rand 0.9.1", - "ring", + "ring 0.17.14", "rustc-hash 2.1.1", "rustls", "rustls-pki-types", @@ -5765,6 +6704,18 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "rcgen" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52c4f3084aa3bc7dfbba4eff4fab2a54db4324965d8872ab933565e6fbd83bc6" +dependencies = [ + "pem", + "ring 0.16.20", + "time", + "yasna", +] + [[package]] name = "rcgen" version = "0.13.2" @@ -5772,7 +6723,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75e669e5202259b5314d1ea5397316ad400819437857b90861765f24c4cf80a2" dependencies = [ "pem", - "ring", + "ring 0.17.14", "rustls-pki-types", "time", "yasna", @@ -5911,9 +6862,9 @@ dependencies = [ "futures-util", "h2 0.4.9", "http 1.3.1", - "http-body", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-rustls", "hyper-tls", "hyper-util", @@ -5981,6 +6932,21 @@ dependencies = [ "subtle", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin 0.5.2", + "untrusted 0.7.1", + "web-sys", + "winapi", +] + [[package]] name = "ring" version = "0.17.14" @@ -5991,7 +6957,7 @@ dependencies = [ "cfg-if", "getrandom 0.2.16", "libc", - "untrusted", + "untrusted 0.9.0", "windows-sys 0.52.0", ] @@ -6168,7 +7134,20 @@ dependencies = [ "bitflags 2.9.0", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.4.15", + "windows-sys 0.59.0", +] + +[[package]] +name = "rustix" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c71e83d6afe7ff64890ec6b71d6a69bb8a610ab78ce364b3352876bb4c801266" +dependencies = [ + "bitflags 2.9.0", + "errno", + "libc", + "linux-raw-sys 0.9.4", "windows-sys 0.59.0", ] @@ -6181,7 +7160,7 @@ dependencies = [ "aws-lc-rs", "log", "once_cell", - "ring", + "ring 0.17.14", "rustls-pki-types", "rustls-webpki 0.103.1", "subtle", @@ -6206,15 +7185,25 @@ dependencies = [ "web-time", ] +[[package]] +name = "rustls-webpki" +version = "0.101.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" +dependencies = [ + "ring 0.17.14", + "untrusted 0.9.0", +] + [[package]] name = "rustls-webpki" version = "0.102.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" dependencies = [ - "ring", + "ring 0.17.14", "rustls-pki-types", - "untrusted", + "untrusted 0.9.0", ] [[package]] @@ -6224,9 +7213,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fef8b8769aaccf73098557a87cd1816b4f9c7c16811c9c77142aa695c16f2c03" dependencies = [ "aws-lc-rs", - "ring", + "ring 0.17.14", "rustls-pki-types", - "untrusted", + "untrusted 0.9.0", ] [[package]] @@ -6247,6 +7236,17 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "rw-stream-sink" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8c9026ff5d2f23da5e45bbc283f156383001bfb09c4e44256d02c1a685fe9a1" +dependencies = [ + "futures", + "pin-project", + "static_assertions", +] + [[package]] name = "ryu" version = "1.0.20" @@ -6705,6 +7705,23 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" +[[package]] +name = "snow" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "850948bee068e713b8ab860fe1adc4d109676ab4c3b621fd8147f06b261f2f85" +dependencies = [ + "aes-gcm", + "blake2", + "chacha20poly1305", + "curve25519-dalek", + "rand_core 0.6.4", + "ring 0.17.14", + "rustc_version 0.4.1", + "sha2", + "subtle", +] + [[package]] name = "socket2" version = "0.5.9" @@ -6715,6 +7732,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "spin" version = "0.9.8" @@ -6845,7 +7868,7 @@ dependencies = [ "lazy_static", "md-5", "rand 0.8.5", - "ring", + "ring 0.17.14", "subtle", "thiserror 1.0.69", "tokio", @@ -7010,7 +8033,7 @@ dependencies = [ "cfg-if", "fastrand 2.3.0", "once_cell", - "rustix", + "rustix 0.38.44", "windows-sys 0.59.0", ] @@ -7565,6 +8588,24 @@ dependencies = [ "subtle", ] +[[package]] +name = "unsigned-varint" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6889a77d49f1f013504cec6bf97a2c730394adedaeb1deb5ea08949a50541105" + +[[package]] +name = "unsigned-varint" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06" + +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "untrusted" version = "0.9.0" @@ -7784,6 +8825,12 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "void" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" + [[package]] name = "vte" version = "0.14.1" @@ -8005,7 +9052,7 @@ dependencies = [ "either", "home", "once_cell", - "rustix", + "rustix 0.38.44", ] [[package]] @@ -8670,6 +9717,18 @@ dependencies = [ "tap", ] +[[package]] +name = "x25519-dalek" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7e468321c81fb07fa7f4c636c3972b9100f0346e5b6a9f2bd0603a52f7ed277" +dependencies = [ + "curve25519-dalek", + "rand_core 0.6.4", + "serde", + "zeroize", +] + [[package]] name = "x509-parser" version = "0.16.0" @@ -8702,6 +9761,37 @@ dependencies = [ "xml-rs", ] +[[package]] +name = "yamux" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed0164ae619f2dc144909a9f082187ebb5893693d8c0196e8085283ccd4b776" +dependencies = [ + "futures", + "log", + "nohash-hasher", + "parking_lot 0.12.3", + "pin-project", + "rand 0.8.5", + "static_assertions", +] + +[[package]] +name = "yamux" +version = "0.13.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3da1acad1c2dc53f0dde419115a38bd8221d8c3e47ae9aeceaf453266d29307e" +dependencies = [ + "futures", + "log", + "nohash-hasher", + "parking_lot 0.12.3", + "pin-project", + "rand 0.9.1", + "static_assertions", + "web-time", +] + [[package]] name = "yasna" version = "0.5.2" diff --git a/Cargo.toml b/Cargo.toml index 878eec1c..7bc5fc2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "crates/validator", "crates/shared", "crates/orchestrator", + "crates/p2p", "crates/dev-utils", ] resolver = "2" @@ -48,7 +49,7 @@ edition = "2021" match_same_arms = "warn" unused_async = "warn" uninlined_format_args = "warn" +manual_let_else = "warn" [workspace.lints.rust] unreachable_pub = "warn" -manual_let_else = "warn" \ No newline at end of file diff --git a/crates/p2p/Cargo.toml b/crates/p2p/Cargo.toml new file mode 100644 index 00000000..2d5d94ff --- /dev/null +++ b/crates/p2p/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "p2p" +version.workspace = true +edition.workspace = true + +[dependencies] +libp2p = { version = "0.54", features = ["request-response", "identify", "ping", "mdns", "noise", "tcp", "autonat", "kad", "tokio", "cbor", "macros", "yamux"] } +void = "1.0" + +anyhow = {workspace = true} +serde = {workspace = true} +tokio = {workspace = true, features = ["sync"]} + +[lints] +workspace = true diff --git a/crates/p2p/src/behaviour.rs b/crates/p2p/src/behaviour.rs new file mode 100644 index 00000000..6b66394e --- /dev/null +++ b/crates/p2p/src/behaviour.rs @@ -0,0 +1,165 @@ +use anyhow::Context as _; +use anyhow::Result; +use libp2p::autonat; +use libp2p::connection_limits; +use libp2p::connection_limits::ConnectionLimits; +use libp2p::identify; +use libp2p::identity; +use libp2p::kad; +use libp2p::kad::store::MemoryStore; +use libp2p::mdns; +use libp2p::ping; +use libp2p::request_response; +use libp2p::swarm::NetworkBehaviour; +use std::time::Duration; + +use crate::message::IncomingMessage; +use crate::message::{Request, Response}; +use crate::Protocols; +use crate::PRIME_STREAM_PROTOCOL; + +#[derive(NetworkBehaviour)] +#[behaviour(to_swarm = "BehaviourEvent")] +pub(crate) struct Behaviour { + // connection gating + connection_limits: connection_limits::Behaviour, + + // discovery + mdns: mdns::tokio::Behaviour, + kademlia: kad::Behaviour, + + // protocols + identify: identify::Behaviour, + ping: ping::Behaviour, + request_response: request_response::cbor::Behaviour, + + // nat traversal + autonat: autonat::Behaviour, +} + +#[derive(Debug)] +pub(crate) enum BehaviourEvent { + Autonat(autonat::Event), + Identify(identify::Event), + Kademlia(kad::Event), + Mdns(mdns::Event), + Ping(ping::Event), + RequestResponse(request_response::Event), +} + +impl From for BehaviourEvent { + fn from(_: void::Void) -> Self { + unreachable!("void::Void cannot be converted to BehaviourEvent") + } +} + +impl From for BehaviourEvent { + fn from(event: autonat::Event) -> Self { + BehaviourEvent::Autonat(event) + } +} + +impl From for BehaviourEvent { + fn from(event: kad::Event) -> Self { + BehaviourEvent::Kademlia(event) + } +} + +impl From for BehaviourEvent { + fn from(event: libp2p::mdns::Event) -> Self { + BehaviourEvent::Mdns(event) + } +} + +impl From for BehaviourEvent { + fn from(event: ping::Event) -> Self { + BehaviourEvent::Ping(event) + } +} + +impl From for BehaviourEvent { + fn from(event: identify::Event) -> Self { + BehaviourEvent::Identify(event) + } +} + +impl From> for BehaviourEvent { + fn from(event: request_response::Event) -> Self { + BehaviourEvent::RequestResponse(event) + } +} + +impl Behaviour { + pub(crate) fn new( + keypair: &identity::Keypair, + protocols: Protocols, + agent_version: String, + ) -> Result { + let peer_id = keypair.public().to_peer_id(); + + let protocols = protocols.into_iter().map(|protocol| { + ( + protocol.as_stream_protocol(), + request_response::ProtocolSupport::Full, // TODO: configure inbound/outbound based on node role and protocol + ) + }); + + let autonat = autonat::Behaviour::new(peer_id, autonat::Config::default()); + let connection_limits = connection_limits::Behaviour::new( + ConnectionLimits::default().with_max_established(Some(100)), + ); + + let mdns = mdns::tokio::Behaviour::new(mdns::Config::default(), peer_id) + .context("failed to create mDNS behaviour")?; + let kademlia = kad::Behaviour::new(peer_id, MemoryStore::new(peer_id)); + + let identify = identify::Behaviour::new( + identify::Config::new(PRIME_STREAM_PROTOCOL.to_string(), keypair.public()) + .with_agent_version(agent_version), + ); + let ping = ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(10))); + + Ok(Self { + autonat, + connection_limits, + kademlia, + mdns, + identify, + ping, + request_response: request_response::cbor::Behaviour::new( + protocols, + request_response::Config::default(), + ), + }) + } + + pub(crate) fn request_response( + &mut self, + ) -> &mut request_response::cbor::Behaviour { + &mut self.request_response + } +} + +impl BehaviourEvent { + pub(crate) async fn handle(self, message_tx: tokio::sync::mpsc::Sender) { + match self { + BehaviourEvent::Autonat(_event) => {} + BehaviourEvent::Identify(_event) => {} + BehaviourEvent::Kademlia(_event) => { // TODO: potentially on outbound queries + } + BehaviourEvent::Mdns(_event) => {} + BehaviourEvent::Ping(_event) => {} + BehaviourEvent::RequestResponse(event) => match event { + request_response::Event::Message { peer, message } => { + let _ = message_tx + .send(IncomingMessage { + peer: peer.clone(), + message, + }) + .await; + } + _ => {} + }, + } + } +} diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs new file mode 100644 index 00000000..adde16af --- /dev/null +++ b/crates/p2p/src/lib.rs @@ -0,0 +1,299 @@ +use anyhow::Context; +use anyhow::Result; +use libp2p::futures::stream::FuturesUnordered; +use libp2p::multiaddr::Protocol; +use libp2p::noise; +use libp2p::swarm::SwarmEvent; +use libp2p::tcp; +use libp2p::yamux; +use libp2p::Multiaddr; +use libp2p::Swarm; +use libp2p::SwarmBuilder; +use libp2p::{identity, PeerId, Transport}; +use std::time::Duration; + +mod behaviour; +mod message; +mod protocol; + +use behaviour::Behaviour; +use message::{IncomingMessage, OutgoingMessage, OutgoingMessageInner}; +use protocol::Protocols; + +pub const PRIME_STREAM_PROTOCOL: libp2p::StreamProtocol = + libp2p::StreamProtocol::new("/prime/1.0.0"); +// TODO: force this to be passed by the user +pub const DEFAULT_AGENT_VERSION: &str = "prime-node/0.1.0"; + +pub struct Node { + peer_id: PeerId, + listen_addrs: Vec, + swarm: Swarm, + bootnodes: Vec, + + // channel for sending incoming messages to the consumer of this library + incoming_message_tx: tokio::sync::mpsc::Sender, + + // channel for receiving outgoing messages from the consumer of this library + outgoing_message_rx: tokio::sync::mpsc::Receiver, +} + +impl Node { + pub fn peer_id(&self) -> PeerId { + self.peer_id + } + + pub fn listen_addrs(&self) -> &[libp2p::Multiaddr] { + &self.listen_addrs + } + + /// Returns the multiaddresses that this node is listening on, with the peer ID included. + pub fn multiaddrs(&self) -> Vec { + self.listen_addrs + .iter() + .map(|addr| addr.clone().with(Protocol::P2p(self.peer_id))) + .collect() + } + + pub async fn run(self) -> Result<()> { + use libp2p::futures::StreamExt as _; + + let Node { + peer_id: _, + listen_addrs, + mut swarm, + bootnodes, + incoming_message_tx, + mut outgoing_message_rx, + } = self; + + for addr in listen_addrs { + swarm + .listen_on(addr) + .context("swarm failed to listen on multiaddr")?; + } + + let futures = FuturesUnordered::new(); + for bootnode in bootnodes { + futures.push(swarm.dial(bootnode)) + } + let results: Vec<_> = futures.into_iter().collect(); + for result in results { + match result { + Ok(_) => {} + Err(_e) => { + // TODO: log this error + } + } + } + + loop { + tokio::select! { + Some(message) = outgoing_message_rx.recv() => { + match message.message { + OutgoingMessageInner::Request(request) => { + swarm.behaviour_mut().request_response().send_request(&message.peer, request); + } + OutgoingMessageInner::Response((channel, response)) => { + if let Err(_e) = swarm.behaviour_mut().request_response().send_response(channel, response) { + // log error + } + } + } + } + event = swarm.select_next_some() => { + match event { + SwarmEvent::NewListenAddr { + listener_id: _, + address: _, + } => {} + SwarmEvent::ExternalAddrConfirmed { address: _ } => {} + SwarmEvent::Behaviour(event) => event.handle(incoming_message_tx.clone()).await, + _ => continue, + } + }, + } + } + } +} + +pub struct NodeBuilder { + port: Option, + listen_addrs: Vec, + keypair: Option, + agent_version: Option, + protocols: Protocols, + bootnodes: Vec, +} + +impl NodeBuilder { + pub fn new() -> Self { + Self { + port: None, + listen_addrs: Vec::new(), + keypair: None, + agent_version: None, + protocols: Protocols::new(), + bootnodes: Vec::new(), + } + } + + pub fn with_port(mut self, port: u16) -> Self { + self.port = Some(port); + self + } + + pub fn with_listen_addr(mut self, addr: libp2p::Multiaddr) -> Self { + self.listen_addrs.push(addr); + self + } + + pub fn with_keypair(mut self, keypair: identity::Keypair) -> Self { + self.keypair = Some(keypair); + self + } + + pub fn with_agent_version(mut self, agent_version: String) -> Self { + self.agent_version = Some(agent_version); + self + } + + pub fn with_validator_authentication(mut self) -> Self { + self.protocols = self.protocols.with_validator_authentication(); + self + } + + pub fn with_hardware_challenge(mut self) -> Self { + self.protocols = self.protocols.with_hardware_challenge(); + self + } + + pub fn with_invite(mut self) -> Self { + self.protocols = self.protocols.with_invite(); + self + } + + pub fn with_get_task_logs(mut self) -> Self { + self.protocols = self.protocols.with_get_task_logs(); + self + } + + pub fn with_restart(mut self) -> Self { + self.protocols = self.protocols.with_restart(); + self + } + + pub fn with_bootnode(mut self, bootnode: Multiaddr) -> Self { + self.bootnodes.push(bootnode); + self + } + + pub fn with_bootnodes(mut self, bootnodes: I) -> Self + where + I: IntoIterator, + T: Into, + { + for bootnode in bootnodes { + self.bootnodes.push(bootnode.into()); + } + self + } + + pub fn try_build( + self, + ) -> Result<( + Node, + tokio::sync::mpsc::Receiver, + tokio::sync::mpsc::Sender, + )> { + let Self { + port, + mut listen_addrs, + keypair, + agent_version, + protocols, + bootnodes, + } = self; + + let keypair = keypair.unwrap_or(identity::Keypair::generate_ed25519()); + let peer_id = keypair.public().to_peer_id(); + + let transport = create_transport(&keypair)?; + let behaviour = Behaviour::new( + &keypair, + protocols, + agent_version.unwrap_or(DEFAULT_AGENT_VERSION.to_string()), + ) + .context("failed to create behaviour")?; + + let swarm = SwarmBuilder::with_existing_identity(keypair) + .with_tokio() + .with_other_transport(|_| transport)? + .with_behaviour(|_| behaviour)? + .build(); + + if listen_addrs.is_empty() { + let port = port.unwrap_or(0); + let listen_addr = format!("/ip4/0.0.0.0/tcp/{port}") + .parse() + .expect("can parse valid multiaddr"); + listen_addrs.push(listen_addr); + } + + let (incoming_message_tx, incoming_message_rx) = tokio::sync::mpsc::channel(100); + let (outgoing_message_tx, outgoing_message_rx) = tokio::sync::mpsc::channel(100); + + Ok(( + Node { + peer_id, + swarm, + listen_addrs, + bootnodes, + incoming_message_tx, + outgoing_message_rx, + }, + incoming_message_rx, + outgoing_message_tx, + )) + } +} + +fn create_transport( + keypair: &identity::Keypair, +) -> Result> { + let transport = tcp::tokio::Transport::new(tcp::Config::default()) + .upgrade(libp2p::core::upgrade::Version::V1) + .authenticate(noise::Config::new(keypair)?) + .multiplex(yamux::Config::default()) + .timeout(Duration::from_secs(20)) + .boxed(); + + Ok(transport) +} + +#[cfg(test)] +mod test { + use super::NodeBuilder; + use crate::message; + + #[tokio::test] + async fn two_nodes_can_connect() -> anyhow::Result<()> { + let node1 = NodeBuilder::new().with_get_task_logs().try_build().unwrap(); + let (node1, mut incoming_message_rx1, outgoing_message_tx1) = node1; + + let node2 = NodeBuilder::new() + .with_get_task_logs() + .with_bootnodes(node1.multiaddrs()) + .try_build() + .unwrap(); + let (node2, mut incoming_message_rx2, outgoing_message_tx2) = node2; + + // Start both nodes in separate tasks + tokio::spawn(async move { node1.run().await }); + tokio::spawn(async move { node2.run().await }); + + let request = message::Request::GetTaskLogs; + + Ok(()) + } +} diff --git a/crates/p2p/src/message.rs b/crates/p2p/src/message.rs new file mode 100644 index 00000000..99b740db --- /dev/null +++ b/crates/p2p/src/message.rs @@ -0,0 +1,122 @@ +use libp2p::PeerId; +use serde::{Deserialize, Serialize}; +use std::time::SystemTime; + +#[derive(Debug)] +pub struct IncomingMessage { + pub peer: PeerId, + pub message: libp2p::request_response::Message, +} + +#[derive(Debug)] +pub struct OutgoingMessage { + pub peer: PeerId, + pub message: OutgoingMessageInner, +} + +#[derive(Debug)] +pub enum OutgoingMessageInner { + Request(Request), + Response( + ( + libp2p::request_response::ResponseChannel, + Response, + ), + ), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Request { + ValidatorAuthentication(ValidatorAuthenticationRequest), + HardwareChallenge(HardwareChallengeRequest), + Invite(InviteRequest), + GetTaskLogs, + Restart, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum Response { + ValidatorAuthentication(ValidatorAuthenticationResponse), + HardwareChallenge(HardwareChallengeResponse), + Invite(InviteResponse), + GetTaskLogs(GetTaskLogsResponse), + Restart(RestartResponse), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ValidatorAuthenticationRequest { + Initiation(ValidationAuthenticationInitiationRequest), + Solution(ValidationAuthenticationSolutionRequest), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ValidatorAuthenticationResponse { + Initiation(ValidationAuthenticationInitiationResponse), + Solution(ValidationAuthenticationSolutionResponse), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ValidationAuthenticationInitiationRequest { + pub message: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ValidationAuthenticationInitiationResponse { + pub signed_message: String, + pub message: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ValidationAuthenticationSolutionRequest { + pub signed_message: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ValidationAuthenticationSolutionResponse { + Granted, + Rejected, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HardwareChallengeRequest { + pub challenge: String, // TODO + pub timestamp: SystemTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HardwareChallengeResponse { + pub response: String, // TODO + pub timestamp: SystemTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum InviteRequestUrl { + MasterUrl(String), + MasterIpPort(String, u16), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct InviteRequest { + pub invite: String, + pub pool_id: u32, + pub url: InviteRequestUrl, + pub timestamp: u64, + pub expiration: [u8; 32], + pub nonce: [u8; 32], +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum InviteResponse { + Ok, + Error(String), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetTaskLogsResponse { + pub logs: Result, String>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RestartResponse { + pub result: Result<(), String>, +} diff --git a/crates/p2p/src/protocol.rs b/crates/p2p/src/protocol.rs new file mode 100644 index 00000000..5186ac44 --- /dev/null +++ b/crates/p2p/src/protocol.rs @@ -0,0 +1,73 @@ +use libp2p::StreamProtocol; +use std::{collections::HashSet, hash::Hash}; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub(crate) enum Protocol { + // validator -> worker + ValidatorAuthentication, + // validator -> worker + HardwareChallenge, + // orchestrator -> worker + Invite, + // any -> worker + GetTaskLogs, + // any -> worker + Restart, +} + +impl Protocol { + pub(crate) fn as_stream_protocol(&self) -> StreamProtocol { + match self { + Protocol::ValidatorAuthentication => { + StreamProtocol::new("/prime/validator_authentication/1.0.0") + } + Protocol::HardwareChallenge => StreamProtocol::new("/prime/hardware_challenge/1.0.0"), + Protocol::Invite => StreamProtocol::new("/prime/invite/1.0.0"), + Protocol::GetTaskLogs => StreamProtocol::new("/prime/get_task_logs/1.0.0"), + Protocol::Restart => StreamProtocol::new("/prime/restart/1.0.0"), + } + } +} + +#[derive(Debug, Clone)] +pub(crate) struct Protocols(HashSet); + +impl Protocols { + pub(crate) fn new() -> Self { + Self(HashSet::new()) + } + + pub(crate) fn with_validator_authentication(mut self) -> Self { + self.0.insert(Protocol::ValidatorAuthentication); + self + } + + pub(crate) fn with_hardware_challenge(mut self) -> Self { + self.0.insert(Protocol::HardwareChallenge); + self + } + + pub(crate) fn with_invite(mut self) -> Self { + self.0.insert(Protocol::Invite); + self + } + + pub(crate) fn with_get_task_logs(mut self) -> Self { + self.0.insert(Protocol::GetTaskLogs); + self + } + + pub(crate) fn with_restart(mut self) -> Self { + self.0.insert(Protocol::Restart); + self + } +} + +impl IntoIterator for Protocols { + type Item = Protocol; + type IntoIter = std::collections::hash_set::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} From f5365d9762b69e103e55f60b7f304ef52418c1d3 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Mon, 7 Jul 2025 17:49:08 -0400 Subject: [PATCH 2/9] request-response protocol working --- crates/p2p/src/behaviour.rs | 23 +++++++- crates/p2p/src/lib.rs | 102 +++++++++++++++++++++++++++++------- crates/p2p/src/message.rs | 25 ++++++--- 3 files changed, 121 insertions(+), 29 deletions(-) diff --git a/crates/p2p/src/behaviour.rs b/crates/p2p/src/behaviour.rs index 6b66394e..cd6606bb 100644 --- a/crates/p2p/src/behaviour.rs +++ b/crates/p2p/src/behaviour.rs @@ -151,6 +151,7 @@ impl BehaviourEvent { BehaviourEvent::Ping(_event) => {} BehaviourEvent::RequestResponse(event) => match event { request_response::Event::Message { peer, message } => { + println!("received message from peer {peer:?}: {message:?}"); let _ = message_tx .send(IncomingMessage { peer: peer.clone(), @@ -158,7 +159,27 @@ impl BehaviourEvent { }) .await; } - _ => {} + request_response::Event::ResponseSent { peer, request_id } => { + println!("response sent to peer {peer:?} for request ID {request_id:?}"); + } + request_response::Event::InboundFailure { + peer, + request_id, + error, + } => { + println!( + "inbound failure from peer {peer:?} for request ID {request_id:?}: {error}" + ); + } + request_response::Event::OutboundFailure { + peer, + request_id, + error, + } => { + println!( + "outbound failure to peer {peer:?} for request ID {request_id:?}: {error}" + ); + } }, } } diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index adde16af..777e8689 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -1,7 +1,6 @@ use anyhow::Context; use anyhow::Result; use libp2p::futures::stream::FuturesUnordered; -use libp2p::multiaddr::Protocol; use libp2p::noise; use libp2p::swarm::SwarmEvent; use libp2p::tcp; @@ -17,7 +16,7 @@ mod message; mod protocol; use behaviour::Behaviour; -use message::{IncomingMessage, OutgoingMessage, OutgoingMessageInner}; +use message::{IncomingMessage, OutgoingMessage}; use protocol::Protocols; pub const PRIME_STREAM_PROTOCOL: libp2p::StreamProtocol = @@ -51,7 +50,11 @@ impl Node { pub fn multiaddrs(&self) -> Vec { self.listen_addrs .iter() - .map(|addr| addr.clone().with(Protocol::P2p(self.peer_id))) + .map(|addr| { + addr.clone() + .with_p2p(self.peer_id) + .expect("can add peer ID to multiaddr") + }) .collect() } @@ -81,8 +84,9 @@ impl Node { for result in results { match result { Ok(_) => {} - Err(_e) => { + Err(e) => { // TODO: log this error + println!("failed to dial bootnode: {e:?}"); } } } @@ -90,13 +94,15 @@ impl Node { loop { tokio::select! { Some(message) = outgoing_message_rx.recv() => { - match message.message { - OutgoingMessageInner::Request(request) => { - swarm.behaviour_mut().request_response().send_request(&message.peer, request); + match message { + OutgoingMessage::Request((peer, request)) => { + swarm.behaviour_mut().request_response().send_request(&peer, request); } - OutgoingMessageInner::Response((channel, response)) => { - if let Err(_e) = swarm.behaviour_mut().request_response().send_response(channel, response) { + OutgoingMessage::Response((channel, response)) => { + println!("sending response on channel"); + if let Err(e) = swarm.behaviour_mut().request_response().send_response(channel, response) { // log error + println!("failed to send response: {e:?}"); } } } @@ -105,9 +111,22 @@ impl Node { match event { SwarmEvent::NewListenAddr { listener_id: _, - address: _, - } => {} - SwarmEvent::ExternalAddrConfirmed { address: _ } => {} + address, + } => { + println!("new listen address: {address}"); + } + SwarmEvent::ExternalAddrConfirmed { address } => { + println!("external address confirmed: {address}"); + } + SwarmEvent::ConnectionClosed { + peer_id, + cause, + endpoint: _, + connection_id: _, + num_established: _, + } => { + println!("connection closed with peer {peer_id}: {cause:?}"); + } SwarmEvent::Behaviour(event) => event.handle(incoming_message_tx.clone()).await, _ => continue, } @@ -230,6 +249,9 @@ impl NodeBuilder { .with_tokio() .with_other_transport(|_| transport)? .with_behaviour(|_| behaviour)? + .with_swarm_config(|cfg| { + cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX)) // don't disconnect from idle peers + }) .build(); if listen_addrs.is_empty() { @@ -277,23 +299,63 @@ mod test { use crate::message; #[tokio::test] - async fn two_nodes_can_connect() -> anyhow::Result<()> { - let node1 = NodeBuilder::new().with_get_task_logs().try_build().unwrap(); - let (node1, mut incoming_message_rx1, outgoing_message_tx1) = node1; + async fn two_nodes_can_connect_and_do_request_response() { + let (node1, mut incoming_message_rx1, outgoing_message_tx1) = + NodeBuilder::new().with_get_task_logs().try_build().unwrap(); + let node1_peer_id = node1.peer_id(); - let node2 = NodeBuilder::new() + let (node2, mut incoming_message_rx2, outgoing_message_tx2) = NodeBuilder::new() .with_get_task_logs() .with_bootnodes(node1.multiaddrs()) .try_build() .unwrap(); - let (node2, mut incoming_message_rx2, outgoing_message_tx2) = node2; + let node2_peer_id = node2.peer_id(); - // Start both nodes in separate tasks tokio::spawn(async move { node1.run().await }); tokio::spawn(async move { node2.run().await }); - let request = message::Request::GetTaskLogs; + // TODO: implement a way to get peer count + tokio::time::sleep(std::time::Duration::from_secs(2)).await; - Ok(()) + // send request from node1->node2 + let request = message::Request::GetTaskLogs; + outgoing_message_tx1 + .send(request.into_outgoing_message(node2_peer_id)) + .await + .unwrap(); + let message = incoming_message_rx2.recv().await.unwrap(); + assert_eq!(message.peer, node1_peer_id); + let libp2p::request_response::Message::Request { + request_id: _, + request: message::Request::GetTaskLogs, + channel, + } = message.message + else { + panic!("expected a GetTaskLogs request message"); + }; + + println!("received request from node1"); + + // send response from node2->node1 + let response = message::Response::GetTaskLogs(message::GetTaskLogsResponse { + logs: Ok(vec!["log1".to_string(), "log2".to_string()]), + }); + outgoing_message_tx2 + .send(response.into_outgoing_message(channel)) + .await + .unwrap(); + let message = incoming_message_rx1.recv().await.unwrap(); + assert_eq!(message.peer, node2_peer_id); + let libp2p::request_response::Message::Response { + request_id: _, + response: message::Response::GetTaskLogs(response), + } = message.message + else { + panic!("expected a GetTaskLogs response message"); + }; + assert_eq!( + response.logs, + Ok(vec!["log1".to_string(), "log2".to_string()]) + ); } } diff --git a/crates/p2p/src/message.rs b/crates/p2p/src/message.rs index 99b740db..97c07dff 100644 --- a/crates/p2p/src/message.rs +++ b/crates/p2p/src/message.rs @@ -9,14 +9,8 @@ pub struct IncomingMessage { } #[derive(Debug)] -pub struct OutgoingMessage { - pub peer: PeerId, - pub message: OutgoingMessageInner, -} - -#[derive(Debug)] -pub enum OutgoingMessageInner { - Request(Request), +pub enum OutgoingMessage { + Request((PeerId, Request)), Response( ( libp2p::request_response::ResponseChannel, @@ -34,6 +28,12 @@ pub enum Request { Restart, } +impl Request { + pub fn into_outgoing_message(self, peer: PeerId) -> OutgoingMessage { + OutgoingMessage::Request((peer, Request::from(self))) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Response { ValidatorAuthentication(ValidatorAuthenticationResponse), @@ -43,6 +43,15 @@ pub enum Response { Restart(RestartResponse), } +impl Response { + pub fn into_outgoing_message( + self, + channel: libp2p::request_response::ResponseChannel, + ) -> OutgoingMessage { + OutgoingMessage::Response((channel, Response::from(self))) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub enum ValidatorAuthenticationRequest { Initiation(ValidationAuthenticationInitiationRequest), From 565ed95e32bd9e66208823ae629497be13063bb8 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Mon, 7 Jul 2025 18:02:46 -0400 Subject: [PATCH 3/9] clippy --- crates/p2p/src/behaviour.rs | 8 ++------ crates/p2p/src/lib.rs | 6 ++++++ crates/p2p/src/message.rs | 20 ++++++++++---------- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/crates/p2p/src/behaviour.rs b/crates/p2p/src/behaviour.rs index cd6606bb..54f264dd 100644 --- a/crates/p2p/src/behaviour.rs +++ b/crates/p2p/src/behaviour.rs @@ -152,12 +152,8 @@ impl BehaviourEvent { BehaviourEvent::RequestResponse(event) => match event { request_response::Event::Message { peer, message } => { println!("received message from peer {peer:?}: {message:?}"); - let _ = message_tx - .send(IncomingMessage { - peer: peer.clone(), - message, - }) - .await; + // if this errors, user dropped their incoming message channel + let _ = message_tx.send(IncomingMessage { peer, message }).await; } request_response::Event::ResponseSent { peer, request_id } => { println!("response sent to peer {peer:?} for request ID {request_id:?}"); diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 777e8689..9f07d8d0 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -145,6 +145,12 @@ pub struct NodeBuilder { bootnodes: Vec, } +impl Default for NodeBuilder { + fn default() -> Self { + Self::new() + } +} + impl NodeBuilder { pub fn new() -> Self { Self { diff --git a/crates/p2p/src/message.rs b/crates/p2p/src/message.rs index 97c07dff..54d757c1 100644 --- a/crates/p2p/src/message.rs +++ b/crates/p2p/src/message.rs @@ -30,7 +30,7 @@ pub enum Request { impl Request { pub fn into_outgoing_message(self, peer: PeerId) -> OutgoingMessage { - OutgoingMessage::Request((peer, Request::from(self))) + OutgoingMessage::Request((peer, self)) } } @@ -48,40 +48,40 @@ impl Response { self, channel: libp2p::request_response::ResponseChannel, ) -> OutgoingMessage { - OutgoingMessage::Response((channel, Response::from(self))) + OutgoingMessage::Response((channel, self)) } } #[derive(Debug, Clone, Serialize, Deserialize)] pub enum ValidatorAuthenticationRequest { - Initiation(ValidationAuthenticationInitiationRequest), - Solution(ValidationAuthenticationSolutionRequest), + Initiation(ValidatorAuthenticationInitiationRequest), + Solution(ValidatorAuthenticationSolutionRequest), } #[derive(Debug, Clone, Serialize, Deserialize)] pub enum ValidatorAuthenticationResponse { - Initiation(ValidationAuthenticationInitiationResponse), - Solution(ValidationAuthenticationSolutionResponse), + Initiation(ValidatorAuthenticationInitiationResponse), + Solution(ValidatorAuthenticationSolutionResponse), } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ValidationAuthenticationInitiationRequest { +pub struct ValidatorAuthenticationInitiationRequest { pub message: String, } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ValidationAuthenticationInitiationResponse { +pub struct ValidatorAuthenticationInitiationResponse { pub signed_message: String, pub message: String, } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ValidationAuthenticationSolutionRequest { +pub struct ValidatorAuthenticationSolutionRequest { pub signed_message: String, } #[derive(Debug, Clone, Serialize, Deserialize)] -pub enum ValidationAuthenticationSolutionResponse { +pub enum ValidatorAuthenticationSolutionResponse { Granted, Rejected, } From a5321211a3919879e4467f07a306a4ad661060c9 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Mon, 7 Jul 2025 18:05:19 -0400 Subject: [PATCH 4/9] clippy --- crates/p2p/src/behaviour.rs | 1 + crates/p2p/src/message.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/crates/p2p/src/behaviour.rs b/crates/p2p/src/behaviour.rs index 54f264dd..e2737d57 100644 --- a/crates/p2p/src/behaviour.rs +++ b/crates/p2p/src/behaviour.rs @@ -37,6 +37,7 @@ pub(crate) struct Behaviour { autonat: autonat::Behaviour, } +#[allow(clippy::large_enum_variant)] #[derive(Debug)] pub(crate) enum BehaviourEvent { Autonat(autonat::Event), diff --git a/crates/p2p/src/message.rs b/crates/p2p/src/message.rs index 54d757c1..9013a8ca 100644 --- a/crates/p2p/src/message.rs +++ b/crates/p2p/src/message.rs @@ -8,6 +8,7 @@ pub struct IncomingMessage { pub message: libp2p::request_response::Message, } +#[allow(clippy::large_enum_variant)] #[derive(Debug)] pub enum OutgoingMessage { Request((PeerId, Request)), From 7bd100916bf65ca251cea390b38e954d67966de0 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Wed, 9 Jul 2025 12:05:29 -0400 Subject: [PATCH 5/9] add full hardware challenge message --- Cargo.lock | 2 + crates/p2p/Cargo.toml | 2 + crates/p2p/src/challenge_message.rs | 89 +++++++++++++++++++++++++++++ crates/p2p/src/lib.rs | 45 +++++++++++---- crates/p2p/src/message.rs | 57 +++++++++++++++--- 5 files changed, 177 insertions(+), 18 deletions(-) create mode 100644 crates/p2p/src/challenge_message.rs diff --git a/Cargo.lock b/Cargo.lock index a64a46e4..ae652ad4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6263,8 +6263,10 @@ version = "0.3.11" dependencies = [ "anyhow", "libp2p", + "nalgebra", "serde", "tokio", + "tokio-util", "void", ] diff --git a/crates/p2p/Cargo.toml b/crates/p2p/Cargo.toml index 2d5d94ff..ba52d570 100644 --- a/crates/p2p/Cargo.toml +++ b/crates/p2p/Cargo.toml @@ -8,8 +8,10 @@ libp2p = { version = "0.54", features = ["request-response", "identify", "ping", void = "1.0" anyhow = {workspace = true} +nalgebra = {workspace = true} serde = {workspace = true} tokio = {workspace = true, features = ["sync"]} +tokio-util = { workspace = true, features = ["rt"] } [lints] workspace = true diff --git a/crates/p2p/src/challenge_message.rs b/crates/p2p/src/challenge_message.rs new file mode 100644 index 00000000..639cc602 --- /dev/null +++ b/crates/p2p/src/challenge_message.rs @@ -0,0 +1,89 @@ +use nalgebra::DMatrix; +use serde::{ + de::{self, Visitor}, + Deserialize, Deserializer, Serialize, Serializer, +}; +use std::fmt; + +#[derive(Debug, Clone)] +pub struct FixedF64(pub f64); + +impl Serialize for FixedF64 { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + // adjust precision as needed + serializer.serialize_str(&format!("{:.12}", self.0)) + } +} + +impl<'de> Deserialize<'de> for FixedF64 { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct FixedF64Visitor; + + impl Visitor<'_> for FixedF64Visitor { + type Value = FixedF64; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a string representing a fixed precision float") + } + + fn visit_str(self, value: &str) -> Result + where + E: de::Error, + { + value + .parse::() + .map(FixedF64) + .map_err(|_| E::custom(format!("invalid f64: {value}"))) + } + } + + deserializer.deserialize_str(FixedF64Visitor) + } +} + +impl PartialEq for FixedF64 { + fn eq(&self, other: &Self) -> bool { + format!("{:.10}", self.0) == format!("{:.10}", other.0) + } +} + +#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)] +pub struct ChallengeRequest { + pub rows_a: usize, + pub cols_a: usize, + pub data_a: Vec, + pub rows_b: usize, + pub cols_b: usize, + pub data_b: Vec, + pub timestamp: Option, +} + +#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)] +pub struct ChallengeResponse { + pub result: Vec, + pub rows: usize, + pub cols: usize, +} + +pub fn calc_matrix(req: &ChallengeRequest) -> ChallengeResponse { + // convert FixedF64 to f64 + let data_a: Vec = req.data_a.iter().map(|x| x.0).collect(); + let data_b: Vec = req.data_b.iter().map(|x| x.0).collect(); + let a = DMatrix::from_vec(req.rows_a, req.cols_a, data_a); + let b = DMatrix::from_vec(req.rows_b, req.cols_b, data_b); + let c = a * b; + + let data_c: Vec = c.iter().map(|x| FixedF64(*x)).collect(); + + ChallengeResponse { + rows: c.nrows(), + cols: c.ncols(), + result: data_c, + } +} diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 9f07d8d0..46105a36 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -5,20 +5,28 @@ use libp2p::noise; use libp2p::swarm::SwarmEvent; use libp2p::tcp; use libp2p::yamux; -use libp2p::Multiaddr; use libp2p::Swarm; use libp2p::SwarmBuilder; -use libp2p::{identity, PeerId, Transport}; +use libp2p::{identity, Transport}; use std::time::Duration; mod behaviour; +mod challenge_message; mod message; mod protocol; use behaviour::Behaviour; -use message::{IncomingMessage, OutgoingMessage}; use protocol::Protocols; +// TODO: put these in a mod +pub use challenge_message::*; +pub use message::*; + +pub type Libp2pIncomingMessage = libp2p::request_response::Message; +pub type ResponseChannel = libp2p::request_response::ResponseChannel; +pub type PeerId = libp2p::PeerId; +pub type Multiaddr = libp2p::Multiaddr; + pub const PRIME_STREAM_PROTOCOL: libp2p::StreamProtocol = libp2p::StreamProtocol::new("/prime/1.0.0"); // TODO: force this to be passed by the user @@ -29,6 +37,7 @@ pub struct Node { listen_addrs: Vec, swarm: Swarm, bootnodes: Vec, + cancellation_token: tokio_util::sync::CancellationToken, // channel for sending incoming messages to the consumer of this library incoming_message_tx: tokio::sync::mpsc::Sender, @@ -66,6 +75,7 @@ impl Node { listen_addrs, mut swarm, bootnodes, + cancellation_token, incoming_message_tx, mut outgoing_message_rx, } = self; @@ -93,6 +103,10 @@ impl Node { loop { tokio::select! { + _ = cancellation_token.cancelled() => { + println!("cancellation token triggered, shutting down node"); + break Ok(()); + } Some(message) = outgoing_message_rx.recv() => { match message { OutgoingMessage::Request((peer, request)) => { @@ -143,6 +157,7 @@ pub struct NodeBuilder { agent_version: Option, protocols: Protocols, bootnodes: Vec, + cancellation_token: Option, } impl Default for NodeBuilder { @@ -160,6 +175,7 @@ impl NodeBuilder { agent_version: None, protocols: Protocols::new(), bootnodes: Vec::new(), + cancellation_token: None, } } @@ -224,6 +240,14 @@ impl NodeBuilder { self } + pub fn with_cancellation_token( + mut self, + cancellation_token: tokio_util::sync::CancellationToken, + ) -> Self { + self.cancellation_token = Some(cancellation_token); + self + } + pub fn try_build( self, ) -> Result<( @@ -238,6 +262,7 @@ impl NodeBuilder { agent_version, protocols, bootnodes, + cancellation_token, } = self; let keypair = keypair.unwrap_or(identity::Keypair::generate_ed25519()); @@ -279,6 +304,7 @@ impl NodeBuilder { bootnodes, incoming_message_tx, outgoing_message_rx, + cancellation_token: cancellation_token.unwrap_or_default(), }, incoming_message_rx, outgoing_message_tx, @@ -343,9 +369,8 @@ mod test { println!("received request from node1"); // send response from node2->node1 - let response = message::Response::GetTaskLogs(message::GetTaskLogsResponse { - logs: Ok(vec!["log1".to_string(), "log2".to_string()]), - }); + let response = + message::Response::GetTaskLogs(message::GetTaskLogsResponse::Ok("logs".to_string())); outgoing_message_tx2 .send(response.into_outgoing_message(channel)) .await @@ -359,9 +384,9 @@ mod test { else { panic!("expected a GetTaskLogs response message"); }; - assert_eq!( - response.logs, - Ok(vec!["log1".to_string(), "log2".to_string()]) - ); + let message::GetTaskLogsResponse::Ok(logs) = response else { + panic!("expected a successful GetTaskLogs response"); + }; + assert_eq!(logs, "logs"); } } diff --git a/crates/p2p/src/message.rs b/crates/p2p/src/message.rs index 9013a8ca..c0fd2d66 100644 --- a/crates/p2p/src/message.rs +++ b/crates/p2p/src/message.rs @@ -2,6 +2,9 @@ use libp2p::PeerId; use serde::{Deserialize, Serialize}; use std::time::SystemTime; +use crate::ChallengeRequest; +use crate::ChallengeResponse; + #[derive(Debug)] pub struct IncomingMessage { pub peer: PeerId, @@ -72,13 +75,19 @@ pub struct ValidatorAuthenticationInitiationRequest { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ValidatorAuthenticationInitiationResponse { - pub signed_message: String, + pub signature: String, pub message: String, } +impl From for Response { + fn from(response: ValidatorAuthenticationInitiationResponse) -> Self { + Response::ValidatorAuthentication(ValidatorAuthenticationResponse::Initiation(response)) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ValidatorAuthenticationSolutionRequest { - pub signed_message: String, + pub signature: String, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -87,18 +96,30 @@ pub enum ValidatorAuthenticationSolutionResponse { Rejected, } +impl From for Response { + fn from(response: ValidatorAuthenticationSolutionResponse) -> Self { + Response::ValidatorAuthentication(ValidatorAuthenticationResponse::Solution(response)) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct HardwareChallengeRequest { - pub challenge: String, // TODO + pub challenge: ChallengeRequest, pub timestamp: SystemTime, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct HardwareChallengeResponse { - pub response: String, // TODO + pub response: ChallengeResponse, pub timestamp: SystemTime, } +impl From for Response { + fn from(response: HardwareChallengeResponse) -> Self { + Response::HardwareChallenge(response) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub enum InviteRequestUrl { MasterUrl(String), @@ -121,12 +142,32 @@ pub enum InviteResponse { Error(String), } +impl From for Response { + fn from(response: InviteResponse) -> Self { + Response::Invite(response) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct GetTaskLogsResponse { - pub logs: Result, String>, +pub enum GetTaskLogsResponse { + Ok(String), + Error(String), +} + +impl From for Response { + fn from(response: GetTaskLogsResponse) -> Self { + Response::GetTaskLogs(response) + } } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RestartResponse { - pub result: Result<(), String>, +pub enum RestartResponse { + Ok, + Error(String), +} + +impl From for Response { + fn from(response: RestartResponse) -> Self { + Response::Restart(response) + } } From d6c1a4af1d0dd7a3d236ab23d032f78ec336ef09 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Wed, 9 Jul 2025 12:07:13 -0400 Subject: [PATCH 6/9] move messages to their own dir --- crates/p2p/src/lib.rs | 3 --- .../{challenge_message.rs => message/hardware_challenge.rs} | 0 crates/p2p/src/{message.rs => message/mod.rs} | 5 +++-- 3 files changed, 3 insertions(+), 5 deletions(-) rename crates/p2p/src/{challenge_message.rs => message/hardware_challenge.rs} (100%) rename crates/p2p/src/{message.rs => message/mod.rs} (98%) diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 46105a36..208fb597 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -11,15 +11,12 @@ use libp2p::{identity, Transport}; use std::time::Duration; mod behaviour; -mod challenge_message; mod message; mod protocol; use behaviour::Behaviour; use protocol::Protocols; -// TODO: put these in a mod -pub use challenge_message::*; pub use message::*; pub type Libp2pIncomingMessage = libp2p::request_response::Message; diff --git a/crates/p2p/src/challenge_message.rs b/crates/p2p/src/message/hardware_challenge.rs similarity index 100% rename from crates/p2p/src/challenge_message.rs rename to crates/p2p/src/message/hardware_challenge.rs diff --git a/crates/p2p/src/message.rs b/crates/p2p/src/message/mod.rs similarity index 98% rename from crates/p2p/src/message.rs rename to crates/p2p/src/message/mod.rs index c0fd2d66..64486533 100644 --- a/crates/p2p/src/message.rs +++ b/crates/p2p/src/message/mod.rs @@ -2,8 +2,9 @@ use libp2p::PeerId; use serde::{Deserialize, Serialize}; use std::time::SystemTime; -use crate::ChallengeRequest; -use crate::ChallengeResponse; +mod hardware_challenge; + +pub use hardware_challenge::*; #[derive(Debug)] pub struct IncomingMessage { From ea46820b8a5dae8878e32726e80a65b8fee66911 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Wed, 9 Jul 2025 12:16:19 -0400 Subject: [PATCH 7/9] add general request-response protocol --- crates/p2p/src/lib.rs | 5 +++ crates/p2p/src/message/mod.rs | 62 ++++++++++++++++++++++++++++++++++- crates/p2p/src/protocol.rs | 8 +++++ 3 files changed, 74 insertions(+), 1 deletion(-) diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 208fb597..6e2efca3 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -221,6 +221,11 @@ impl NodeBuilder { self } + pub fn with_general(mut self) -> Self { + self.protocols = self.protocols.with_general(); + self + } + pub fn with_bootnode(mut self, bootnode: Multiaddr) -> Self { self.bootnodes.push(bootnode); self diff --git a/crates/p2p/src/message/mod.rs b/crates/p2p/src/message/mod.rs index 64486533..adff99ac 100644 --- a/crates/p2p/src/message/mod.rs +++ b/crates/p2p/src/message/mod.rs @@ -4,7 +4,7 @@ use std::time::SystemTime; mod hardware_challenge; -pub use hardware_challenge::*; +pub use hardware_challenge::*; #[derive(Debug)] pub struct IncomingMessage { @@ -31,6 +31,7 @@ pub enum Request { Invite(InviteRequest), GetTaskLogs, Restart, + General(GeneralRequest), } impl Request { @@ -46,6 +47,7 @@ pub enum Response { Invite(InviteResponse), GetTaskLogs(GetTaskLogsResponse), Restart(RestartResponse), + General(GeneralResponse), } impl Response { @@ -63,17 +65,35 @@ pub enum ValidatorAuthenticationRequest { Solution(ValidatorAuthenticationSolutionRequest), } +impl From for Request { + fn from(request: ValidatorAuthenticationRequest) -> Self { + Request::ValidatorAuthentication(request) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub enum ValidatorAuthenticationResponse { Initiation(ValidatorAuthenticationInitiationResponse), Solution(ValidatorAuthenticationSolutionResponse), } +impl From for Response { + fn from(response: ValidatorAuthenticationResponse) -> Self { + Response::ValidatorAuthentication(response) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ValidatorAuthenticationInitiationRequest { pub message: String, } +impl From for Request { + fn from(request: ValidatorAuthenticationInitiationRequest) -> Self { + Request::ValidatorAuthentication(ValidatorAuthenticationRequest::Initiation(request)) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ValidatorAuthenticationInitiationResponse { pub signature: String, @@ -91,6 +111,12 @@ pub struct ValidatorAuthenticationSolutionRequest { pub signature: String, } +impl From for Request { + fn from(request: ValidatorAuthenticationSolutionRequest) -> Self { + Request::ValidatorAuthentication(ValidatorAuthenticationRequest::Solution(request)) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub enum ValidatorAuthenticationSolutionResponse { Granted, @@ -109,6 +135,12 @@ pub struct HardwareChallengeRequest { pub timestamp: SystemTime, } +impl From for Request { + fn from(request: HardwareChallengeRequest) -> Self { + Request::HardwareChallenge(request) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct HardwareChallengeResponse { pub response: ChallengeResponse, @@ -137,6 +169,12 @@ pub struct InviteRequest { pub nonce: [u8; 32], } +impl From for Request { + fn from(request: InviteRequest) -> Self { + Request::Invite(request) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub enum InviteResponse { Ok, @@ -172,3 +210,25 @@ impl From for Response { Response::Restart(response) } } + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GeneralRequest { + data: Vec, +} + +impl From for Request { + fn from(request: GeneralRequest) -> Self { + Request::General(request) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GeneralResponse { + data: Vec, +} + +impl From for Response { + fn from(response: GeneralResponse) -> Self { + Response::General(response) + } +} diff --git a/crates/p2p/src/protocol.rs b/crates/p2p/src/protocol.rs index 5186ac44..df423ef8 100644 --- a/crates/p2p/src/protocol.rs +++ b/crates/p2p/src/protocol.rs @@ -13,6 +13,8 @@ pub(crate) enum Protocol { GetTaskLogs, // any -> worker Restart, + // any -> any + General, } impl Protocol { @@ -25,6 +27,7 @@ impl Protocol { Protocol::Invite => StreamProtocol::new("/prime/invite/1.0.0"), Protocol::GetTaskLogs => StreamProtocol::new("/prime/get_task_logs/1.0.0"), Protocol::Restart => StreamProtocol::new("/prime/restart/1.0.0"), + Protocol::General => StreamProtocol::new("/prime/general/1.0.0"), } } } @@ -61,6 +64,11 @@ impl Protocols { self.0.insert(Protocol::Restart); self } + + pub(crate) fn with_general(mut self) -> Self { + self.0.insert(Protocol::General); + self + } } impl IntoIterator for Protocols { From 94e9e4d462aa021f29a498b6b0c88e927857cab3 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Thu, 10 Jul 2025 12:08:18 -0400 Subject: [PATCH 8/9] implement dialing peers --- Cargo.toml | 2 ++ crates/p2p/src/lib.rs | 40 +++++++++++++++++++++++++++++----------- 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4279f156..d4ca7ab8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,8 @@ resolver = "2" [workspace.dependencies] shared = { path = "crates/shared" } +p2p = { path = "crates/p2p" } + actix-web = "4.9.0" clap = { version = "4.5.27", features = ["derive"] } serde = { version = "1.0.219", features = ["derive"] } diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 6e2efca3..4a2b176e 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -1,6 +1,5 @@ use anyhow::Context; use anyhow::Result; -use libp2p::futures::stream::FuturesUnordered; use libp2p::noise; use libp2p::swarm::SwarmEvent; use libp2p::tcp; @@ -23,6 +22,9 @@ pub type Libp2pIncomingMessage = libp2p::request_response::Message; pub type PeerId = libp2p::PeerId; pub type Multiaddr = libp2p::Multiaddr; +pub type Keypair = libp2p::identity::Keypair; +pub type DialSender = + tokio::sync::mpsc::Sender<(Vec, tokio::sync::oneshot::Sender>)>; pub const PRIME_STREAM_PROTOCOL: libp2p::StreamProtocol = libp2p::StreamProtocol::new("/prime/1.0.0"); @@ -36,6 +38,9 @@ pub struct Node { bootnodes: Vec, cancellation_token: tokio_util::sync::CancellationToken, + dial_rx: + tokio::sync::mpsc::Receiver<(Vec, tokio::sync::oneshot::Sender>)>, + // channel for sending incoming messages to the consumer of this library incoming_message_tx: tokio::sync::mpsc::Sender, @@ -73,6 +78,7 @@ impl Node { mut swarm, bootnodes, cancellation_token, + mut dial_rx, incoming_message_tx, mut outgoing_message_rx, } = self; @@ -83,17 +89,12 @@ impl Node { .context("swarm failed to listen on multiaddr")?; } - let futures = FuturesUnordered::new(); for bootnode in bootnodes { - futures.push(swarm.dial(bootnode)) - } - let results: Vec<_> = futures.into_iter().collect(); - for result in results { - match result { + match swarm.dial(bootnode.clone()) { Ok(_) => {} Err(e) => { - // TODO: log this error - println!("failed to dial bootnode: {e:?}"); + // log error + println!("failed to dial bootnode {bootnode}: {e:?}"); } } } @@ -104,6 +105,19 @@ impl Node { println!("cancellation token triggered, shutting down node"); break Ok(()); } + Some((addrs, res_tx)) = dial_rx.recv() => { + let mut res = Ok(()); + for addr in addrs { + match swarm.dial(addr.clone()) { + Ok(_) => {} + Err(e) => { + res = Err(anyhow::anyhow!("failed to dial {addr}: {e:?}")); + break; + } + } + } + let _ = res_tx.send(res); + } Some(message) = outgoing_message_rx.recv() => { match message { OutgoingMessage::Request((peer, request)) => { @@ -254,6 +268,7 @@ impl NodeBuilder { self, ) -> Result<( Node, + DialSender, tokio::sync::mpsc::Receiver, tokio::sync::mpsc::Sender, )> { @@ -295,6 +310,7 @@ impl NodeBuilder { listen_addrs.push(listen_addr); } + let (dial_tx, dial_rx) = tokio::sync::mpsc::channel(100); let (incoming_message_tx, incoming_message_rx) = tokio::sync::mpsc::channel(100); let (outgoing_message_tx, outgoing_message_rx) = tokio::sync::mpsc::channel(100); @@ -304,10 +320,12 @@ impl NodeBuilder { swarm, listen_addrs, bootnodes, + dial_rx, incoming_message_tx, outgoing_message_rx, cancellation_token: cancellation_token.unwrap_or_default(), }, + dial_tx, incoming_message_rx, outgoing_message_tx, )) @@ -334,11 +352,11 @@ mod test { #[tokio::test] async fn two_nodes_can_connect_and_do_request_response() { - let (node1, mut incoming_message_rx1, outgoing_message_tx1) = + let (node1, _, mut incoming_message_rx1, outgoing_message_tx1) = NodeBuilder::new().with_get_task_logs().try_build().unwrap(); let node1_peer_id = node1.peer_id(); - let (node2, mut incoming_message_rx2, outgoing_message_tx2) = NodeBuilder::new() + let (node2, _, mut incoming_message_rx2, outgoing_message_tx2) = NodeBuilder::new() .with_get_task_logs() .with_bootnodes(node1.multiaddrs()) .try_build() From a8af70634b017787edbefc5d0577a93dcfd0d539 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Thu, 10 Jul 2025 16:50:45 -0400 Subject: [PATCH 9/9] use tracing --- Cargo.lock | 1 + Cargo.toml | 1 + crates/p2p/Cargo.toml | 1 + crates/p2p/src/behaviour.rs | 9 +++++---- crates/p2p/src/lib.rs | 18 +++++++----------- crates/worker/Cargo.toml | 2 +- 6 files changed, 16 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ae652ad4..c16f0570 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6267,6 +6267,7 @@ dependencies = [ "serde", "tokio", "tokio-util", + "tracing", "void", ] diff --git a/Cargo.toml b/Cargo.toml index d4ca7ab8..1bc9e2ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ rand_core_v6 = { package = "rand_core", version = "0.6.4", features = ["std"] } ipld-core = "0.4" rust-ipfs = "0.14" cid = "0.11" +tracing = "0.1.41" [workspace.package] version = "0.3.11" diff --git a/crates/p2p/Cargo.toml b/crates/p2p/Cargo.toml index ba52d570..bb670107 100644 --- a/crates/p2p/Cargo.toml +++ b/crates/p2p/Cargo.toml @@ -12,6 +12,7 @@ nalgebra = {workspace = true} serde = {workspace = true} tokio = {workspace = true, features = ["sync"]} tokio-util = { workspace = true, features = ["rt"] } +tracing = { workspace = true } [lints] workspace = true diff --git a/crates/p2p/src/behaviour.rs b/crates/p2p/src/behaviour.rs index e2737d57..b114b61e 100644 --- a/crates/p2p/src/behaviour.rs +++ b/crates/p2p/src/behaviour.rs @@ -12,6 +12,7 @@ use libp2p::ping; use libp2p::request_response; use libp2p::swarm::NetworkBehaviour; use std::time::Duration; +use tracing::debug; use crate::message::IncomingMessage; use crate::message::{Request, Response}; @@ -152,19 +153,19 @@ impl BehaviourEvent { BehaviourEvent::Ping(_event) => {} BehaviourEvent::RequestResponse(event) => match event { request_response::Event::Message { peer, message } => { - println!("received message from peer {peer:?}: {message:?}"); + debug!("received message from peer {peer:?}: {message:?}"); // if this errors, user dropped their incoming message channel let _ = message_tx.send(IncomingMessage { peer, message }).await; } request_response::Event::ResponseSent { peer, request_id } => { - println!("response sent to peer {peer:?} for request ID {request_id:?}"); + debug!("response sent to peer {peer:?} for request ID {request_id:?}"); } request_response::Event::InboundFailure { peer, request_id, error, } => { - println!( + debug!( "inbound failure from peer {peer:?} for request ID {request_id:?}: {error}" ); } @@ -173,7 +174,7 @@ impl BehaviourEvent { request_id, error, } => { - println!( + debug!( "outbound failure to peer {peer:?} for request ID {request_id:?}: {error}" ); } diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 4a2b176e..0a5637a9 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -8,6 +8,7 @@ use libp2p::Swarm; use libp2p::SwarmBuilder; use libp2p::{identity, Transport}; use std::time::Duration; +use tracing::debug; mod behaviour; mod message; @@ -93,8 +94,7 @@ impl Node { match swarm.dial(bootnode.clone()) { Ok(_) => {} Err(e) => { - // log error - println!("failed to dial bootnode {bootnode}: {e:?}"); + debug!("failed to dial bootnode {bootnode}: {e:?}"); } } } @@ -102,7 +102,7 @@ impl Node { loop { tokio::select! { _ = cancellation_token.cancelled() => { - println!("cancellation token triggered, shutting down node"); + debug!("cancellation token triggered, shutting down node"); break Ok(()); } Some((addrs, res_tx)) = dial_rx.recv() => { @@ -124,10 +124,8 @@ impl Node { swarm.behaviour_mut().request_response().send_request(&peer, request); } OutgoingMessage::Response((channel, response)) => { - println!("sending response on channel"); if let Err(e) = swarm.behaviour_mut().request_response().send_response(channel, response) { - // log error - println!("failed to send response: {e:?}"); + debug!("failed to send response: {e:?}"); } } } @@ -138,10 +136,10 @@ impl Node { listener_id: _, address, } => { - println!("new listen address: {address}"); + debug!("new listen address: {address}"); } SwarmEvent::ExternalAddrConfirmed { address } => { - println!("external address confirmed: {address}"); + debug!("external address confirmed: {address}"); } SwarmEvent::ConnectionClosed { peer_id, @@ -150,7 +148,7 @@ impl Node { connection_id: _, num_established: _, } => { - println!("connection closed with peer {peer_id}: {cause:?}"); + debug!("connection closed with peer {peer_id}: {cause:?}"); } SwarmEvent::Behaviour(event) => event.handle(incoming_message_tx.clone()).await, _ => continue, @@ -386,8 +384,6 @@ mod test { panic!("expected a GetTaskLogs request message"); }; - println!("received request from node1"); - // send response from node2->node1 let response = message::Response::GetTaskLogs(message::GetTaskLogsResponse::Ok("logs".to_string())); diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 18596ba5..0f08e404 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -50,7 +50,7 @@ unicode-width = "0.2.0" rand = "0.9.0" tempfile = "3.14.0" tracing-loki = "0.2.6" -tracing = "0.1.41" +tracing = { workspace = true } tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } tracing-log = "0.2.0" time = "0.3.41"