Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,6 @@ func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config,
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PPing, p2p.NewPingService(p2pNode, peerIDs, conf.TestConfig.TestPingConfig))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PEventCollector, p2p.NewEventCollector(p2pNode))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartP2PRouters, p2p.NewRelayRouter(p2pNode, peerIDs, relays))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartForceDirectConns, p2p.ForceDirectConnections(p2pNode, peerIDs))
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartForceQUICConns, p2p.UpgradeToQUICConnections(p2pNode, peerIDs))

return p2pNode, nil
Expand Down
1 change: 0 additions & 1 deletion app/lifecycle/order.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ const (
StartValidatorAPI
StartP2PPing
StartP2PRouters
StartForceDirectConns
StartForceQUICConns
StartP2PConsensus
StartSimulator
Expand Down
28 changes: 13 additions & 15 deletions app/lifecycle/orderstart_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions app/lifecycle/orderstop_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions cmd/testpeers_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,21 @@ func startPeer(t *testing.T, ctx context.Context, conf testPeersConfig, peerPriv

go p2p.NewRelayRouter(peerTCPNode, []peer.ID{hostAsPeer.ID}, relays)(ctx)

// Proactively connect to the host, simulating what happens when all peers run
// testpeers simultaneously: each peer dials the others it is responsible for.
// NewRelayRouter only adds peerstore addresses; something must trigger the dial.
go func() {
for ctx.Err() == nil {
if len(peerTCPNode.Network().ConnsToPeer(hostAsPeer.ID)) > 0 {
return
}

_ = peerTCPNode.Connect(ctx, peer.AddrInfo{ID: hostAsPeer.ID})

time.Sleep(time.Second)
}
}()

peerENR, err := enr.New(peerPrivKey)
require.NoError(t, err)

Expand Down
163 changes: 107 additions & 56 deletions p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
quic "github.com/libp2p/go-libp2p/p2p/transport/quic" //nolint:revive // Must be imported with alias
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -106,6 +107,7 @@
libp2p.ConnectionGater(connGater),
// Enable Autonat (required for hole punching)
libp2p.EnableNATService(),
libp2p.EnableHolePunching(holepunch.WithTracer(newHolePunchTracer(ctx))),
libp2p.AddrsFactory(func(internalAddrs []ma.Multiaddr) []ma.Multiaddr {
return filterAdvertisedAddrs(externalAddrs, internalAddrs, filterPrivateAddrs)
}),
Expand Down Expand Up @@ -256,9 +258,15 @@
}

// NewEventCollector returns a lifecycle hook that instruments libp2p events.
func NewEventCollector(p2pNode host.Host) lifecycle.HookFuncCtx {

Check failure on line 261 in p2p/p2p.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 21 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=ObolNetwork_charon&issues=AZ4_jSaMSosnWSdcPU6V&open=AZ4_jSaMSosnWSdcPU6V&pullRequest=4516
return func(ctx context.Context) {
sub, err := p2pNode.EventBus().Subscribe(new(event.EvtLocalReachabilityChanged))
sub, err := p2pNode.EventBus().Subscribe([]any{
new(event.EvtLocalReachabilityChanged),
new(event.EvtLocalAddressesUpdated),
new(event.EvtNATDeviceTypeChanged),
new(event.EvtPeerIdentificationCompleted),
new(event.EvtPeerIdentificationFailed),
})
if err != nil {
log.Error(ctx, "Failed to subscribe to libp2p events", err)
return
Expand All @@ -277,6 +285,41 @@
case event.EvtLocalReachabilityChanged:
log.Info(ctx, "Libp2p reachability changed", z.Any("status", evt.Reachability))
reachableGauge.Set(float64(evt.Reachability))
case event.EvtLocalAddressesUpdated:
var addrs []string

for _, a := range evt.Current {
if a.Action == event.Added {
addrs = append(addrs, a.Address.String())
}
}

if len(addrs) > 0 {
log.Debug(ctx, "Libp2p addresses updated, new addresses added",
z.Any("added", addrs),
z.Any("all_host_addrs", p2pNode.Addrs()),
)
}
case event.EvtNATDeviceTypeChanged:
log.Debug(ctx, "NAT device type changed",
z.Any("transport", evt.TransportProtocol),
z.Any("nat_type", evt.NatDeviceType),
)
case event.EvtPeerIdentificationCompleted:
isPublic := evt.ObservedAddr != nil && manet.IsPublicAddr(evt.ObservedAddr)
supportsDCUtR := slices.Contains(evt.Protocols, holepunch.Protocol)
log.Debug(ctx, "Peer identification completed",
z.Str("peer", PeerName(evt.Peer)),
z.Any("observed_addr", evt.ObservedAddr),
z.Any("conn_local", evt.Conn.LocalMultiaddr()),
z.Any("conn_remote", evt.Conn.RemoteMultiaddr()),
z.Bool("observed_is_public", isPublic),
z.Bool("peer_supports_dcutr", supportsDCUtR),
)
case event.EvtPeerIdentificationFailed:
log.Warn(ctx, "Peer identification failed", evt.Reason,
z.Str("peer", PeerName(evt.Peer)),
)
default:
log.Warn(ctx, "Unknown libp2p event", nil, z.Str("type", fmt.Sprintf("%T", e)))
}
Expand All @@ -285,54 +328,68 @@
}
}

// peerRoutingFunc wraps a function to implement routing.PeerRouting.
type peerRoutingFunc func(context.Context, peer.ID) (peer.AddrInfo, error)

func (f peerRoutingFunc) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) {
return f(ctx, p)
// holePunchTracer implements holepunch.EventTracer to log all DCUtR lifecycle events.
type holePunchTracer struct {
ctx context.Context

Check warning on line 333 in p2p/p2p.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this 'context.Context' field and pass context as a parameter to methods that need it.

See more on https://sonarcloud.io/project/issues?id=ObolNetwork_charon&issues=AZ4_jSaMSosnWSdcPU6X&open=AZ4_jSaMSosnWSdcPU6X&pullRequest=4516
}

// ForceDirectConnections attempts to establish a direct connection if there is an existing relay connection to the peer.
// The idea is to enable switching to a direct connection as soon as the host has a connection to the peer.
func ForceDirectConnections(p2pNode host.Host, peerIDs []peer.ID) lifecycle.HookFuncCtx {
forceDirectConn := func(ctx context.Context) {
for _, p := range peerIDs {
if p2pNode.ID() == p {
continue // Skip self
}

conns := p2pNode.Network().ConnsToPeer(p)
if len(conns) == 0 {
// Skip if there isn't any existing connection to peer. Note that we only force direct connection
// if there is already an existing relay connection between the host and peer.
continue
}

if isDirectConnAvailable(conns) {
continue
}
func newHolePunchTracer(ctx context.Context) *holePunchTracer {
return &holePunchTracer{ctx: log.WithTopic(ctx, "p2p")}
}

// All existing connections are through relays, so we can try force dialing a direct connection.
err := p2pNode.Connect(network.WithForceDirectDial(ctx, "relay_to_direct"), peer.AddrInfo{ID: p})
if err == nil {
log.Debug(ctx, "Forced direct connection to peer successful", z.Str("peer", PeerName(p)))
}
func (t *holePunchTracer) Trace(evt *holepunch.Event) {
name := PeerName(evt.Remote)
switch e := evt.Evt.(type) {
case *holepunch.StartHolePunchEvt:
log.Debug(t.ctx, "Hole punch started",
z.Str("peer", name),
z.Any("remote_addrs", e.RemoteAddrs),
z.Any("rtt", e.RTT),
)
case *holepunch.EndHolePunchEvt:
if e.Success {
log.Debug(t.ctx, "Hole punch succeeded",
z.Str("peer", name),
z.Any("elapsed", e.EllapsedTime),
)
} else {
log.Warn(t.ctx, "Hole punch failed", errors.New(e.Error),
z.Str("peer", name),
z.Any("elapsed", e.EllapsedTime),
)
}
case *holepunch.HolePunchAttemptEvt:
log.Debug(t.ctx, "Hole punch attempt",
z.Str("peer", name),
z.Int("attempt", e.Attempt),
)
case *holepunch.DirectDialEvt:
if e.Success {
log.Debug(t.ctx, "Direct dial succeeded during hole punch",
z.Str("peer", name),
z.Any("elapsed", e.EllapsedTime),
)
} else {
log.Debug(t.ctx, "Direct dial failed during hole punch",
z.Str("peer", name),
z.Any("elapsed", e.EllapsedTime),
z.Str("error", e.Error),
)
}
case *holepunch.ProtocolErrorEvt:
log.Warn(t.ctx, "Hole punch protocol error", errors.New(e.Error),
z.Str("peer", name),
)
default:
log.Warn(t.ctx, "Unknown hole punch event", nil, z.Str("type", fmt.Sprintf("%T", evt.Evt)))
}
}

return func(ctx context.Context) {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
// peerRoutingFunc wraps a function to implement routing.PeerRouting.
type peerRoutingFunc func(context.Context, peer.ID) (peer.AddrInfo, error)

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
forceDirectConn(ctx)
}
}
}
func (f peerRoutingFunc) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) {
return f(ctx, p)
}

// isQUICEnabled returns true if the host has an address or listening address on QUIC.
Expand All @@ -350,19 +407,6 @@
return false
}

