From 336b34d62d8610a12ea110299807772c44ef63e2 Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Wed, 7 Jan 2026 00:34:12 +0100 Subject: [PATCH 1/3] networking: more complete gossipsub spec --- src/lean_spec/subspecs/networking/__init__.py | 4 +- .../subspecs/networking/gossipsub/__init__.py | 95 +++- .../subspecs/networking/gossipsub/control.py | 167 ++++++ .../subspecs/networking/gossipsub/mcache.py | 372 ++++++++++++++ .../subspecs/networking/gossipsub/mesh.py | 358 +++++++++++++ .../subspecs/networking/gossipsub/message.py | 275 +++++++--- .../networking/gossipsub/parameters.py | 160 +++++- .../subspecs/networking/gossipsub/topic.py | 274 +++++++++- .../subspecs/networking/gossipsub/types.py | 45 ++ src/lean_spec/types/__init__.py | 3 +- src/lean_spec/types/byte_arrays.py | 6 + .../subspecs/networking/test_gossipsub.py | 484 +++++++++++------- 12 files changed, 1946 insertions(+), 297 deletions(-) create mode 100644 src/lean_spec/subspecs/networking/gossipsub/control.py create mode 100644 src/lean_spec/subspecs/networking/gossipsub/mcache.py create mode 100644 src/lean_spec/subspecs/networking/gossipsub/mesh.py create mode 100644 src/lean_spec/subspecs/networking/gossipsub/types.py diff --git a/src/lean_spec/subspecs/networking/__init__.py b/src/lean_spec/subspecs/networking/__init__.py index 95b6426c..33ed0b00 100644 --- a/src/lean_spec/subspecs/networking/__init__.py +++ b/src/lean_spec/subspecs/networking/__init__.py @@ -7,7 +7,7 @@ ) from .gossipsub.message import GossipsubMessage from .gossipsub.parameters import GossipsubParameters -from .gossipsub.topic import GossipsubTopic +from .gossipsub.topic import GossipTopic from .reqresp import ( BLOCKS_BY_ROOT_PROTOCOL_V1, STATUS_PROTOCOL_V1, @@ -22,7 +22,7 @@ "MESSAGE_DOMAIN_INVALID_SNAPPY", "MESSAGE_DOMAIN_VALID_SNAPPY", "GossipsubParameters", - "GossipsubTopic", + "GossipTopic", "GossipsubMessage", "BLOCKS_BY_ROOT_PROTOCOL_V1", "STATUS_PROTOCOL_V1", diff --git 
a/src/lean_spec/subspecs/networking/gossipsub/__init__.py b/src/lean_spec/subspecs/networking/gossipsub/__init__.py index 08426341..20d7f0b2 100644 --- a/src/lean_spec/subspecs/networking/gossipsub/__init__.py +++ b/src/lean_spec/subspecs/networking/gossipsub/__init__.py @@ -1,12 +1,97 @@ -"""Gossipsub specs for the Lean Ethereum consensus specification.""" +""" +Gossipsub Protocol Implementation +================================= -from .message import GossipsubMessage, MessageId -from .parameters import GossipsubParameters -from .topic import GossipsubTopic +Gossipsub is a mesh-based pubsub protocol combining: + +1. **Eager push** within topic meshes for low-latency delivery +2. **Lazy pull** via gossip (IHAVE/IWANT) for reliability + +Key Concepts +------------ + +- **Mesh**: Full message exchange with D peers per topic +- **Fanout**: Temporary peers for publish-only topics +- **Gossip**: IHAVE/IWANT for message dissemination to non-mesh peers +- **IDONTWANT**: Bandwidth optimization (v1.2) + +References: +---------- +- Gossipsub v1.0: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.0.md +- Gossipsub v1.2: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md +- Ethereum P2P: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md +""" + +from .control import ( + ControlMessage, + Graft, + IDontWant, + IHave, + IWant, + Prune, +) +from .mcache import ( + CacheEntry, + MessageCache, + SeenCache, +) +from .mesh import ( + FanoutEntry, + MeshState, + TopicMesh, +) +from .message import GossipsubMessage, SnappyDecompressor +from .parameters import ( + GossipsubParameters, +) +from .topic import ( + ATTESTATION_TOPIC_NAME, + BLOCK_TOPIC_NAME, + ENCODING_POSTFIX, + TOPIC_PREFIX, + GossipTopic, + TopicKind, + format_topic_string, + parse_topic_string, +) +from .types import ( + MessageId, + PeerId, + TopicId, +) __all__ = [ + # Message "GossipsubMessage", + "SnappyDecompressor", + # Topic + 
"GossipTopic", + "TopicKind", + "TOPIC_PREFIX", + "ENCODING_POSTFIX", + "BLOCK_TOPIC_NAME", + "ATTESTATION_TOPIC_NAME", + "format_topic_string", + "parse_topic_string", + # Parameters "GossipsubParameters", - "GossipsubTopic", + # Control + "ControlMessage", + "Graft", + "Prune", + "IHave", + "IWant", + "IDontWant", + # Mesh + "MeshState", + "TopicMesh", + "FanoutEntry", + # Cache + "MessageCache", + "SeenCache", + "CacheEntry", + # Types "MessageId", + "PeerId", + "TopicId", ] diff --git a/src/lean_spec/subspecs/networking/gossipsub/control.py b/src/lean_spec/subspecs/networking/gossipsub/control.py new file mode 100644 index 00000000..815a0167 --- /dev/null +++ b/src/lean_spec/subspecs/networking/gossipsub/control.py @@ -0,0 +1,167 @@ +""" +Gossipsub Control Messages +========================== + +Control messages orchestrate the gossip mesh topology and message propagation. + +Overview +-------- + +Gossipsub uses control messages piggybacked on regular RPC messages to: + +- Manage mesh membership (GRAFT/PRUNE) +- Enable lazy message propagation (IHAVE/IWANT) +- Reduce bandwidth for large messages (IDONTWANT) + +Control Message Types +--------------------- + ++-------------+----------------------------------------------------------+ +| Message | Purpose | ++=============+==========================================================+ +| GRAFT | Request to join a peer's mesh for a topic | ++-------------+----------------------------------------------------------+ +| PRUNE | Notify peer of removal from mesh | ++-------------+----------------------------------------------------------+ +| IHAVE | Advertise message IDs available for a topic | ++-------------+----------------------------------------------------------+ +| IWANT | Request full messages by their IDs | ++-------------+----------------------------------------------------------+ +| IDONTWANT | Signal that specific messages are not needed (v1.2) | 
class Graft(StrictBaseModel):
    """Ask a peer to admit us into its mesh for a topic.

    Upgrades the link from gossip-only to full message exchange.
    The receiver may refuse (answering with PRUNE) when the sender is
    already meshed, the mesh is at capacity (|mesh| >= D_high), or the
    sender is still inside a backoff window from an earlier PRUNE.
    """

    topic_id: str
    """Topic identifier to join the mesh for."""


class Prune(StrictBaseModel):
    """Tell a peer it has been dropped from our mesh for a topic.

    Emitted when we unsubscribe, when the heartbeat trims an
    oversized mesh (|mesh| > D_high), or when a GRAFT is rejected.
    The pruned peer must wait out the backoff period before sending
    another GRAFT for this topic.
    """

    topic_id: str
    """Topic identifier being pruned from."""


class IHave(StrictBaseModel):
    """Advertise message IDs we hold in cache for a topic.

    Sent to non-mesh peers at heartbeat time so they can lazily pull
    anything they are missing via IWANT. Only IDs from the most recent
    cache windows (mcache_gossip) are advertised.
    """

    topic_id: str
    """Topic the advertised messages belong to."""

    message_ids: list[MessageId]
    """IDs of messages available in the sender's cache."""


class IWant(StrictBaseModel):
    """Pull request for full messages identified by ID.

    Sent back after an IHAVE when some advertised messages are missing
    locally; the advertiser answers with the full messages if they are
    still cached.
    """

    message_ids: list[MessageId]
    """IDs of messages being requested."""


class IDontWant(StrictBaseModel):
    """Decline delivery of specific messages (gossipsub v1.2).

    Broadcast to mesh peers right after receiving a large message so
    they skip forwarding their copies. Applied only to messages above
    the IDONTWANT size threshold (typically 1KB).
    """

    message_ids: list[MessageId]
    """IDs of messages the sender does not want to receive."""


class ControlMessage(StrictBaseModel):
    """Batch of control messages piggybacked on a single RPC.

    Any mix of the five control types may travel together in one RPC
    frame for efficiency.

    Example::

        control = ControlMessage(
            grafts=[Graft(topic_id="blocks")],
            ihaves=[IHave(topic_id="blocks", message_ids=[msg_id])],
        )
    """

    grafts: list[Graft] = []
    """GRAFT messages requesting mesh membership."""

    prunes: list[Prune] = []
    """PRUNE messages notifying mesh removal."""

    ihaves: list[IHave] = []
    """IHAVE messages advertising cached message IDs."""

    iwants: list[IWant] = []
    """IWANT messages requesting full messages."""

    idontwants: list[IDontWant] = []
    """IDONTWANT messages declining specific messages (v1.2)."""

    def is_empty(self) -> bool:
        """Return True when no control payload of any kind is present."""
        return not any(
            (self.grafts, self.prunes, self.ihaves, self.iwants, self.idontwants)
        )
+ +Key Parameters +-------------- + +- **mcache_len** (6): Total windows retained +- **mcache_gossip** (3): Recent windows included in IHAVE + +Only the first `mcache_gossip` windows are advertised via IHAVE. +Older messages can still be retrieved via IWANT but won't be +actively gossiped. + +Seen Cache +---------- + +A separate `SeenCache` tracks message IDs for deduplication +without storing full messages. Uses TTL-based expiry. + +References: +---------- +- Gossipsub v1.0: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.0.md +""" + +from __future__ import annotations + +from collections import deque +from dataclasses import dataclass, field + +from .message import GossipsubMessage +from .types import MessageId, Timestamp, TopicId + + +@dataclass(slots=True) +class CacheEntry: + """A single entry in the message cache. + + Stores the message along with its topic for efficient retrieval + during IWANT responses and topic-filtered IHAVE gossip. + """ + + message: GossipsubMessage + """The cached gossipsub message.""" + + topic: TopicId + """Topic this message was published to. + + Used to filter messages when generating IHAVE gossip for a specific topic. + """ + + +@dataclass(slots=True) +class MessageCache: + """Sliding window cache for gossipsub messages. + + Maintains recent messages for: + + - **IWANT responses**: Retrieve full messages by ID + - **IHAVE gossip**: Get message IDs for advertisement + + Example:: + + cache = MessageCache(mcache_len=6, mcache_gossip=3) + + # Add messages + cache.put("blocks", msg1) + cache.put("blocks", msg2) + + # Get message IDs for IHAVE + ids = cache.get_gossip_ids("blocks") + + # Respond to IWANT + msg = cache.get(requested_id) + + # Shift window (called each heartbeat) + evicted = cache.shift() + """ + + mcache_len: int = 6 + """Number of history windows to retain. + + Messages are evicted after this many heartbeat intervals. 
+ + Higher values increase memory usage but improve message + availability for late IWANT requests. + """ + + mcache_gossip: int = 3 + """Number of recent windows to include in IHAVE gossip. + + Only messages from the most recent windows are advertised. + Should be less than or equal to mcache_len. + """ + + _windows: deque[set[MessageId]] = field(init=False, repr=False) + """Sliding window of message ID sets. + + Index 0 is the newest window. Each heartbeat, windows shift + right and a new empty window is prepended. + """ + + _by_id: dict[MessageId, CacheEntry] = field(init=False, default_factory=dict, repr=False) + """Message lookup index keyed by ID. + + Provides O(1) retrieval for IWANT responses. + """ + + def __post_init__(self) -> None: + """Initialize the sliding window structure.""" + self._windows = deque(maxlen=self.mcache_len) + self._windows.append(set()) + + def put(self, topic: TopicId, message: GossipsubMessage) -> bool: + """Add a message to the cache. + + Messages are added to the newest window (index 0) and + indexed for fast retrieval. Duplicates are ignored. + + Args: + topic: Topic this message belongs to. + message: Message to cache. + + Returns: + True if added (not a duplicate). + """ + msg_id = message.id + + if msg_id in self._by_id: + return False + + self._windows[0].add(msg_id) + self._by_id[msg_id] = CacheEntry(message=message, topic=topic) + return True + + def get(self, msg_id: MessageId) -> GossipsubMessage | None: + """Retrieve a message by ID. + + Used to respond to IWANT requests from peers. + + Args: + msg_id: Message ID to look up. + + Returns: + The cached message, or None if not found/evicted. + """ + entry = self._by_id.get(msg_id) + return entry.message if entry else None + + def has(self, msg_id: MessageId) -> bool: + """Check if a message is cached. + + Args: + msg_id: Message ID to check. + + Returns: + True if the message is in the cache. 
+ """ + return msg_id in self._by_id + + def get_gossip_ids(self, topic: TopicId) -> list[MessageId]: + """Get message IDs for IHAVE gossip. + + Returns IDs from the most recent `mcache_gossip` windows + that belong to the specified topic. + + Args: + topic: Topic to filter messages by. + + Returns: + List of message IDs for IHAVE advertisement. + """ + result: list[MessageId] = [] + + windows_to_check = min(self.mcache_gossip, len(self._windows)) + + for i in range(windows_to_check): + for msg_id in self._windows[i]: + entry = self._by_id.get(msg_id) + if entry and entry.topic == topic: + result.append(msg_id) + + return result + + def shift(self) -> int: + """Shift the cache window, evicting the oldest. + + Called at each heartbeat to age the cache: + + 1. If at capacity, remove oldest window and its messages + 2. Prepend new empty window + + Returns: + Number of messages evicted. + """ + evicted = 0 + + if len(self._windows) >= self.mcache_len: + oldest = self._windows.pop() + for msg_id in oldest: + if msg_id in self._by_id: + del self._by_id[msg_id] + evicted += 1 + + self._windows.appendleft(set()) + + return evicted + + def clear(self) -> None: + """Clear all cached messages.""" + self._windows.clear() + self._windows.append(set()) + self._by_id.clear() + + def __len__(self) -> int: + """Return the total number of cached messages.""" + return len(self._by_id) + + def __contains__(self, msg_id: MessageId) -> bool: + """Check if a message ID is in the cache.""" + return msg_id in self._by_id + + +@dataclass(slots=True) +class SeenCache: + """TTL-based cache for deduplicating messages. + + Tracks message IDs that have been seen to prevent reprocessing + duplicates. Unlike `MessageCache`, this only stores IDs (not + full messages) with time-based expiry. 
+ + Use Cases + --------- + + - Skip processing of already-seen messages + - Avoid forwarding duplicates to mesh peers + - Bound memory with automatic TTL cleanup + + Example:: + + seen = SeenCache(ttl_seconds=120) + + # Check and mark as seen + if seen.add(msg_id, current_time): + process_message(msg) # First time seeing this + else: + pass # Duplicate, skip + + # Periodic cleanup + removed = seen.cleanup(current_time) + """ + + ttl_seconds: int = 120 + """Time-to-live for entries in seconds. + + Entries older than this are removed during cleanup. + + Should be: + - long enough to cover network propagation, + - short enough to bound memory usage. + """ + + _seen: set[MessageId] = field(default_factory=set, repr=False) + """Set of message IDs that have been seen. + + Provides O(1) membership testing. + """ + + _timestamps: dict[MessageId, Timestamp] = field(default_factory=dict, repr=False) + """Timestamp when each message was first seen. + + Used to determine expiry during cleanup. + """ + + def add(self, msg_id: MessageId, timestamp: Timestamp) -> bool: + """Mark a message as seen. + + Args: + msg_id: Message ID to mark as seen. + timestamp: Current Unix timestamp. + + Returns: + True if newly seen (not a duplicate). + """ + if msg_id in self._seen: + return False + + self._seen.add(msg_id) + self._timestamps[msg_id] = timestamp + return True + + def has(self, msg_id: MessageId) -> bool: + """Check if a message has been seen. + + Args: + msg_id: Message ID to check. + + Returns: + True if the message has been seen. + """ + return msg_id in self._seen + + def cleanup(self, current_time: float) -> int: + """Remove expired entries. + + Should be called periodically (e.g., each heartbeat) + to prevent unbounded memory growth. + + Args: + current_time: Current Unix timestamp. + + Returns: + Number of entries removed. 
+ """ + cutoff = current_time - self.ttl_seconds + expired = [msg_id for msg_id, ts in self._timestamps.items() if ts < cutoff] + + for msg_id in expired: + self._seen.discard(msg_id) + del self._timestamps[msg_id] + + return len(expired) + + def clear(self) -> None: + """Clear all seen entries.""" + self._seen.clear() + self._timestamps.clear() + + def __len__(self) -> int: + """Return the number of seen message IDs.""" + return len(self._seen) + + def __contains__(self, msg_id: MessageId) -> bool: + """Check if a message ID has been seen.""" + return msg_id in self._seen diff --git a/src/lean_spec/subspecs/networking/gossipsub/mesh.py b/src/lean_spec/subspecs/networking/gossipsub/mesh.py new file mode 100644 index 00000000..6d30547f --- /dev/null +++ b/src/lean_spec/subspecs/networking/gossipsub/mesh.py @@ -0,0 +1,358 @@ +""" +Gossipsub Mesh State +==================== + +Manages the mesh topology for gossipsub topics. + +Overview +-------- + +Each subscribed topic maintains a **mesh**: a set of peers for full +message exchange. The mesh is the core data structure enabling +gossipsub's eager push protocol. + +- **Mesh peers**: Exchange full messages immediately (eager push) +- **Non-mesh peers**: Receive IHAVE advertisements, request via IWANT (lazy pull) + +Mesh vs Fanout +-------------- + ++----------+----------------------------------------------------------+ +| Type | Description | ++==========+==========================================================+ +| Mesh | Peers for topics we subscribe to | ++----------+----------------------------------------------------------+ +| Fanout | Temporary peers for topics we publish to but don't | +| | subscribe to. Expires after fanout_ttl. | ++----------+----------------------------------------------------------+ + +Heartbeat Maintenance +--------------------- + +The mesh is maintained through periodic heartbeat: + +1. **Graft** if |mesh| < D_low: add peers up to D +2. 
**Prune** if |mesh| > D_high: remove peers down to D +3. **Gossip**: send IHAVE to D_lazy non-mesh peers + +References: +---------- +- Gossipsub v1.0: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.0.md +""" + +from __future__ import annotations + +import random +import time +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +from .types import PeerId, TopicId + +if TYPE_CHECKING: + from .parameters import GossipsubParameters + + +@dataclass(slots=True) +class FanoutEntry: + """Fanout state for a publish-only topic. + + Tracks peers used when publishing to topics we don't subscribe to. + Fanout entries expire after a period of inactivity (fanout_ttl). + + Unlike mesh peers, fanout peers only receive our published messages. + We don't receive their messages since we're not subscribed. + """ + + peers: set[PeerId] = field(default_factory=set) + """Peers in the fanout for this topic. + + Selected randomly from available topic peers, up to D peers. + """ + + last_published: float = 0.0 + """Unix timestamp of the last publish to this topic. + + Used to determine if the entry has expired. + """ + + def is_stale(self, current_time: float, ttl: float) -> bool: + """Check if this fanout entry has expired. + + Args: + current_time: Current Unix timestamp. + ttl: Time-to-live in seconds. + + Returns: + True if the entry hasn't been used within ttl seconds. + """ + return current_time - self.last_published > ttl + + +@dataclass(slots=True) +class TopicMesh: + """Mesh state for a single topic. + + Represents the set of peers we exchange full messages with + for a specific topic. Mesh membership is managed via + GRAFT and PRUNE control messages. + """ + + peers: set[PeerId] = field(default_factory=set) + """Peers in the mesh for this topic. + + These peers receive all published messages immediately + and forward all received messages to us. 
+ """ + + def add_peer(self, peer_id: PeerId) -> bool: + """Add a peer to this topic's mesh. + + Args: + peer_id: Peer to add. + + Returns: + True if the peer was added, False if already present. + """ + if peer_id in self.peers: + return False + self.peers.add(peer_id) + return True + + def remove_peer(self, peer_id: PeerId) -> bool: + """Remove a peer from this topic's mesh. + + Args: + peer_id: Peer to remove. + + Returns: + True if the peer was removed, False if not present. + """ + if peer_id not in self.peers: + return False + self.peers.discard(peer_id) + return True + + +@dataclass(slots=True) +class MeshState: + """Complete mesh state for all subscribed topics. + + Central data structure managing mesh topology across all topics. + Provides operations for subscription management, peer tracking, + and gossip peer selection. + + Example:: + + state = MeshState(params=GossipsubParameters()) + + # Subscribe and build mesh + state.subscribe("blocks") + state.add_to_mesh("blocks", "peer1") + state.add_to_mesh("blocks", "peer2") + + # Get mesh peers for message forwarding + peers = state.get_mesh_peers("blocks") # {"peer1", "peer2"} + + # Select peers for IHAVE gossip + all_peers = {"peer1", "peer2", "peer3", "peer4"} + gossip_peers = state.select_peers_for_gossip("blocks", all_peers) + """ + + params: GossipsubParameters + """Gossipsub parameters controlling mesh behavior.""" + + _meshes: dict[TopicId, TopicMesh] = field(default_factory=dict, repr=False) + """Mesh state for each subscribed topic. Keyed by topic ID.""" + + _fanouts: dict[TopicId, FanoutEntry] = field(default_factory=dict, repr=False) + """Fanout state for publish-only topics. 
Keyed by topic ID.""" + + _subscriptions: set[TopicId] = field(default_factory=set, repr=False) + """Set of topics we are subscribed to.""" + + @property + def d(self) -> int: + """Target mesh size per topic.""" + return self.params.d + + @property + def d_low(self) -> int: + """Low watermark - graft when mesh is smaller.""" + return self.params.d_low + + @property + def d_high(self) -> int: + """High watermark - prune when mesh is larger.""" + return self.params.d_high + + @property + def d_lazy(self) -> int: + """Number of peers for IHAVE gossip.""" + return self.params.d_lazy + + def subscribe(self, topic: TopicId) -> None: + """Subscribe to a topic, initializing its mesh. + + If we have fanout peers for this topic, they are + promoted to the mesh automatically. + + Args: + topic: Topic identifier to subscribe to. + """ + if topic in self._subscriptions: + return + + self._subscriptions.add(topic) + + # Promote fanout peers to mesh if any + mesh = TopicMesh() + if topic in self._fanouts: + mesh.peers = self._fanouts[topic].peers.copy() + del self._fanouts[topic] + self._meshes[topic] = mesh + + def unsubscribe(self, topic: TopicId) -> set[PeerId]: + """Unsubscribe from a topic. + + Args: + topic: Topic identifier to unsubscribe from. + + Returns: + Set of peers that were in the mesh (need PRUNE). + """ + self._subscriptions.discard(topic) + mesh = self._meshes.pop(topic, None) + return mesh.peers if mesh else set() + + def is_subscribed(self, topic: TopicId) -> bool: + """Check if subscribed to a topic. + + Args: + topic: Topic identifier to check. + + Returns: + True if subscribed. + """ + return topic in self._subscriptions + + def get_mesh_peers(self, topic: TopicId) -> set[PeerId]: + """Get mesh peers for a topic. + + Args: + topic: Topic identifier. + + Returns: + Copy of the mesh peer set, or empty set if not subscribed. 
+ """ + mesh = self._meshes.get(topic) + return mesh.peers.copy() if mesh else set() + + def add_to_mesh(self, topic: TopicId, peer_id: PeerId) -> bool: + """Add a peer to a topic's mesh. + + Args: + topic: Topic identifier. + peer_id: Peer to add. + + Returns: + True if added, False if already present or not subscribed. + """ + mesh = self._meshes.get(topic) + if mesh is None: + return False + return mesh.add_peer(peer_id) + + def remove_from_mesh(self, topic: TopicId, peer_id: PeerId) -> bool: + """Remove a peer from a topic's mesh. + + Args: + topic: Topic identifier. + peer_id: Peer to remove. + + Returns: + True if removed, False if not present or not subscribed. + """ + mesh = self._meshes.get(topic) + if mesh is None: + return False + return mesh.remove_peer(peer_id) + + def get_fanout_peers(self, topic: TopicId) -> set[PeerId]: + """Get fanout peers for a topic. + + Args: + topic: Topic identifier. + + Returns: + Copy of the fanout peer set, or empty set if none. + """ + fanout = self._fanouts.get(topic) + return fanout.peers.copy() if fanout else set() + + def update_fanout(self, topic: TopicId, available_peers: set[PeerId]) -> set[PeerId]: + """Update fanout for publishing to a non-subscribed topic. + + For subscribed topics, returns mesh peers instead. + + Args: + topic: Topic identifier. + available_peers: All known peers for this topic. + + Returns: + Peers to publish to (mesh or fanout). 
+ """ + if topic in self._subscriptions: + return self.get_mesh_peers(topic) + + fanout = self._fanouts.get(topic) + if fanout is None: + fanout = FanoutEntry() + self._fanouts[topic] = fanout + + fanout.last_published = time.time() + + # Fill fanout up to D peers + if len(fanout.peers) < self.d: + candidates = available_peers - fanout.peers + needed = self.d - len(fanout.peers) + new_peers = random.sample(list(candidates), min(needed, len(candidates))) + fanout.peers.update(new_peers) + + return fanout.peers.copy() + + def cleanup_fanouts(self, ttl: float) -> int: + """Remove expired fanout entries. + + Args: + ttl: Time-to-live in seconds. + + Returns: + Number of entries removed. + """ + current_time = time.time() + stale = [t for t, f in self._fanouts.items() if f.is_stale(current_time, ttl)] + for topic in stale: + del self._fanouts[topic] + return len(stale) + + def select_peers_for_gossip(self, topic: TopicId, all_topic_peers: set[PeerId]) -> list[PeerId]: + """Select non-mesh peers for IHAVE gossip. + + Randomly selects up to D_lazy peers from those not in the mesh. + These peers receive IHAVE messages during heartbeat. + + Args: + topic: Topic identifier. + all_topic_peers: All known peers subscribed to this topic. + + Returns: + List of peers to send IHAVE gossip to. + """ + mesh_peers = self.get_mesh_peers(topic) + candidates = list(all_topic_peers - mesh_peers) + + if len(candidates) <= self.d_lazy: + return candidates + + return random.sample(candidates, self.d_lazy) diff --git a/src/lean_spec/subspecs/networking/gossipsub/message.py b/src/lean_spec/subspecs/networking/gossipsub/message.py index fde3fec1..a0eca077 100644 --- a/src/lean_spec/subspecs/networking/gossipsub/message.py +++ b/src/lean_spec/subspecs/networking/gossipsub/message.py @@ -1,104 +1,239 @@ """ -Gossipsub protocol +Gossipsub Message +================= -- Message ID computation based on topic and message data, with snappy decompression handling. 
+Message representation and ID computation for the gossipsub protocol. + +Overview +-------- + +Each gossipsub message carries a topic and payload. Messages are +identified by a 20-byte ID computed from their contents. + +Message ID Function +------------------- + +Ethereum consensus uses a custom message ID function based on SHA256:: + + message_id = SHA256(domain + uint64_le(len(topic)) + topic + data)[:20] + +**Components:** + ++-----------------+--------------------------------------------------------+ +| Component | Description | ++=================+========================================================+ +| domain | 1-byte prefix indicating snappy validity (0x00/0x01) | ++-----------------+--------------------------------------------------------+ +| uint64_le | Topic length as 8-byte little-endian integer | ++-----------------+--------------------------------------------------------+ +| topic | Topic string as UTF-8 bytes | ++-----------------+--------------------------------------------------------+ +| data | Message payload (decompressed if snappy is valid) | ++-----------------+--------------------------------------------------------+ + +**Domain Bytes:** + +- ``0x01`` (VALID_SNAPPY): Snappy decompression succeeded, use decompressed data +- ``0x00`` (INVALID_SNAPPY): Decompression failed or no decompressor, use raw data + +This ensures messages with compression issues get different IDs, +preventing cache pollution from invalid variants. + +Snappy Compression +------------------ + +Ethereum consensus requires SSZ data to be snappy-compressed. +The message ID computation attempts decompression to determine +which domain byte to use. 
@runtime_checkable
class SnappyDecompressor(Protocol):
    """Structural type for snappy decompression callables.

    Anything callable with this signature qualifies — typically
    ``snappy.decompress`` from the python-snappy library. The callable
    must raise when the input is not valid snappy data.

    Example::

        import snappy

        msg = GossipsubMessage(
            topic=b"/leanconsensus/...",
            raw_data=compressed_data,
            snappy_decompress=snappy.decompress,
        )
    """

    def __call__(self, data: bytes) -> bytes:
        """Decompress snappy-compressed *data*.

        Args:
            data: Compressed bytes.

        Returns:
            Decompressed bytes.

        Raises:
            Exception: If decompression fails.
        """
        ...
The computation - logic depends on whether the message data can be successfully - decompressed with snappy. + Returns: + 20-byte message ID (Bytes20). """ - # Return the cached ID if it's already been computed - if self._id is not None: - return self._id + if self._cached_id is None: + self._cached_id = self.compute_id(self.topic, self.raw_data, self.snappy_decompress) + return self._cached_id + + @staticmethod + def compute_id( + topic: bytes, + data: bytes, + snappy_decompress: SnappyDecompressor | None = None, + ) -> MessageId: + """Compute a 20-byte message ID from raw data. - # Determine domain and data based on snappy decompression - if self._snappy_decompress: + Implements the Ethereum consensus message ID function:: + + SHA256(domain + uint64_le(len(topic)) + topic + data)[:20] + + Domain Selection + ---------------- + + - If `snappy_decompress` is provided and succeeds: + domain = 0x01, use decompressed data + - Otherwise: + domain = 0x00, use raw data + + Args: + topic: Topic string as bytes. + data: Message payload (potentially compressed). + snappy_decompress: Optional decompression function. + + Returns: + 20-byte message ID. 
+ + Example:: + + msg_id = GossipsubMessage.compute_id( + topic=b"/leanconsensus/0x12345678/block/ssz_snappy", + data=block_bytes, + snappy_decompress=snappy.decompress, + ) + """ + if snappy_decompress is not None: try: - # Try to decompress the data with snappy - decompressed_data = self._snappy_decompress(self.raw_data) - # Valid snappy decompression - use valid domain - domain, data_for_hash = ( - MESSAGE_DOMAIN_VALID_SNAPPY, - decompressed_data, - ) + data_for_hash = snappy_decompress(data) + domain = MESSAGE_DOMAIN_VALID_SNAPPY except Exception: - # Invalid snappy decompression - use invalid domain - domain, data_for_hash = ( - MESSAGE_DOMAIN_INVALID_SNAPPY, - self.raw_data, - ) + data_for_hash = data + domain = MESSAGE_DOMAIN_INVALID_SNAPPY else: - # No decompressor provided - use invalid domain - domain, data_for_hash = ( - MESSAGE_DOMAIN_INVALID_SNAPPY, - self.raw_data, - ) + data_for_hash = data + domain = MESSAGE_DOMAIN_INVALID_SNAPPY - # Compute the raw ID bytes and assign with proper type annotation - self._id: MessageId = self._compute_raw_id(domain, data_for_hash) - return self._id + preimage = bytes(domain) + len(topic).to_bytes(8, "little") + topic + data_for_hash - def _compute_raw_id(self, domain: bytes, message_data: bytes) -> bytes: - """ - Computes SHA256(domain + uint64_le(len(topic)) + topic + message_data)[:20]. + return Bytes20(hashlib.sha256(preimage).digest()[:20]) - Args: - domain: The 4-byte domain for message-id isolation. - message_data: The message data (either decompressed or raw). + @property + def topic_str(self) -> str: + """Get the topic as a UTF-8 string. Returns: - A 20-byte raw bytes digest. + Topic decoded from bytes to string. + """ + return self.topic.decode("utf-8") + + def __hash__(self) -> int: + """Hash based on message ID. + + Allows messages to be used in sets and as dict keys. 
""" - # Concatenate all components: domain + topic_len + topic + data - data_to_hash = domain + len(self.topic).to_bytes(8, "little") + self.topic + message_data - # Compute SHA256 and take the first 20 bytes - return hashlib.sha256(data_to_hash).digest()[:20] + return hash(self.id) diff --git a/src/lean_spec/subspecs/networking/gossipsub/parameters.py b/src/lean_spec/subspecs/networking/gossipsub/parameters.py index 1246e2ff..6717cb5c 100644 --- a/src/lean_spec/subspecs/networking/gossipsub/parameters.py +++ b/src/lean_spec/subspecs/networking/gossipsub/parameters.py @@ -1,55 +1,171 @@ -"""Gossipsub parameters for the Lean Ethereum consensus specification.""" +""" +Gossipsub Parameters +==================== + +Configuration parameters controlling gossipsub mesh behavior. + +Overview +-------- + +Gossipsub maintains a mesh of peers for each subscribed topic. +These parameters tune the mesh size, timing, and caching behavior. + +Parameter Categories +-------------------- + +**Mesh Degree (D parameters):** + +Controls how many peers are in the mesh for each topic. 
+ +:: + + D_low <= D <= D_high + + D Target mesh size (8 for Ethereum) + D_low Minimum before grafting new peers (6) + D_high Maximum before pruning excess peers (12) + D_lazy Peers to gossip IHAVE messages to (6) + +**Timing:** + +:: + + heartbeat_interval Mesh maintenance frequency (0.7s for Ethereum) + fanout_ttl How long to keep fanout peers (60s) + +**Caching:** + +:: + + mcache_len Total history windows kept (6) + mcache_gossip Windows included in IHAVE gossip (3) + seen_ttl Duplicate detection window + +Ethereum Values +--------------- + +The Ethereum consensus layer specifies: + +- D = 8, D_low = 6, D_high = 12, D_lazy = 6 +- Heartbeat = 700ms (0.7s) +- Message cache = 6 windows, gossip last 3 + +References: +---------- +- Ethereum P2P spec: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md +- Gossipsub v1.0: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.0.md +- Gossipsub v1.2: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.2.md +""" + +from __future__ import annotations from lean_spec.subspecs.chain.config import DEVNET_CONFIG from lean_spec.types import StrictBaseModel class GossipsubParameters(StrictBaseModel): - """A model holding the canonical gossipsub parameters.""" + """Core gossipsub configuration. + + Defines the mesh topology and timing parameters. + + Default values follow the Ethereum consensus P2P specification. + """ - protocol_id: str = "/meshsub/1.0.0" - """The protocol ID for gossip messages.""" + # ------------------------------------------------------------------------- + # Mesh Degree Parameters + # ------------------------------------------------------------------------- d: int = 8 - """The target number of peers for a stable gossip mesh topic.""" + """Target number of mesh peers per topic. 
- d_low: int = 6 + The heartbeat procedure adjusts the mesh toward this size: + + - If |mesh| < D_low: graft peers up to D + - If |mesh| > D_high: prune peers down to D """ - The low watermark for the number of peers in a stable gossip mesh topic. + + d_low: int = 6 + """Minimum mesh peers before grafting. + + When mesh size drops below this threshold, the heartbeat + will graft new peers to reach the target D. """ d_high: int = 12 - """ - The high watermark for the number of peers in a stable gossip mesh topic. + """Maximum mesh peers before pruning. + + When mesh size exceeds this threshold, the heartbeat + will prune excess peers down to the target D. """ d_lazy: int = 6 - """The target number of peers for gossip-only connections.""" + """Number of non-mesh peers for IHAVE gossip. + + During heartbeat, IHAVE messages are sent to this many + randomly selected peers outside the mesh. This enables + the lazy pull protocol for reliability. + """ + + # ------------------------------------------------------------------------- + # Timing Parameters + # ------------------------------------------------------------------------- heartbeat_interval_secs: float = 0.7 - """The frequency of the gossipsub heartbeat in seconds.""" + """Interval between heartbeat ticks in seconds. + + The heartbeat procedure runs periodically to: + + - Maintain mesh size (graft/prune) + - Send IHAVE gossip to non-mesh peers + - Clean up stale fanout entries + - Shift the message cache window + """ fanout_ttl_secs: int = 60 - """The time-to-live for fanout maps in seconds.""" + """Time-to-live for fanout entries in seconds. + + Fanout peers are used when publishing to topics we don't + subscribe to. Entries expire after this duration of + inactivity to free resources. 
+ """ + + # ------------------------------------------------------------------------- + # Message Cache Parameters + # ------------------------------------------------------------------------- mcache_len: int = 6 - """The number of history windows to retain full messages in the cache.""" + """Total number of history windows in the message cache. + + Messages are stored for this many heartbeat intervals. + After mcache_len heartbeats, messages are evicted. + """ mcache_gossip: int = 3 - """The number of history windows to gossip about.""" + """Number of recent windows included in IHAVE gossip. + + Only messages from the most recent mcache_gossip windows + are advertised via IHAVE. Older cached messages can still + be retrieved via IWANT but won't be actively gossiped. + """ seen_ttl_secs: int = ( int(DEVNET_CONFIG.seconds_per_slot) * int(DEVNET_CONFIG.justification_lookback_slots) * 2 ) - """ - The expiry time in seconds for the cache of seen message IDs. + """Time-to-live for seen message IDs in seconds. - This is calculated as SECONDS_PER_SLOT * JUSTIFICATION_LOOKBACK_SLOTS * 2. + Message IDs are tracked to detect duplicates. This should + be long enough to cover network propagation delays but + short enough to bound memory usage. """ - validation_mode: str = "strict_no_sign" - """The message validation mode. `strict_no_sign` requires the author, - sequence number and signature fields of a message to be empty. Any message - that contains these fields is considered invalid. In some libp2p - implementations, this mode is also known as Anonymous mode. + # ------------------------------------------------------------------------- + # IDONTWANT Optimization (v1.2) + # ------------------------------------------------------------------------- + + idontwant_message_size_threshold: int = 1000 + """Minimum message size in bytes to trigger IDONTWANT. 
+ + When receiving a message larger than this threshold, + immediately send IDONTWANT to mesh peers to prevent + redundant transmissions. Set to 1KB by default. """ diff --git a/src/lean_spec/subspecs/networking/gossipsub/topic.py b/src/lean_spec/subspecs/networking/gossipsub/topic.py index 5d651036..0bb2040b 100644 --- a/src/lean_spec/subspecs/networking/gossipsub/topic.py +++ b/src/lean_spec/subspecs/networking/gossipsub/topic.py @@ -1,45 +1,267 @@ -"""Gossipsub topics""" +""" +Gossipsub Topics +================ +Topic definitions for the Lean Ethereum gossipsub network. + +Overview +-------- + +Gossipsub organizes messages by topic. Each topic identifies a specific +message type (blocks, attestations, etc.) within a specific fork. + +Topic Format +------------ + +Topics follow a structured format:: + + /{prefix}/{fork_digest}/{topic_name}/{encoding} + + Example: /leanconsensus/0x12345678/block/ssz_snappy + +**Components:** + ++----------------+----------------------------------------------------------+ +| Component | Description | ++================+==========================================================+ +| prefix | Network identifier (`leanconsensus`) | ++----------------+----------------------------------------------------------+ +| fork_digest | 4-byte fork identifier as hex (`0x12345678`) | ++----------------+----------------------------------------------------------+ +| topic_name | Message type (`block`, `attestation`) | ++----------------+----------------------------------------------------------+ +| encoding | Serialization format (always `ssz_snappy`) | ++----------------+----------------------------------------------------------+ + +Fork Digest +----------- + +The fork digest ensures peers on different forks don't exchange +incompatible messages. It's derived from the fork version and +genesis validators root. 
+ +Topic Types +----------- + ++----------------+----------------------------------------------------------+ +| Topic | Content | ++================+==========================================================+ +| block | Signed beacon blocks | ++----------------+----------------------------------------------------------+ +| attestation | Signed attestations | ++----------------+----------------------------------------------------------+ + +References: +---------- +- Ethereum P2P: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md +""" + +from __future__ import annotations + +from dataclasses import dataclass from enum import Enum -from typing import Any, Type -from lean_spec.subspecs.containers.attestation import SignedAttestation -from lean_spec.subspecs.containers.block.block import SignedBlockWithAttestation +TOPIC_PREFIX: str = "leanconsensus" +"""Network prefix for Lean consensus gossip topics. + +Identifies this network in topic strings. Different networks +(mainnet, testnets) may use different prefixes. +""" + +ENCODING_POSTFIX: str = "ssz_snappy" +"""Encoding suffix for SSZ with Snappy compression. + +All Ethereum consensus gossip messages use SSZ serialization +with Snappy compression. +""" + +BLOCK_TOPIC_NAME: str = "block" +"""Topic name for block messages. +Used in the topic string to identify signed beacon block messages. +""" -class GossipsubTopic(Enum): +ATTESTATION_TOPIC_NAME: str = "attestation" +"""Topic name for attestation messages. + +Used in the topic string to identify signed attestation messages. +""" + + +class TopicKind(Enum): + """Gossip topic types. + + Enumerates the different message types that can be gossiped. + + Each variant corresponds to a specific `topic_name` in the + topic string format. """ - Enumerates gossip topics, bundling a topic's name with its payload type. - Attributes: - value (str): The network name of the topic (e.g., "block"). 
- payload_type (Type): The class representing the data structure - of the topic's message (e.g., `SignedBlock`). + BLOCK = BLOCK_TOPIC_NAME + """Signed beacon block messages.""" + + ATTESTATION = ATTESTATION_TOPIC_NAME + """Signed attestation messages.""" + + def __str__(self) -> str: + """Return the topic name string.""" + return self.value + + +@dataclass(frozen=True, slots=True) +class GossipTopic: + """A fully-qualified gossipsub topic. + + Immutable representation of a topic that combines the message type + and fork digest. Can be converted to/from the string format. """ - def __init__(self, value: str, payload_type: Type[Any]): + kind: TopicKind + """The topic type (block, attestation, etc.). + + Determines what kind of messages are exchanged on this topic. + """ + + fork_digest: str + """Fork digest as 0x-prefixed hex string. + + Identifies the fork this topic belongs to. + + Peers must match on fork digest to exchange messages on a topic. + """ + + def __str__(self) -> str: + """Return the full topic string. + + Returns: + Topic in format `/{prefix}/{fork}/{name}/{encoding}` + """ + return f"/{TOPIC_PREFIX}/{self.fork_digest}/{self.kind}/{ENCODING_POSTFIX}" + + def __bytes__(self) -> bytes: + """Return the topic string as UTF-8 bytes. + + Returns: + Topic string encoded as bytes. """ - Initializes the GossipTopic. + return str(self).encode("utf-8") + + @classmethod + def from_string(cls, topic_str: str) -> GossipTopic: + """Parse a topic string into a GossipTopic. Args: - value: The topic in string. - payload_type: The associated gossip. + topic_str: Full topic string to parse. + + Returns: + Parsed GossipTopic instance. + + Raises: + ValueError: If the topic string is malformed. 
+ + Example:: + + topic = GossipTopic.from_string("/leanconsensus/0x12345678/block/ssz_snappy") """ - self._value_ = value - self.payload_type = payload_type + parts = topic_str.lstrip("/").split("/") - BLOCK = ("block", SignedBlockWithAttestation) - """ - Topic for gossiping new blocks. + if len(parts) != 4: + raise ValueError(f"Invalid topic format: expected 4 parts, got {len(parts)}") - - `value`: "block" - - `payload_type`: `SignedBlockWithAttestation` - """ + prefix, fork_digest, topic_name, encoding = parts + + if prefix != TOPIC_PREFIX: + raise ValueError(f"Invalid prefix: expected '{TOPIC_PREFIX}', got '{prefix}'") + + if encoding != ENCODING_POSTFIX: + raise ValueError(f"Invalid encoding: expected '{ENCODING_POSTFIX}', got '{encoding}'") + + try: + kind = TopicKind(topic_name) + except ValueError: + raise ValueError(f"Unknown topic: '{topic_name}'") from None + + return cls(kind=kind, fork_digest=fork_digest) + + @classmethod + def block(cls, fork_digest: str) -> GossipTopic: + """Create a block topic for the given fork. + + Args: + fork_digest: Fork digest as 0x-prefixed hex string. + + Returns: + GossipTopic for block messages. + """ + return cls(kind=TopicKind.BLOCK, fork_digest=fork_digest) + + @classmethod + def attestation(cls, fork_digest: str) -> GossipTopic: + """Create an attestation topic for the given fork. - ATTESTATION = ("attestation", SignedAttestation) + Args: + fork_digest: Fork digest as 0x-prefixed hex string. + + Returns: + GossipTopic for attestation messages. + """ + return cls(kind=TopicKind.ATTESTATION, fork_digest=fork_digest) + + +def format_topic_string( + topic_name: str, + fork_digest: str, + prefix: str = TOPIC_PREFIX, + encoding: str = ENCODING_POSTFIX, +) -> str: + """Format a complete gossip topic string. + + Low-level function for constructing topic strings. For most cases, + use `GossipTopic` instead. + + Args: + topic_name: Message type (e.g., "block", "attestation"). 
+ fork_digest: Fork digest as 0x-prefixed hex string. + prefix: Network prefix (defaults to TOPIC_PREFIX). + encoding: Encoding suffix (defaults to ENCODING_POSTFIX). + + Returns: + Formatted topic string. + + Example:: + + topic_str = format_topic_string("block", "0x12345678") + assert topic_str == "/leanconsensus/0x12345678/block/ssz_snappy" """ - Topic for gossiping new attestations. + return f"/{prefix}/{fork_digest}/{topic_name}/{encoding}" - - `value`: "attestation" - - `payload_type`: `SignedAttestation` + +def parse_topic_string(topic_str: str) -> tuple[str, str, str, str]: + """Parse a topic string into its components. + + Low-level function for deconstructing topic strings. For most cases, + use ``GossipTopic.from_string()`` instead. + + Args: + topic_str: Topic string to parse. + + Returns: + Tuple of (prefix, fork_digest, topic_name, encoding). + + Raises: + ValueError: If the topic string is malformed. + + Example:: + + prefix, fork, name, enc = parse_topic_string("/leanconsensus/0x12345678/block/ssz_snappy") + assert prefix == "leanconsensus" + assert fork == "0x12345678" + assert name == "block" + assert enc == "ssz_snappy" """ + parts = topic_str.lstrip("/").split("/") + + if len(parts) != 4: + raise ValueError(f"Invalid topic format: expected 4 parts, got {len(parts)}") + + return (parts[0], parts[1], parts[2], parts[3]) diff --git a/src/lean_spec/subspecs/networking/gossipsub/types.py b/src/lean_spec/subspecs/networking/gossipsub/types.py new file mode 100644 index 00000000..f764b8d2 --- /dev/null +++ b/src/lean_spec/subspecs/networking/gossipsub/types.py @@ -0,0 +1,45 @@ +"""Gossipsub Type Definitions""" + +from __future__ import annotations + +from typing import TypeAlias + +from lean_spec.types import Bytes20 + +MessageId: TypeAlias = Bytes20 +"""20-byte message identifier. 
+ +Computed from message contents using SHA256:: + + SHA256(domain + uint64_le(len(topic)) + topic + data)[:20] + +The domain byte distinguishes valid/invalid snappy compression. +""" + + +PeerId: TypeAlias = str +"""Libp2p peer identifier. + +Derived from the peer's public key as a base58-encoded multihash. +Uniquely identifies peers in the P2P network. +""" + + +TopicId: TypeAlias = str +"""Topic string identifier. + +Follows the Ethereum consensus format:: + + /{prefix}/{fork_digest}/{topic_name}/{encoding} +""" + + +Timestamp: TypeAlias = float +"""Unix timestamp in seconds since epoch. + +Used for: + +- Message arrival times +- Peer activity tracking +- Seen cache expiry +""" diff --git a/src/lean_spec/types/__init__.py b/src/lean_spec/types/__init__.py index 6dcbb36f..02e14798 100644 --- a/src/lean_spec/types/__init__.py +++ b/src/lean_spec/types/__init__.py @@ -4,7 +4,7 @@ from .basispt import BasisPoint from .bitfields import BaseBitlist from .boolean import Boolean -from .byte_arrays import ZERO_HASH, Bytes32, Bytes52, Bytes3116 +from .byte_arrays import ZERO_HASH, Bytes20, Bytes32, Bytes52, Bytes3116 from .collections import SSZList, SSZVector from .container import Container from .exceptions import ( @@ -22,6 +22,7 @@ "BaseBitlist", "Uint64", "BasisPoint", + "Bytes20", "Bytes32", "Bytes52", "Bytes3116", diff --git a/src/lean_spec/types/byte_arrays.py b/src/lean_spec/types/byte_arrays.py index d2bccc3d..29d9616c 100644 --- a/src/lean_spec/types/byte_arrays.py +++ b/src/lean_spec/types/byte_arrays.py @@ -209,6 +209,12 @@ class Bytes8(BaseBytes): LENGTH = 8 +class Bytes20(BaseBytes): + """Fixed-size byte array of exactly 20 bytes.""" + + LENGTH = 20 + + class Bytes32(BaseBytes): """Fixed-size byte array of exactly 32 bytes.""" diff --git a/tests/lean_spec/subspecs/networking/test_gossipsub.py b/tests/lean_spec/subspecs/networking/test_gossipsub.py index 30808a42..3d57c6f2 100644 --- a/tests/lean_spec/subspecs/networking/test_gossipsub.py +++ 
b/tests/lean_spec/subspecs/networking/test_gossipsub.py @@ -7,9 +7,23 @@ MESSAGE_DOMAIN_VALID_SNAPPY, ) from lean_spec.subspecs.networking.gossipsub import ( + ControlMessage, + FanoutEntry, GossipsubMessage, GossipsubParameters, - MessageId, + GossipTopic, + Graft, + IDontWant, + IHave, + IWant, + MeshState, + MessageCache, + Prune, + SeenCache, + TopicKind, + TopicMesh, + format_topic_string, + parse_topic_string, ) @@ -17,30 +31,24 @@ class TestGossipsubParameters: """Test suite for GossipSub protocol parameters.""" def test_default_parameters(self) -> None: - """ - Test default GossipSub parameters and their relationships. - - Validates that default parameter values maintain expected relationships - for proper protocol operation and that all values are positive. - """ + """Test default GossipSub parameters.""" params = GossipsubParameters() - # Test logical relationships + # Test Ethereum spec values + assert params.d == 8 + assert params.d_low == 6 + assert params.d_high == 12 + assert params.d_lazy == 6 + assert params.heartbeat_interval_secs == 0.7 + assert params.fanout_ttl_secs == 60 + assert params.mcache_len == 6 + assert params.mcache_gossip == 3 + + # Test relationships assert params.d_low < params.d < params.d_high assert params.d_lazy <= params.d assert params.mcache_gossip <= params.mcache_len - # Test all timing/count parameters are positive - positive_params = [ - params.heartbeat_interval_secs, - params.fanout_ttl_secs, - params.seen_ttl_secs, - params.mcache_len, - params.mcache_gossip, - ] - for param in positive_params: - assert param > 0 - class TestGossipsubMessage: """Test suite for GossipSub message handling and ID computation.""" @@ -48,25 +56,15 @@ class TestGossipsubMessage: @pytest.mark.parametrize( "has_snappy,decompress_succeeds,expected_domain", [ - # No decompressor (False, False, MESSAGE_DOMAIN_INVALID_SNAPPY), - # Valid decompression (True, True, MESSAGE_DOMAIN_VALID_SNAPPY), - # Failed decompression (True, False, 
MESSAGE_DOMAIN_INVALID_SNAPPY), ], ) def test_message_id_computation( self, has_snappy: bool, decompress_succeeds: bool, expected_domain: bytes ) -> None: - """ - Test message ID computation across different snappy scenarios. - - Args: - has_snappy: Whether to provide a snappy decompressor. - decompress_succeeds: Whether decompression should succeed. - expected_domain: Expected domain bytes for ID computation. - """ + """Test message ID computation across different snappy scenarios.""" topic = b"test_topic" raw_data = b"raw_test_data" decompressed_data = b"decompressed_test_data" @@ -85,35 +83,17 @@ def snappy_decompress(data: bytes) -> bytes: message = GossipsubMessage(topic, raw_data, snappy_decompress) message_id = message.id - # Should always be exactly 20 bytes assert len(message_id) == 20 assert isinstance(message_id, bytes) - # Test deterministic behavior - same inputs should produce same ID + # Test determinism message2 = GossipsubMessage(topic, raw_data, snappy_decompress) assert message_id == message2.id - # Test that snappy success/failure affects the ID - if has_snappy: - # Create message without snappy - should produce different ID if decompression succeeded - msg_no_snappy = GossipsubMessage(topic, raw_data, None) - if decompress_succeeds: - assert message_id != msg_no_snappy.id # Different domains - else: - # Both use invalid domain, but this tests the flow works - assert len(msg_no_snappy.id) == 20 - def test_message_id_caching(self) -> None: - """ - Test that message IDs are cached and deterministic. - - Verifies caching behavior and that identical messages always - produce the same ID across multiple instantiations. 
- """ + """Test that message IDs are cached.""" topic = b"test_topic" data = b"test_data" - - # Test caching within single message decompress_calls = 0 def counting_decompress(data: bytes) -> bytes: @@ -125,153 +105,315 @@ def counting_decompress(data: bytes) -> bytes: first_id = message.id second_id = message.id - assert decompress_calls == 1 # Called only once (cached) - assert first_id == second_id - assert first_id is second_id # Same object reference - - # Test deterministic behavior across different message instances - message2 = GossipsubMessage(topic, data) - message3 = GossipsubMessage(topic, data) - - assert message2.id == message3.id - - @pytest.mark.parametrize( - "topic,data,description", - [ - (b"", b"", "empty topic and data"), - (b"topic", b"data1", "basic case 1"), - (b"topic", b"data2", "basic case 2"), - (b"topic1", b"data", "different topic"), - (b"topic2", b"data", "different topic"), - (b"x" * 1000, b"y" * 5000, "large inputs"), - (b"\x00\xff\x01\xfe", bytes(range(16)), "binary data"), - ], - ) - def test_message_id_edge_cases(self, topic: bytes, data: bytes, description: str) -> None: - """ - Test message ID computation across various edge cases and input sizes. - - Parametrized test ensuring the algorithm works correctly with: - - Empty inputs - - Different topics/data combinations - - Large inputs - - Binary data with null bytes and non-UTF-8 sequences - - Args: - topic: Topic bytes to test. - data: Data bytes to test. - description: Description of the test case. - """ - message = GossipsubMessage(topic, data) - message_id = message.id - - # Should always produce exactly 20-byte ID - assert len(message_id) == 20 - assert isinstance(message_id, bytes) - - # Test deterministic behavior - same inputs produce same ID - message2 = GossipsubMessage(topic, data) - assert message_id == message2.id - - def test_message_uniqueness_and_collision_resistance(self) -> None: - """ - Test message ID uniqueness and collision resistance. 
+ assert decompress_calls == 1 # Called only once + assert first_id is second_id - Ensures different inputs produce different outputs and tests - resistance to common collision attack patterns. - """ - # Test cases designed to catch collision vulnerabilities + def test_message_uniqueness(self) -> None: + """Test message ID uniqueness.""" test_cases = [ - # Basic different inputs (b"topic1", b"data"), (b"topic2", b"data"), (b"topic", b"data1"), (b"topic", b"data2"), - # Topic/data swapping - (b"abc", b"def"), - (b"def", b"abc"), - # Length-based attacks - (b"ab", b"cd"), - (b"a", b"bcd"), - # Null byte insertion - (b"topic", b"data"), - (b"top\x00ic", b"data"), ] messages = [GossipsubMessage(topic, data) for topic, data in test_cases] ids = [msg.id for msg in messages] - # All IDs should be unique (no collisions) assert len(ids) == len(set(ids)) - # All should be 20 bytes - for msg_id in ids: - assert len(msg_id) == 20 +class TestControlMessages: + """Test suite for gossipsub control messages.""" -class TestMessageIdType: - """Test suite for MessageId type validation.""" + def test_graft_creation(self) -> None: + """Test GRAFT message creation.""" + graft = Graft(topic_id="test_topic") + assert graft.topic_id == "test_topic" - def test_message_id_pydantic_validation(self) -> None: - """ - Test MessageId validation in Pydantic models. + def test_prune_creation(self) -> None: + """Test PRUNE message creation.""" + prune = Prune(topic_id="test_topic") + assert prune.topic_id == "test_topic" - The MessageId type annotation includes Pydantic field constraints - that enforce 20-byte length when used in models. 
- """ - from pydantic import BaseModel, ValidationError + def test_ihave_creation(self) -> None: + """Test IHAVE message creation.""" + from lean_spec.types import Bytes20 - class TestModel(BaseModel): - msg_id: MessageId + msg_ids = [Bytes20(b"12345678901234567890"), Bytes20(b"abcdefghijklmnopqrst")] + ihave = IHave(topic_id="test_topic", message_ids=msg_ids) - # Valid 20-byte ID should work - valid_model = TestModel(msg_id=b"12345678901234567890") - assert len(valid_model.msg_id) == 20 + assert ihave.topic_id == "test_topic" + assert len(ihave.message_ids) == 2 - # Invalid lengths should raise ValidationError - invalid_cases = [ - (b"", "empty bytes"), - (b"short", "too short"), - (b"too_long_message_id_bytes", "too long"), - ] + def test_iwant_creation(self) -> None: + """Test IWANT message creation.""" + from lean_spec.types import Bytes20 - for invalid_id, _case_desc in invalid_cases: - with pytest.raises(ValidationError, match=".*"): - TestModel(msg_id=invalid_id) + msg_ids = [Bytes20(b"12345678901234567890")] + iwant = IWant(message_ids=msg_ids) + assert len(iwant.message_ids) == 1 -class TestGossipsubIntegration: - """Integration tests for complete GossipSub workflows.""" + def test_idontwant_creation(self) -> None: + """Test IDONTWANT message creation (v1.2).""" + from lean_spec.types import Bytes20 + + msg_ids = [Bytes20(b"12345678901234567890")] + idontwant = IDontWant(message_ids=msg_ids) + + assert len(idontwant.message_ids) == 1 + + def test_control_message_aggregation(self) -> None: + """Test aggregated control message container.""" + graft = Graft(topic_id="topic1") + prune = Prune(topic_id="topic2") + + control = ControlMessage(grafts=[graft], prunes=[prune]) + + assert len(control.grafts) == 1 + assert len(control.prunes) == 1 + assert not control.is_empty() + + def test_control_message_empty_check(self) -> None: + """Test control message empty check.""" + empty_control = ControlMessage() + assert empty_control.is_empty() + + non_empty = 
ControlMessage(grafts=[Graft(topic_id="topic")]) + assert not non_empty.is_empty() + + +class TestTopicFormatting: + """Test suite for topic string formatting and parsing.""" + + def test_gossip_topic_creation(self) -> None: + """Test GossipTopic creation.""" + topic = GossipTopic(kind=TopicKind.BLOCK, fork_digest="0x12345678") + + assert topic.kind == TopicKind.BLOCK + assert topic.fork_digest == "0x12345678" + assert str(topic) == "/leanconsensus/0x12345678/block/ssz_snappy" + + def test_gossip_topic_from_string(self) -> None: + """Test parsing topic string.""" + topic = GossipTopic.from_string("/leanconsensus/0x12345678/block/ssz_snappy") + + assert topic.kind == TopicKind.BLOCK + assert topic.fork_digest == "0x12345678" + + def test_gossip_topic_factory_methods(self) -> None: + """Test GossipTopic factory methods.""" + block_topic = GossipTopic.block("0xabcd1234") + assert block_topic.kind == TopicKind.BLOCK + + attestation_topic = GossipTopic.attestation("0xabcd1234") + assert attestation_topic.kind == TopicKind.ATTESTATION + + def test_format_topic_string(self) -> None: + """Test topic string formatting.""" + result = format_topic_string("block", "0x12345678") + assert result == "/leanconsensus/0x12345678/block/ssz_snappy" + + def test_parse_topic_string(self) -> None: + """Test topic string parsing.""" + prefix, fork_digest, topic_name, encoding = parse_topic_string( + "/leanconsensus/0x12345678/block/ssz_snappy" + ) + + assert prefix == "leanconsensus" + assert fork_digest == "0x12345678" + assert topic_name == "block" + assert encoding == "ssz_snappy" + + def test_invalid_topic_string(self) -> None: + """Test handling of invalid topic strings.""" + with pytest.raises(ValueError, match="expected 4 parts"): + GossipTopic.from_string("/invalid/topic") + + with pytest.raises(ValueError, match="Invalid prefix"): + GossipTopic.from_string("/wrongprefix/0x123/block/ssz_snappy") + + def test_topic_kind_enum(self) -> None: + """Test TopicKind enum.""" + assert 
TopicKind.BLOCK.value == "block" + assert TopicKind.ATTESTATION.value == "attestation" + assert str(TopicKind.BLOCK) == "block" - def test_realistic_blockchain_scenarios(self) -> None: - """Test realistic blockchain message scenarios.""" - # Some Ethereum like GossipSub topics and payloads - scenarios = [ - (b"/eth2/beacon_block/ssz_snappy", b"beacon_block_ssz_data"), - (b"/eth2/beacon_aggregate_and_proof/ssz_snappy", b"aggregate_proof_ssz"), - (b"/eth2/voluntary_exit/ssz_snappy", b"voluntary_exit_message"), - ] - def mock_snappy_decompress(data: bytes) -> bytes: - return data + b"_decompressed" # Simulate decompression +class TestMeshState: + """Test suite for mesh state management.""" + + def test_mesh_state_initialization(self) -> None: + """Test MeshState initialization.""" + params = GossipsubParameters(d=8, d_low=6, d_high=12, d_lazy=6) + mesh = MeshState(params=params) + + assert mesh.d == 8 + assert mesh.d_low == 6 + assert mesh.d_high == 12 + assert mesh.d_lazy == 6 + + def test_subscribe_and_unsubscribe(self) -> None: + """Test topic subscription.""" + mesh = MeshState(params=GossipsubParameters()) + + mesh.subscribe("topic1") + assert mesh.is_subscribed("topic1") + assert not mesh.is_subscribed("topic2") + + peers = mesh.unsubscribe("topic1") + assert not mesh.is_subscribed("topic1") + assert peers == set() + + def test_add_remove_mesh_peers(self) -> None: + """Test adding and removing peers from mesh.""" + mesh = MeshState(params=GossipsubParameters()) + mesh.subscribe("topic1") + + assert mesh.add_to_mesh("topic1", "peer1") + assert mesh.add_to_mesh("topic1", "peer2") + assert not mesh.add_to_mesh("topic1", "peer1") # Already in mesh + + peers = mesh.get_mesh_peers("topic1") + assert "peer1" in peers + assert "peer2" in peers + + assert mesh.remove_from_mesh("topic1", "peer1") + assert not mesh.remove_from_mesh("topic1", "peer1") # Already removed + + peers = mesh.get_mesh_peers("topic1") + assert "peer1" not in peers + assert "peer2" in peers + + def 
test_gossip_peer_selection(self) -> None: + """Test selection of non-mesh peers for gossip.""" + params = GossipsubParameters(d_lazy=3) + mesh = MeshState(params=params) + mesh.subscribe("topic1") + mesh.add_to_mesh("topic1", "peer1") + mesh.add_to_mesh("topic1", "peer2") + + all_peers = {"peer1", "peer2", "peer3", "peer4", "peer5", "peer6"} + + gossip_peers = mesh.select_peers_for_gossip("topic1", all_peers) + + mesh_peers = mesh.get_mesh_peers("topic1") + for peer in gossip_peers: + assert peer not in mesh_peers + + +class TestTopicMesh: + """Test suite for TopicMesh dataclass.""" + + def test_topic_mesh_add_remove(self) -> None: + """Test adding and removing peers.""" + topic_mesh = TopicMesh() + + assert topic_mesh.add_peer("peer1") + assert not topic_mesh.add_peer("peer1") # Already exists + assert "peer1" in topic_mesh.peers + + assert topic_mesh.remove_peer("peer1") + assert not topic_mesh.remove_peer("peer1") # Already removed + assert "peer1" not in topic_mesh.peers + + +class TestMessageCache: + """Test suite for message cache.""" + + def test_cache_put_and_get(self) -> None: + """Test putting and retrieving messages.""" + cache = MessageCache(mcache_len=6, mcache_gossip=3) + message = GossipsubMessage(topic=b"topic", raw_data=b"data") + + assert cache.put("topic", message) + assert not cache.put("topic", message) # Duplicate + + retrieved = cache.get(message.id) + assert retrieved is not None + assert retrieved.id == message.id + + def test_cache_has(self) -> None: + """Test checking if message is in cache.""" + cache = MessageCache() + message = GossipsubMessage(topic=b"topic", raw_data=b"data") + + assert not cache.has(message.id) + cache.put("topic", message) + assert cache.has(message.id) + + def test_cache_shift(self) -> None: + """Test cache window shifting.""" + cache = MessageCache(mcache_len=3, mcache_gossip=2) messages = [] - for topic, data in scenarios: - # Test both with and without snappy - msg_with_snappy = GossipsubMessage(topic, data, 
mock_snappy_decompress) - msg_without_snappy = GossipsubMessage(topic, data) - messages.extend([msg_with_snappy, msg_without_snappy]) + for i in range(5): + msg = GossipsubMessage(topic=b"topic", raw_data=f"data{i}".encode()) + cache.put("topic", msg) + messages.append(msg) + cache.shift() - ids = [msg.id for msg in messages] + # Old messages should be evicted + assert not cache.has(messages[0].id) + assert not cache.has(messages[1].id) + + def test_get_gossip_ids(self) -> None: + """Test getting message IDs for IHAVE gossip.""" + cache = MessageCache(mcache_len=6, mcache_gossip=3) + + msg1 = GossipsubMessage(topic=b"topic1", raw_data=b"data1") + msg2 = GossipsubMessage(topic=b"topic2", raw_data=b"data2") + msg3 = GossipsubMessage(topic=b"topic1", raw_data=b"data3") + + cache.put("topic1", msg1) + cache.put("topic2", msg2) + cache.put("topic1", msg3) + + gossip_ids = cache.get_gossip_ids("topic1") + + assert msg1.id in gossip_ids + assert msg2.id not in gossip_ids + assert msg3.id in gossip_ids + + +class TestSeenCache: + """Test suite for seen message cache.""" + + def test_seen_cache_add_and_check(self) -> None: + """Test adding and checking seen messages.""" + from lean_spec.types import Bytes20 + + cache = SeenCache(ttl_seconds=60) + msg_id = Bytes20(b"12345678901234567890") + + assert not cache.has(msg_id) + assert cache.add(msg_id, timestamp=1000.0) + assert cache.has(msg_id) + assert not cache.add(msg_id, timestamp=1001.0) # Duplicate + + def test_seen_cache_cleanup(self) -> None: + """Test cleanup of expired entries.""" + from lean_spec.types import Bytes20 + + cache = SeenCache(ttl_seconds=10) + msg_id = Bytes20(b"12345678901234567890") + + cache.add(msg_id, timestamp=1000.0) + assert cache.has(msg_id) + + removed = cache.cleanup(current_time=1015.0) + assert removed == 1 + assert not cache.has(msg_id) + + +class TestFanoutEntry: + """Test suite for FanoutEntry dataclass.""" + + def test_fanout_entry_staleness(self) -> None: + """Test fanout entry 
staleness detection.""" + entry = FanoutEntry() + entry.last_published = 1000.0 - # All messages should produce valid, unique IDs - assert len(ids) == len(set(ids)) # All unique - for msg_id in ids: - assert len(msg_id) == 20 - assert isinstance(msg_id, bytes) - - # Verify snappy vs non-snappy messages produce different IDs - for i in range(0, len(messages), 2): - with_snappy_id = messages[i].id - without_snappy_id = messages[i + 1].id - assert with_snappy_id != without_snappy_id + assert not entry.is_stale(current_time=1050.0, ttl=60.0) + assert entry.is_stale(current_time=1070.0, ttl=60.0) From 59fbd0cad2b9533d42c8c7fde2e4281e38fe2c83 Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Wed, 7 Jan 2026 13:46:48 +0100 Subject: [PATCH 2/3] add protocol ID --- src/lean_spec/subspecs/networking/gossipsub/parameters.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/lean_spec/subspecs/networking/gossipsub/parameters.py b/src/lean_spec/subspecs/networking/gossipsub/parameters.py index 6717cb5c..363075a1 100644 --- a/src/lean_spec/subspecs/networking/gossipsub/parameters.py +++ b/src/lean_spec/subspecs/networking/gossipsub/parameters.py @@ -71,6 +71,9 @@ class GossipsubParameters(StrictBaseModel): Default values follow the Ethereum consensus P2P specification. 
""" + protocol_id: str = "/meshsub/1.3.0" + """The protocol ID for gossip messages.""" + # ------------------------------------------------------------------------- # Mesh Degree Parameters # ------------------------------------------------------------------------- From e1a52c64c76c38aaf720250a48d3e0fa938b4e90 Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Wed, 7 Jan 2026 14:45:19 +0100 Subject: [PATCH 3/3] cleanup --- .../subspecs/networking/gossipsub/mcache.py | 13 ------------- src/lean_spec/subspecs/networking/gossipsub/mesh.py | 11 +++++------ .../subspecs/networking/gossipsub/message.py | 4 ---- .../subspecs/networking/gossipsub/parameters.py | 8 +++++--- 4 files changed, 10 insertions(+), 26 deletions(-) diff --git a/src/lean_spec/subspecs/networking/gossipsub/mcache.py b/src/lean_spec/subspecs/networking/gossipsub/mcache.py index 9bd02963..1e14fe17 100644 --- a/src/lean_spec/subspecs/networking/gossipsub/mcache.py +++ b/src/lean_spec/subspecs/networking/gossipsub/mcache.py @@ -272,19 +272,6 @@ class SeenCache: - Skip processing of already-seen messages - Avoid forwarding duplicates to mesh peers - Bound memory with automatic TTL cleanup - - Example:: - - seen = SeenCache(ttl_seconds=120) - - # Check and mark as seen - if seen.add(msg_id, current_time): - process_message(msg) # First time seeing this - else: - pass # Duplicate, skip - - # Periodic cleanup - removed = seen.cleanup(current_time) """ ttl_seconds: int = 120 diff --git a/src/lean_spec/subspecs/networking/gossipsub/mesh.py b/src/lean_spec/subspecs/networking/gossipsub/mesh.py index 6d30547f..0e372b53 100644 --- a/src/lean_spec/subspecs/networking/gossipsub/mesh.py +++ b/src/lean_spec/subspecs/networking/gossipsub/mesh.py @@ -45,13 +45,10 @@ import random import time from dataclasses import dataclass, field -from typing import TYPE_CHECKING +from .parameters import GossipsubParameters from .types import PeerId, TopicId -if TYPE_CHECKING: - from .parameters import GossipsubParameters - 
@dataclass(slots=True) class FanoutEntry: @@ -256,7 +253,8 @@ def add_to_mesh(self, topic: TopicId, peer_id: PeerId) -> bool: peer_id: Peer to add. Returns: - True if added, False if already present or not subscribed. + - True if added, + - False if already present or not subscribed. """ mesh = self._meshes.get(topic) if mesh is None: @@ -271,7 +269,8 @@ def remove_from_mesh(self, topic: TopicId, peer_id: PeerId) -> bool: peer_id: Peer to remove. Returns: - True if removed, False if not present or not subscribed. + - True if removed, + - False if not present or not subscribed. """ mesh = self._meshes.get(topic) if mesh is None: diff --git a/src/lean_spec/subspecs/networking/gossipsub/message.py b/src/lean_spec/subspecs/networking/gossipsub/message.py index a0eca077..ec804afc 100644 --- a/src/lean_spec/subspecs/networking/gossipsub/message.py +++ b/src/lean_spec/subspecs/networking/gossipsub/message.py @@ -66,10 +66,6 @@ from .types import MessageId -# ============================================================================= -# Snappy Decompressor Protocol -# ============================================================================= - @runtime_checkable class SnappyDecompressor(Protocol): diff --git a/src/lean_spec/subspecs/networking/gossipsub/parameters.py b/src/lean_spec/subspecs/networking/gossipsub/parameters.py index 363075a1..96f49b80 100644 --- a/src/lean_spec/subspecs/networking/gossipsub/parameters.py +++ b/src/lean_spec/subspecs/networking/gossipsub/parameters.py @@ -139,8 +139,8 @@ class GossipsubParameters(StrictBaseModel): mcache_len: int = 6 """Total number of history windows in the message cache. - Messages are stored for this many heartbeat intervals. - After mcache_len heartbeats, messages are evicted. + - Messages are stored for this many heartbeat intervals. + - After mcache_len heartbeats, messages are evicted. 
""" mcache_gossip: int = 3 @@ -170,5 +170,7 @@ class GossipsubParameters(StrictBaseModel): When receiving a message larger than this threshold, immediately send IDONTWANT to mesh peers to prevent - redundant transmissions. Set to 1KB by default. + redundant transmissions. + + Set to 1KB by default. """