diff --git a/app/app.go b/app/app.go index a85334ab45..227885c9c0 100644 --- a/app/app.go +++ b/app/app.go @@ -404,7 +404,6 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config, life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PPing, p2p.NewPingService(p2pNode, peerIDs, conf.TestConfig.TestPingConfig)) life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PEventCollector, p2p.NewEventCollector(p2pNode)) life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PRouters, p2p.NewRelayRouter(p2pNode, peerIDs, relays)) - life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartForceDirectConns, p2p.ForceDirectConnections(p2pNode, peerIDs)) life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartForceQUICConns, p2p.UpgradeToQUICConnections(p2pNode, peerIDs)) return p2pNode, nil diff --git a/app/lifecycle/order.go b/app/lifecycle/order.go index 4969432e7a..2cc2fbb53d 100644 --- a/app/lifecycle/order.go +++ b/app/lifecycle/order.go @@ -23,7 +23,6 @@ const ( StartValidatorAPI StartP2PPing StartP2PRouters - StartForceDirectConns StartForceQUICConns StartP2PConsensus StartSimulator diff --git a/app/lifecycle/orderstart_string.go b/app/lifecycle/orderstart_string.go index 8f402f0b10..42031b8a68 100644 --- a/app/lifecycle/orderstart_string.go +++ b/app/lifecycle/orderstart_string.go @@ -20,26 +20,24 @@ func _() { _ = x[StartValidatorAPI-7] _ = x[StartP2PPing-8] _ = x[StartP2PRouters-9] - _ = x[StartForceDirectConns-10] - _ = x[StartForceQUICConns-11] - _ = x[StartP2PConsensus-12] - _ = x[StartSimulator-13] - _ = x[StartScheduler-14] - _ = x[StartBuilderRegWatcher-15] - _ = x[StartP2PEventCollector-16] - _ = x[StartPeerInfo-17] - _ = x[StartParSigDB-18] - _ = x[StartStackSnipe-19] + _ = x[StartForceQUICConns-10] + _ = x[StartP2PConsensus-11] + _ = x[StartSimulator-12] + _ = x[StartScheduler-13] + _ = x[StartBuilderRegWatcher-14] + _ = x[StartP2PEventCollector-15] + _ = x[StartPeerInfo-16] + _ = x[StartParSigDB-17] + _ = x[StartStackSnipe-18] } -const _OrderStart_name = "TrackerPrivkeyLockEth1ClientAggSigDBRelayMonitoringAPIDebugAPIValidatorAPIP2PPingP2PRoutersForceDirectConnsForceQUICConnsP2PConsensusSimulatorSchedulerBuilderRegWatcherP2PEventCollectorPeerInfoParSigDBStackSnipe" +const _OrderStart_name = "TrackerPrivkeyLockEth1ClientAggSigDBRelayMonitoringAPIDebugAPIValidatorAPIP2PPingP2PRoutersForceQUICConnsP2PConsensusSimulatorSchedulerBuilderRegWatcherP2PEventCollectorPeerInfoParSigDBStackSnipe" -var _OrderStart_index = [...]uint8{0, 7, 18, 28, 36, 41, 54, 62, 74, 81, 91, 107, 121, 133, 142, 151, 168, 185, 193, 201, 211} +var _OrderStart_index = [...]uint8{0, 7, 18, 28, 36, 41, 54, 62, 74, 81, 91, 105, 117, 126, 135, 152, 169, 177, 185, 195} func (i OrderStart) String() string { - idx := int(i) - 0 - if i < 0 || idx >= len(_OrderStart_index)-1 { + if i < 0 || i >= OrderStart(len(_OrderStart_index)-1) { return "OrderStart(" + strconv.FormatInt(int64(i), 10) + ")" } - return _OrderStart_name[_OrderStart_index[idx]:_OrderStart_index[idx+1]] + return _OrderStart_name[_OrderStart_index[i]:_OrderStart_index[i+1]] } diff --git a/app/lifecycle/orderstop_string.go b/app/lifecycle/orderstop_string.go index 97b41a26fa..6cf1f1c103 100644 --- a/app/lifecycle/orderstop_string.go +++ b/app/lifecycle/orderstop_string.go @@ -28,9 +28,8 @@ const _OrderStop_name = "SchedulerPrivkeyLockRetryerDutyDBBeaconMockValidatorAPI var _OrderStop_index = [...]uint8{0, 9, 20, 27, 33, 43, 55, 62, 71, 78, 86, 99} func (i OrderStop) String() string { - idx := int(i) - 0 - if i < 0 || idx >= len(_OrderStop_index)-1 { + if i < 0 || i >= OrderStop(len(_OrderStop_index)-1) { return "OrderStop(" + strconv.FormatInt(int64(i), 10) + ")" } - return _OrderStop_name[_OrderStop_index[idx]:_OrderStop_index[idx+1]] + return _OrderStop_name[_OrderStop_index[i]:_OrderStop_index[i+1]] } diff --git a/cmd/testpeers_internal_test.go b/cmd/testpeers_internal_test.go index b388a6b9e5..25c14e035d 100644 --- a/cmd/testpeers_internal_test.go +++ b/cmd/testpeers_internal_test.go @@ -576,6 +576,21 @@ func startPeer(t *testing.T, ctx context.Context, conf testPeersConfig, peerPriv go p2p.NewRelayRouter(peerTCPNode, []peer.ID{hostAsPeer.ID}, relays)(ctx) + // Proactively connect to the host, simulating what happens when all peers run + // testpeers simultaneously: each peer dials the others it is responsible for. + // NewRelayRouter only adds peerstore addresses; something must trigger the dial. + go func() { + for ctx.Err() == nil { + if len(peerTCPNode.Network().ConnsToPeer(hostAsPeer.ID)) > 0 { + return + } + + _ = peerTCPNode.Connect(ctx, peer.AddrInfo{ID: hostAsPeer.ID}) + + time.Sleep(time.Second) + } + }() + peerENR, err := enr.New(peerPrivKey) require.NoError(t, err) diff --git a/p2p/p2p.go b/p2p/p2p.go index 999ff7685b..8778a7754b 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -21,6 +21,7 @@ import ( "github.com/libp2p/go-libp2p/core/peerstore" "github.com/libp2p/go-libp2p/core/routing" "github.com/libp2p/go-libp2p/p2p/net/swarm" + "github.com/libp2p/go-libp2p/p2p/protocol/holepunch" quic "github.com/libp2p/go-libp2p/p2p/transport/quic" //nolint:revive // Must be imported with alias "github.com/libp2p/go-libp2p/p2p/transport/tcp" ma "github.com/multiformats/go-multiaddr" @@ -106,6 +107,7 @@ func NewNode(ctx context.Context, cfg Config, key *k1.PrivateKey, connGater Conn libp2p.ConnectionGater(connGater), // Enable Autonat (required for hole punching) libp2p.EnableNATService(), + libp2p.EnableHolePunching(holepunch.WithTracer(newHolePunchTracer(ctx))), libp2p.AddrsFactory(func(internalAddrs []ma.Multiaddr) []ma.Multiaddr { return filterAdvertisedAddrs(externalAddrs, internalAddrs, filterPrivateAddrs) }), @@ -258,7 +260,13 @@ func multiAddrsViaRelay(relayPeer Peer, peerID peer.ID) ([]ma.Multiaddr, error) // NewEventCollector returns a lifecycle hook that instruments libp2p events. func NewEventCollector(p2pNode host.Host) lifecycle.HookFuncCtx { return func(ctx context.Context) { - sub, err := p2pNode.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged)) + sub, err := p2pNode.EventBus().Subscribe([]any{ + new(event.EvtLocalReachabilityChanged), + new(event.EvtLocalAddressesUpdated), + new(event.EvtNATDeviceTypeChanged), + new(event.EvtPeerIdentificationCompleted), + new(event.EvtPeerIdentificationFailed), + }) if err != nil { log.Error(ctx, "Failed to subscribe to libp2p events", err) return @@ -277,6 +285,41 @@ func NewEventCollector(p2pNode host.Host) lifecycle.HookFuncCtx { case event.EvtLocalReachabilityChanged: log.Info(ctx, "Libp2p reachability changed", z.Any("status", evt.Reachability)) reachableGauge.Set(float64(evt.Reachability)) + case event.EvtLocalAddressesUpdated: + var addrs []string + + for _, a := range evt.Current { + if a.Action == event.Added { + addrs = append(addrs, a.Address.String()) + } + } + + if len(addrs) > 0 { + log.Debug(ctx, "Libp2p addresses updated, new addresses added", + z.Any("added", addrs), + z.Any("all_host_addrs", p2pNode.Addrs()), + ) + } + case event.EvtNATDeviceTypeChanged: + log.Debug(ctx, "NAT device type changed", + z.Any("transport", evt.TransportProtocol), + z.Any("nat_type", evt.NatDeviceType), + ) + case event.EvtPeerIdentificationCompleted: + isPublic := evt.ObservedAddr != nil && manet.IsPublicAddr(evt.ObservedAddr) + supportsDCUtR := slices.Contains(evt.Protocols, holepunch.Protocol) + log.Debug(ctx, "Peer identification completed", + z.Str("peer", PeerName(evt.Peer)), + z.Any("observed_addr", evt.ObservedAddr), + z.Any("conn_local", evt.Conn.LocalMultiaddr()), + z.Any("conn_remote", evt.Conn.RemoteMultiaddr()), + z.Bool("observed_is_public", isPublic), + z.Bool("peer_supports_dcutr", supportsDCUtR), + ) + case event.EvtPeerIdentificationFailed: + log.Warn(ctx, "Peer identification failed", evt.Reason, + z.Str("peer", PeerName(evt.Peer)), + ) default: log.Warn(ctx, "Unknown libp2p event", nil, z.Str("type", fmt.Sprintf("%T", e))) } @@ -285,54 +328,68 @@ func NewEventCollector(p2pNode host.Host) lifecycle.HookFuncCtx { } } -// peerRoutingFunc wraps a function to implement routing.PeerRouting. -type peerRoutingFunc func(context.Context, peer.ID) (peer.AddrInfo, error) - -func (f peerRoutingFunc) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) { - return f(ctx, p) +// holePunchTracer implements holepunch.EventTracer to log all DCUtR lifecycle events. +type holePunchTracer struct { + ctx context.Context } -// ForceDirectConnections attempts to establish a direct connection if there is an existing relay connection to the peer. -// The idea is to enable switching to a direct connection as soon as the host has a connection to the peer. -func ForceDirectConnections(p2pNode host.Host, peerIDs []peer.ID) lifecycle.HookFuncCtx { - forceDirectConn := func(ctx context.Context) { - for _, p := range peerIDs { - if p2pNode.ID() == p { - continue // Skip self - } - - conns := p2pNode.Network().ConnsToPeer(p) - if len(conns) == 0 { - // Skip if there isn't any existing connection to peer. Note that we only force direct connection - // if there is already an existing relay connection between the host and peer. - continue - } - - if isDirectConnAvailable(conns) { - continue - } +func newHolePunchTracer(ctx context.Context) *holePunchTracer { + return &holePunchTracer{ctx: log.WithTopic(ctx, "p2p")} +} - // All existing connections are through relays, so we can try force dialing a direct connection. - err := p2pNode.Connect(network.WithForceDirectDial(ctx, "relay_to_direct"), peer.AddrInfo{ID: p}) - if err == nil { - log.Debug(ctx, "Forced direct connection to peer successful", z.Str("peer", PeerName(p))) - } +func (t *holePunchTracer) Trace(evt *holepunch.Event) { + name := PeerName(evt.Remote) + switch e := evt.Evt.(type) { + case *holepunch.StartHolePunchEvt: + log.Debug(t.ctx, "Hole punch started", + z.Str("peer", name), + z.Any("remote_addrs", e.RemoteAddrs), + z.Any("rtt", e.RTT), + ) + case *holepunch.EndHolePunchEvt: + if e.Success { + log.Debug(t.ctx, "Hole punch succeeded", + z.Str("peer", name), + z.Any("elapsed", e.EllapsedTime), + ) + } else { + log.Warn(t.ctx, "Hole punch failed", errors.New(e.Error), + z.Str("peer", name), + z.Any("elapsed", e.EllapsedTime), + ) } + case *holepunch.HolePunchAttemptEvt: + log.Debug(t.ctx, "Hole punch attempt", + z.Str("peer", name), + z.Int("attempt", e.Attempt), + ) + case *holepunch.DirectDialEvt: + if e.Success { + log.Debug(t.ctx, "Direct dial succeeded during hole punch", + z.Str("peer", name), + z.Any("elapsed", e.EllapsedTime), + ) + } else { + log.Debug(t.ctx, "Direct dial failed during hole punch", + z.Str("peer", name), + z.Any("elapsed", e.EllapsedTime), + z.Str("error", e.Error), + ) + } + case *holepunch.ProtocolErrorEvt: + log.Warn(t.ctx, "Hole punch protocol error", errors.New(e.Error), + z.Str("peer", name), + ) + default: + log.Warn(t.ctx, "Unknown hole punch event", nil, z.Str("type", fmt.Sprintf("%T", evt.Evt))) } +} - return func(ctx context.Context) { - ticker := time.NewTicker(1 * time.Minute) - defer ticker.Stop() +// peerRoutingFunc wraps a function to implement routing.PeerRouting. +type peerRoutingFunc func(context.Context, peer.ID) (peer.AddrInfo, error) - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - forceDirectConn(ctx) - } - } - } +func (f peerRoutingFunc) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) { + return f(ctx, p) } // isQUICEnabled returns true if the host has an address or listening address on QUIC. @@ -350,19 +407,6 @@ func isQUICEnabled(h host.Host) bool { return false } -// isDirectConnAvailable returns true if direct connection is available in the given set of connections. -func isDirectConnAvailable(conns []network.Conn) bool { - for _, conn := range conns { - if IsRelayAddr(conn.RemoteMultiaddr()) { - continue - } - - return true - } - - return false -} - // UpgradeToQUICConnections tries to upgrade a direct TCP connection to a direct QUIC connection // if there is known QUIC addresses from the peerstore. func UpgradeToQUICConnections(p2pNode host.Host, peerIDs []peer.ID) lifecycle.HookFuncCtx { @@ -457,7 +501,7 @@ func UpgradeToQUICConnections(p2pNode host.Host, peerIDs []peer.ID) lifecycle.Ho if !hasDirectTCPConn(conns) { log.Debug(ctx, "No direct connection via TCP to peer", z.Str("peer", PeerName(p)), z.Any("conns", conns)) - continue // no direct TPC connection to upgrade to QUIC, ForceDirectConnections shall upgrade to direct + continue // no direct TCP connection to upgrade to QUIC, hole punching shall upgrade to direct } // Get known QUIC addrs from peerstore @@ -733,6 +777,13 @@ func RegisterConnectionLogger(ctx context.Context, p2pNode host.Host, peerIDs [] z.Any("direction", e.Direction), z.Str("type", typ), ) + + if typ == addrTypeRelay && e.Direction == network.DirInbound { + log.Debug(ctx, "Inbound relay connection detected, DCUtR hole punch should initiate", + z.Str("peer", name), + z.Any("peer_address", addr), + ) + } } else if e.Disconnect { log.Debug(ctx, "Libp2p disconnected", z.Str("peer", name), diff --git a/p2p/relay.go b/p2p/relay.go index 95a0b9cb80..7a8829161c 100644 --- a/p2p/relay.go +++ b/p2p/relay.go @@ -105,6 +105,15 @@ func NewRelayReserver(p2pNode host.Host, relay *MutablePeer) lifecycle.HookFuncC // NewRelayRouter returns a life cycle hook that routes peers via relays in libp2p by // continuously adding peer relay addresses to libp2p peer store. +// +// Only relay routes for peers that THIS node should dial are added. For each +// peer pair, the node with the smaller peer ID dials and the node with the +// larger peer ID waits. This asymmetry is required for DCUtR (hole punching) +// to work: libp2p's holepunch service only initiates the DCUtR protocol on +// INBOUND relay connections. If both sides add relay routes and dial +// simultaneously, both see outbound connections and neither side triggers +// DCUtR. By having one side wait, it sees the other's dial as an inbound +// relay connection, which activates DCUtR and enables NAT traversal. func NewRelayRouter(p2pNode host.Host, peers []peer.ID, relays []*MutablePeer) lifecycle.HookFuncCtx { return func(ctx context.Context) { if len(relays) == 0 { @@ -113,13 +122,18 @@ func NewRelayRouter(p2pNode host.Host, peers []peer.ID, relays []*MutablePeer) l ctx = log.WithTopic(ctx, "p2p") + selfID := p2pNode.ID() + ticker := time.NewTicker(routedAddrTTL * 9 / 10) defer ticker.Stop() for { for _, pID := range peers { - if pID == p2pNode.ID() { - // Skip self + if pID == selfID { + continue + } + + if !shouldDialPeer(selfID, pID) { continue } @@ -147,3 +161,10 @@ func NewRelayRouter(p2pNode host.Host, peers []peer.ID, relays []*MutablePeer) l } } } + +// shouldDialPeer returns true if this node should proactively dial the given +// peer via relay. The peer with the smaller ID dials; the peer with the larger +// ID waits for the inbound connection (which triggers DCUtR hole punching). +func shouldDialPeer(self, remote peer.ID) bool { + return self < remote +} diff --git a/testutil/integration/helpers_test.go b/testutil/integration/helpers_test.go index 2516aadb5b..e0dc05006e 100644 --- a/testutil/integration/helpers_test.go +++ b/testutil/integration/helpers_test.go @@ -55,6 +55,17 @@ func (a *asserter) await(ctx context.Context, t *testing.T, expect int) error { return len(actual) >= expect }, a.Timeout, time.Millisecond*10) + // Re-count after assert.Eventually returns to handle the race where context + // was cancelled in the window between assert.Eventually returning true + // (callbacks satisfied) and the ctx.Err() check below. + count := 0 + + a.callbacks.Range(func(any, any) bool { count++; return true }) + + if count >= expect { + return nil + } + if ctx.Err() != nil { return context.Canceled } diff --git a/testutil/integration/ping_test.go b/testutil/integration/ping_test.go index ffb4ac3390..902a1df584 100644 --- a/testutil/integration/ping_test.go +++ b/testutil/integration/ping_test.go @@ -59,7 +59,7 @@ func TestPingCluster(t *testing.T) { t.Run("relay_incorrect_externalhost", func(t *testing.T) { pingCluster(t, pingTest{ BindLocalhost: true, - ExternalIP: "222.222.222.22", + ExternalIP: "192.0.2.1", }) }) }