// isDirectConnAvailable returns true if direct connection is available in the given set of connections.
func isDirectConnAvailable(conns []network.Conn) bool {
for _, conn := range conns {
if IsRelayAddr(conn.RemoteMultiaddr()) {
continue
}

return true
}

return false
}

// UpgradeToQUICConnections tries to upgrade a direct TCP connection to a direct QUIC connection
// if there is known QUIC addresses from the peerstore.
func UpgradeToQUICConnections(p2pNode host.Host, peerIDs []peer.ID) lifecycle.HookFuncCtx {
Expand Down Expand Up @@ -457,7 +501,7 @@

if !hasDirectTCPConn(conns) {
log.Debug(ctx, "No direct connection via TCP to peer", z.Str("peer", PeerName(p)), z.Any("conns", conns))
continue // no direct TPC connection to upgrade to QUIC, ForceDirectConnections shall upgrade to direct
continue // no direct TCP connection to upgrade to QUIC, hole punching shall upgrade to direct
}

// Get known QUIC addrs from peerstore
Expand Down Expand Up @@ -733,6 +777,13 @@
z.Any("direction", e.Direction),
z.Str("type", typ),
)

if typ == addrTypeRelay && e.Direction == network.DirInbound {
log.Debug(ctx, "Inbound relay connection detected, DCUtR hole punch should initiate",
z.Str("peer", name),
z.Any("peer_address", addr),
)
}
} else if e.Disconnect {
log.Debug(ctx, "Libp2p disconnected",
z.Str("peer", name),
Expand Down
25 changes: 23 additions & 2 deletions p2p/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,15 @@ func NewRelayReserver(p2pNode host.Host, relay *MutablePeer) lifecycle.HookFuncC

// NewRelayRouter returns a life cycle hook that routes peers via relays in libp2p by
// continuously adding peer relay addresses to libp2p peer store.
//
// Only relay routes for peers that THIS node should dial are added. For each
// peer pair, the node with the smaller peer ID dials and the node with the
// larger peer ID waits. This asymmetry is required for DCUtR (hole punching)
// to work: libp2p's holepunch service only initiates the DCUtR protocol on
// INBOUND relay connections. If both sides add relay routes and dial
// simultaneously, both see outbound connections and neither side triggers
// DCUtR. By having one side wait, it sees the other's dial as an inbound
// relay connection, which activates DCUtR and enables NAT traversal.
func NewRelayRouter(p2pNode host.Host, peers []peer.ID, relays []*MutablePeer) lifecycle.HookFuncCtx {
return func(ctx context.Context) {
if len(relays) == 0 {
Expand All @@ -113,13 +122,18 @@ func NewRelayRouter(p2pNode host.Host, peers []peer.ID, relays []*MutablePeer) l

ctx = log.WithTopic(ctx, "p2p")

selfID := p2pNode.ID()

ticker := time.NewTicker(routedAddrTTL * 9 / 10)
defer ticker.Stop()

for {
for _, pID := range peers {
if pID == p2pNode.ID() {
// Skip self
if pID == selfID {
continue
}

if !shouldDialPeer(selfID, pID) {
continue
}

Expand Down Expand Up @@ -147,3 +161,10 @@ func NewRelayRouter(p2pNode host.Host, peers []peer.ID, relays []*MutablePeer) l
}
}
}

// shouldDialPeer returns true if this node should proactively dial the given
// peer via relay. The peer with the smaller ID dials; the peer with the larger
// ID waits for the inbound connection (which triggers DCUtR hole punching).
func shouldDialPeer(self, remote peer.ID) bool {
return self < remote
}
11 changes: 11 additions & 0 deletions testutil/integration/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@ func (a *asserter) await(ctx context.Context, t *testing.T, expect int) error {
return len(actual) >= expect
}, a.Timeout, time.Millisecond*10)

// Re-count after assert.Eventually returns to handle the race where context
// was cancelled in the window between assert.Eventually returning true
// (callbacks satisfied) and the ctx.Err() check below.
count := 0

a.callbacks.Range(func(any, any) bool { count++; return true })

if count >= expect {
return nil
}

if ctx.Err() != nil {
return context.Canceled
}
Expand Down
2 changes: 1 addition & 1 deletion testutil/integration/ping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestPingCluster(t *testing.T) {
t.Run("relay_incorrect_externalhost", func(t *testing.T) {
pingCluster(t, pingTest{
BindLocalhost: true,
ExternalIP: "222.222.222.22",
ExternalIP: "192.0.2.1",
})
})
}
Expand Down
Loading