From 4d5f8d2cc4c16618a61cce25fac1dedd3f4bdaa1 Mon Sep 17 00:00:00 2001 From: Vyakart Date: Sun, 16 Nov 2025 05:58:15 +0530 Subject: [PATCH 01/13] framework: make XmssKeyManager configurable (#129) --- .../testing/src/consensus_testing/keys.py | 232 ++++++++++++++---- .../subspecs/containers/signature.py | 19 ++ 2 files changed, 204 insertions(+), 47 deletions(-) diff --git a/packages/testing/src/consensus_testing/keys.py b/packages/testing/src/consensus_testing/keys.py index b484367c..a29fd01b 100644 --- a/packages/testing/src/consensus_testing/keys.py +++ b/packages/testing/src/consensus_testing/keys.py @@ -1,16 +1,23 @@ """XMSS key management utilities for testing.""" -from typing import NamedTuple, Optional +from __future__ import annotations + +import random +from typing import Any, NamedTuple, Optional from lean_spec.subspecs.containers import Attestation, Signature from lean_spec.subspecs.containers.slot import Slot +from lean_spec.subspecs.koalabear import Fp, P from lean_spec.subspecs.ssz.hash import hash_tree_root +from lean_spec.subspecs.xmss.constants import PRF_KEY_LENGTH, XmssConfig from lean_spec.subspecs.xmss.containers import PublicKey, SecretKey from lean_spec.subspecs.xmss.interface import ( TEST_SIGNATURE_SCHEME, GeneralizedXmssScheme, ) -from lean_spec.types import ValidatorIndex +from lean_spec.subspecs.xmss.prf import Prf +from lean_spec.subspecs.xmss.utils import Rand +from lean_spec.types import Uint64, ValidatorIndex class KeyPair(NamedTuple): @@ -23,24 +30,63 @@ class KeyPair(NamedTuple): """The validator's secret key (used for signing).""" -_KEY_CACHE: dict[tuple[int, int], KeyPair] = {} +_KEY_CACHE: dict[tuple[int, int, int, int | None], KeyPair] = {} """ Cache keys across tests to avoid regenerating them for the same validator/lifetime combo. -Key: (validator_index, num_active_epochs) -> KeyPair +Key: (validator_index, activation_epoch, num_active_epochs, seed) -> KeyPair """ +def _to_int(value: int | Slot | Uint64 | None, default: int = 0) -> int: + """Normalize Slot/Uint64/int to int with an optional default.""" + if value is None: + return default + if isinstance(value, Slot): + return value.as_int() + return int(value) + + +class SeededRand(Rand): + """Deterministic Rand helper to make key generation repeatable in tests.""" + + def __init__(self, config: XmssConfig, seed: int) -> None: + """Initialize with a deterministic seed.""" + super().__init__(config) + self._rng = random.Random(seed) + + def field_elements(self, length: int) -> list[Fp]: + """Generate deterministic field elements from the seeded RNG.""" + return [Fp(value=self._rng.randrange(P)) for _ in range(length)] + + +class SeededPrf(Prf): + """Deterministic PRF helper for repeatable PRF key generation.""" + + def __init__(self, config: XmssConfig, seed: int) -> None: + """Initialize with a deterministic seed.""" + super().__init__(config) + self._rng = random.Random(seed) + + def key_gen(self) -> bytes: + """Generate a deterministic PRF key for repeatable tests.""" + # Use a deterministic stream rather than os.urandom for repeatability in tests. + return self._rng.randbytes(PRF_KEY_LENGTH) + + class XmssKeyManager: """Lazy key manager for test validators using XMSS signatures.""" DEFAULT_MAX_SLOT = Slot(100) """Default maximum slot horizon if not specified.""" + DEFAULT_ACTIVATION_EPOCH = 0 def __init__( self, max_slot: Optional[Slot] = None, scheme: GeneralizedXmssScheme = TEST_SIGNATURE_SCHEME, + default_activation_epoch: int | Slot | Uint64 = DEFAULT_ACTIVATION_EPOCH, + default_seed: int | None = 0, ) -> None: """ Initialize the key manager. @@ -53,6 +99,11 @@ def __init__( scheme : GeneralizedXmssScheme, optional The XMSS scheme to use. Defaults to `TEST_SIGNATURE_SCHEME`. + default_activation_epoch : int | Slot | Uint64, optional + Activation epoch used when none is provided for key generation. + default_seed : int | None, optional + Seed for deterministic key generation. Set to None to use non-deterministic + randomness from the underlying XMSS scheme. Notes: ----- @@ -61,7 +112,92 @@ def __init__( """ self.max_slot = max_slot if max_slot is not None else self.DEFAULT_MAX_SLOT self.scheme = scheme + self.default_activation_epoch = _to_int( + default_activation_epoch, self.DEFAULT_ACTIVATION_EPOCH + ) + self.default_seed = default_seed self._key_pairs: dict[ValidatorIndex, KeyPair] = {} + self._key_metadata: dict[ValidatorIndex, dict[str, Any]] = {} + self._schemes_by_seed: dict[int, GeneralizedXmssScheme] = {} + + @property + def default_num_active_epochs(self) -> int: + """Default lifetime derived from the configured max_slot.""" + return self.max_slot.as_int() + 1 + + def _scheme_for_seed(self, seed: int | None) -> GeneralizedXmssScheme: + """ + Return a scheme instance appropriate for the provided seed. + + A deterministic scheme (SeededRand + SeededPrf) is returned when a specific + seed is provided; otherwise the base scheme is used. + """ + if seed is None: + return self.scheme + + if seed not in self._schemes_by_seed: + self._schemes_by_seed[seed] = GeneralizedXmssScheme( + config=self.scheme.config, + prf=SeededPrf(self.scheme.config, seed), + hasher=self.scheme.hasher, + merkle_tree=self.scheme.merkle_tree, + encoder=self.scheme.encoder, + rand=SeededRand(self.scheme.config, seed), + ) + + return self._schemes_by_seed[seed] + + def create_and_store_key_pair( + self, + validator_index: ValidatorIndex, + *, + activation_epoch: int | Slot | Uint64 | None = None, + num_active_epochs: int | Slot | Uint64 | None = None, + seed: int | None = None, + ) -> KeyPair: + """ + Generate and store a key pair with explicit control over key generation. + + Parameters + ---------- + validator_index : ValidatorIndex + The validator for whom a key pair should be generated. + activation_epoch : int | Slot | Uint64, optional + First epoch for which the key is valid. Defaults to `default_activation_epoch`. + num_active_epochs : int | Slot | Uint64, optional + Number of consecutive epochs the key should remain active. + Defaults to `max_slot + 1` (to include genesis). + seed : int | None, optional + Seed used for deterministic key generation. If None, the base scheme's + randomness is used. + """ + activation_epoch_int = _to_int(activation_epoch, self.default_activation_epoch) + num_active_epochs_int = _to_int(num_active_epochs, self.default_num_active_epochs) + key_seed = seed if seed is not None else self.default_seed + + scheme = self._scheme_for_seed(key_seed) + + cache_key = ( + int(validator_index), + activation_epoch_int, + num_active_epochs_int, + key_seed, + ) + + if cache_key in _KEY_CACHE: + key_pair = _KEY_CACHE[cache_key] + else: + pk, sk = scheme.key_gen(Uint64(activation_epoch_int), Uint64(num_active_epochs_int)) + key_pair = KeyPair(public=pk, secret=sk) + _KEY_CACHE[cache_key] = key_pair + + self._key_pairs[validator_index] = key_pair + self._key_metadata[validator_index] = { + "activation_epoch": activation_epoch_int, + "num_active_epochs": num_active_epochs_int, + "seed": key_seed, + } + return key_pair def __getitem__(self, validator_index: ValidatorIndex) -> KeyPair: """ @@ -83,37 +219,10 @@ def __getitem__(self, validator_index: ValidatorIndex) -> KeyPair: - Keys are deterministic for testing (`seed=0`). - Lifetime = `max_slot + 1` to include the genesis slot. """ - # Return cached keys if they exist. if validator_index in self._key_pairs: return self._key_pairs[validator_index] - # Generate New Key Pair - # - # XMSS requires knowing the total number of signatures in advance. - # We use max_slot + 1 as the lifetime since: - # - Validators may sign once per slot (attestations) - # - We include slot 0 (genesis) in the count - num_active_epochs = self.max_slot.as_int() + 1 - - # Check global cache first (keys are reused across tests) - cache_key = (int(validator_index), num_active_epochs) - if cache_key in _KEY_CACHE: - key_pair = _KEY_CACHE[cache_key] - self._key_pairs[validator_index] = key_pair - return key_pair - - # Generate the key pair using the default XMSS scheme. - # - # The seed is set to 0 for deterministic test keys. - from lean_spec.types import Uint64 - - pk, sk = self.scheme.key_gen(Uint64(0), Uint64(num_active_epochs)) - - # Store as a cohesive unit and return. - key_pair = KeyPair(public=pk, secret=sk) - _KEY_CACHE[cache_key] = key_pair # Cache globally for reuse across tests - self._key_pairs[validator_index] = key_pair - return key_pair + return self.create_and_store_key_pair(validator_index) def sign_attestation(self, attestation: Attestation) -> Signature: """ @@ -143,16 +252,26 @@ def sign_attestation(self, attestation: Attestation) -> Signature: # Get the current secret key sk = key_pair.secret + metadata = self._key_metadata.get( + validator_id, + { + "seed": self.default_seed, + "activation_epoch": self.default_activation_epoch, + "num_active_epochs": self.default_num_active_epochs, + }, + ) + scheme = self._scheme_for_seed(metadata.get("seed")) + # Map the attestation slot to an XMSS epoch. # # Each slot gets its own epoch to avoid key reuse. epoch = attestation.data.slot # Loop until the epoch is inside the prepared interval - prepared_interval = self.scheme.get_prepared_interval(sk) + prepared_interval = scheme.get_prepared_interval(sk) while int(epoch) not in prepared_interval: # Check if we're advancing past the key's total lifetime - activation_interval = self.scheme.get_activation_interval(sk) + activation_interval = scheme.get_activation_interval(sk) if prepared_interval.stop >= activation_interval.stop: raise ValueError( f"Cannot sign for epoch {epoch}: " @@ -160,10 +279,10 @@ def sign_attestation(self, attestation: Attestation) -> Signature: ) # Advance the key and get the new key object - sk = self.scheme.advance_preparation(sk) + sk = scheme.advance_preparation(sk) # Update the prepared interval for the next loop check - prepared_interval = self.scheme.get_prepared_interval(sk) + prepared_interval = scheme.get_prepared_interval(sk) # Update the cached key pair with the new, advanced secret key. # This ensures the *next* call to sign() uses the advanced state. @@ -175,18 +294,10 @@ def sign_attestation(self, attestation: Attestation) -> Signature: message = bytes(hash_tree_root(attestation)) # Generate the XMSS signature using the validator's (now prepared) secret key. - xmss_sig = self.scheme.sign(sk, epoch, message) - - # Convert the signature to the wire format (byte array). - signature_bytes = xmss_sig.to_bytes(self.scheme.config) - - # Ensure the signature meets the consensus spec length (3100 bytes). - # - # This is necessary when using TEST_CONFIG (796 bytes) vs PROD_CONFIG. - # Padding with zeros on the right maintains compatibility. - padded_bytes = signature_bytes.ljust(Signature.LENGTH, b"\x00") + xmss_sig = scheme.sign(sk, epoch, message) - return Signature(padded_bytes) + # Convert to the consensus Signature container (handles padding internally). + return Signature.from_xmss(xmss_sig, scheme) def get_public_key(self, validator_index: ValidatorIndex) -> PublicKey: """ @@ -225,3 +336,30 @@ def __contains__(self, validator_index: ValidatorIndex) -> bool: def __len__(self) -> int: """Return the number of validators with generated keys.""" return len(self._key_pairs) + + def export_test_vectors(self, include_private_keys: bool = False) -> list[dict[str, Any]]: + """ + Export generated keys in a JSON-serializable structure for downstream clients. + + Parameters + ---------- + include_private_keys : bool + When True, include the full secret key dump; otherwise only public data. + """ + vectors: list[dict[str, Any]] = [] + for validator_index, key_pair in self._key_pairs.items(): + meta = self._key_metadata.get(validator_index, {}) + entry: dict[str, Any] = { + "validator_index": int(validator_index), + "activation_epoch": meta.get("activation_epoch"), + "num_active_epochs": meta.get("num_active_epochs"), + "seed": meta.get("seed"), + "public_key": key_pair.public.to_bytes(self.scheme.config).hex(), + } + if include_private_keys: + # Pydantic models are JSON-serializable; keep the raw dump for full fidelity. + entry["secret_key"] = key_pair.secret.model_dump(mode="json") + + vectors.append(entry) + + return vectors diff --git a/src/lean_spec/subspecs/containers/signature.py b/src/lean_spec/subspecs/containers/signature.py index f8f9bea4..a7cba214 100644 --- a/src/lean_spec/subspecs/containers/signature.py +++ b/src/lean_spec/subspecs/containers/signature.py @@ -28,3 +28,22 @@ def verify( return scheme.verify(public_key, epoch, message, signature) except Exception: return False + + @classmethod + def from_xmss( + cls, xmss_signature: XmssSignature, scheme: GeneralizedXmssScheme = TEST_SIGNATURE_SCHEME + ) -> "Signature": + """ + Create a consensus `Signature` container from an XMSS signature object. + + Handles padding to the fixed 3100-byte length required by the consensus layer, + delegating all encoding details to the XMSS container itself. + """ + raw = xmss_signature.to_bytes(scheme.config) + if len(raw) > cls.LENGTH: + raise ValueError( + f"XMSS signature length {len(raw)} exceeds container size {cls.LENGTH}" + ) + + # Pad on the right to the fixed-length container expected by consensus. + return cls(raw.ljust(cls.LENGTH, b"\x00")) From ac15db6676fcd59608d68437f74beba905992905 Mon Sep 17 00:00:00 2001 From: Vyakart Date: Mon, 17 Nov 2025 22:23:16 +0530 Subject: [PATCH 02/13] Add configurable XMSS key manager with deterministic options and tests --- .../test_fixtures/fork_choice.py | 32 +++++++++- tests/test_consensus_testing_keys.py | 58 +++++++++++++++++++ 2 files changed, 88 insertions(+), 2 deletions(-) create mode 100644 tests/test_consensus_testing_keys.py diff --git a/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py b/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py index ea155e02..fc012570 100644 --- a/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py +++ b/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py @@ -112,6 +112,20 @@ class ForkChoiceTest(BaseConsensusFixture): valid up to the highest slot used in any block or attestation. """ + key_manager_seed: int | None = None + """ + Optional deterministic seed to pass to the XMSS key manager. + + When set, validators' keys and signatures become reproducible across runs. + """ + + key_manager_activation_epoch: Slot | None = None + """ + Optional activation epoch to use when generating keys. + + Defaults to the key manager's own default (0) when unset. + """ + @model_validator(mode="after") def set_anchor_block_default(self) -> ForkChoiceTest: """ @@ -183,10 +197,24 @@ def make_fixture(self) -> ForkChoiceTest: # Use shared key manager if it has sufficient capacity, otherwise create a new one # This optimizes performance by reusing keys across tests when possible shared_key_manager = _get_shared_key_manager() + use_shared = ( + self.key_manager_seed is None + and self.key_manager_activation_epoch is None + and self.max_slot <= shared_key_manager.max_slot + ) key_manager = ( shared_key_manager - if self.max_slot <= shared_key_manager.max_slot - else XmssKeyManager(max_slot=self.max_slot, scheme=TEST_SIGNATURE_SCHEME) + if use_shared + else XmssKeyManager( + max_slot=self.max_slot, + scheme=TEST_SIGNATURE_SCHEME, + default_seed=self.key_manager_seed, + default_activation_epoch=( + self.key_manager_activation_epoch + if self.key_manager_activation_epoch is not None + else XmssKeyManager.DEFAULT_ACTIVATION_EPOCH + ), + ) ) # Update validator pubkeys to match key_manager's generated keys diff --git a/tests/test_consensus_testing_keys.py b/tests/test_consensus_testing_keys.py new file mode 100644 index 00000000..ab1707c4 --- /dev/null +++ b/tests/test_consensus_testing_keys.py @@ -0,0 +1,58 @@ +import pytest +from consensus_testing.keys import XmssKeyManager + +from lean_spec.types import ValidatorIndex + + +def test_seeded_key_generation_is_deterministic() -> None: + manager_a = XmssKeyManager(default_seed=42) + manager_b = XmssKeyManager(default_seed=42) + manager_c = XmssKeyManager(default_seed=43) + + pair_a = manager_a.create_and_store_key_pair(ValidatorIndex(0)) + pair_b = manager_b.create_and_store_key_pair(ValidatorIndex(0)) + pair_c = manager_c.create_and_store_key_pair(ValidatorIndex(0)) + + assert pair_a.public == pair_b.public + assert pair_a.secret == pair_b.secret + assert pair_a.public != pair_c.public + assert pair_a.secret != pair_c.secret + + +def test_export_test_vectors_shape_and_metadata() -> None: + manager = XmssKeyManager(default_seed=7) + # Explicitly control first key parameters + manager.create_and_store_key_pair( + ValidatorIndex(1), + activation_epoch=5, + num_active_epochs=10, + seed=99, + ) + # Use defaults for second key + manager.create_and_store_key_pair(ValidatorIndex(2)) + + vectors = manager.export_test_vectors(include_private_keys=True) + assert {entry["validator_index"] for entry in vectors} == {1, 2} + + by_validator = {entry["validator_index"]: entry for entry in vectors} + first = by_validator[1] + second = by_validator[2] + + # Public key should be hex-encoded and match the configured length. + pk_len = manager.scheme.config.PUBLIC_KEY_LEN_BYTES * 2 + assert len(first["public_key"]) == pk_len + assert len(second["public_key"]) == pk_len + + # Metadata should reflect the parameters used to create the keys. + assert first["activation_epoch"] == 5 + assert first["num_active_epochs"] == 10 + assert first["seed"] == 99 + + assert second["activation_epoch"] == manager.default_activation_epoch + assert second["num_active_epochs"] == manager.default_num_active_epochs + assert second["seed"] == manager.default_seed + + # Secret key is only present when requested. + assert "secret_key" in first + assert isinstance(first["secret_key"], dict) + assert "secret_key" in second From 55cec11f0a76b4561ba573ea40f0474ed03b2cad Mon Sep 17 00:00:00 2001 From: Vyakart Date: Wed, 19 Nov 2025 14:54:09 +0530 Subject: [PATCH 03/13] refactor: simplify XMSS key management by removing deterministic seeding and add new signature tests. --- .../testing/src/consensus_testing/keys.py | 136 ++++-------------- .../subspecs/containers/test_signature.py | 123 ++++++++++++++++ 2 files changed, 148 insertions(+), 111 deletions(-) create mode 100644 tests/lean_spec/subspecs/containers/test_signature.py diff --git a/packages/testing/src/consensus_testing/keys.py b/packages/testing/src/consensus_testing/keys.py index a29fd01b..1e8c47f4 100644 --- a/packages/testing/src/consensus_testing/keys.py +++ b/packages/testing/src/consensus_testing/keys.py @@ -2,21 +2,17 @@ from __future__ import annotations -import random from typing import Any, NamedTuple, Optional from lean_spec.subspecs.containers import Attestation, Signature from lean_spec.subspecs.containers.slot import Slot -from lean_spec.subspecs.koalabear import Fp, P from lean_spec.subspecs.ssz.hash import hash_tree_root -from lean_spec.subspecs.xmss.constants import PRF_KEY_LENGTH, XmssConfig +from lean_spec.subspecs.xmss.constants import XmssConfig from lean_spec.subspecs.xmss.containers import PublicKey, SecretKey from lean_spec.subspecs.xmss.interface import ( TEST_SIGNATURE_SCHEME, GeneralizedXmssScheme, ) -from lean_spec.subspecs.xmss.prf import Prf -from lean_spec.subspecs.xmss.utils import Rand from lean_spec.types import Uint64, ValidatorIndex @@ -30,50 +26,14 @@ class KeyPair(NamedTuple): """The validator's secret key (used for signing).""" -_KEY_CACHE: dict[tuple[int, int, int, int | None], KeyPair] = {} +_KEY_CACHE: dict[tuple[int, int, int], KeyPair] = {} """ Cache keys across tests to avoid regenerating them for the same validator/lifetime combo. -Key: (validator_index, activation_epoch, num_active_epochs, seed) -> KeyPair +Key: (validator_index, activation_epoch, num_active_epochs) -> KeyPair """ -def _to_int(value: int | Slot | Uint64 | None, default: int = 0) -> int: - """Normalize Slot/Uint64/int to int with an optional default.""" - if value is None: - return default - if isinstance(value, Slot): - return value.as_int() - return int(value) - - -class SeededRand(Rand): - """Deterministic Rand helper to make key generation repeatable in tests.""" - - def __init__(self, config: XmssConfig, seed: int) -> None: - """Initialize with a deterministic seed.""" - super().__init__(config) - self._rng = random.Random(seed) - - def field_elements(self, length: int) -> list[Fp]: - """Generate deterministic field elements from the seeded RNG.""" - return [Fp(value=self._rng.randrange(P)) for _ in range(length)] - - -class SeededPrf(Prf): - """Deterministic PRF helper for repeatable PRF key generation.""" - - def __init__(self, config: XmssConfig, seed: int) -> None: - """Initialize with a deterministic seed.""" - super().__init__(config) - self._rng = random.Random(seed) - - def key_gen(self) -> bytes: - """Generate a deterministic PRF key for repeatable tests.""" - # Use a deterministic stream rather than os.urandom for repeatability in tests. - return self._rng.randbytes(PRF_KEY_LENGTH) - - class XmssKeyManager: """Lazy key manager for test validators using XMSS signatures.""" @@ -85,8 +45,7 @@ def __init__( self, max_slot: Optional[Slot] = None, scheme: GeneralizedXmssScheme = TEST_SIGNATURE_SCHEME, - default_activation_epoch: int | Slot | Uint64 = DEFAULT_ACTIVATION_EPOCH, - default_seed: int | None = 0, + default_activation_epoch: Uint64 = Uint64(DEFAULT_ACTIVATION_EPOCH), ) -> None: """ Initialize the key manager. @@ -99,61 +58,33 @@ def __init__( scheme : GeneralizedXmssScheme, optional The XMSS scheme to use. Defaults to `TEST_SIGNATURE_SCHEME`. - default_activation_epoch : int | Slot | Uint64, optional + default_activation_epoch : Uint64, optional Activation epoch used when none is provided for key generation. - default_seed : int | None, optional - Seed for deterministic key generation. Set to None to use non-deterministic - randomness from the underlying XMSS scheme. - Notes: + Notes ----- Internally, keys are stored in a single dictionary: `{ValidatorIndex → KeyPair}`. """ self.max_slot = max_slot if max_slot is not None else self.DEFAULT_MAX_SLOT self.scheme = scheme - self.default_activation_epoch = _to_int( - default_activation_epoch, self.DEFAULT_ACTIVATION_EPOCH - ) - self.default_seed = default_seed + self.default_activation_epoch = default_activation_epoch self._key_pairs: dict[ValidatorIndex, KeyPair] = {} self._key_metadata: dict[ValidatorIndex, dict[str, Any]] = {} - self._schemes_by_seed: dict[int, GeneralizedXmssScheme] = {} @property def default_num_active_epochs(self) -> int: """Default lifetime derived from the configured max_slot.""" return self.max_slot.as_int() + 1 - def _scheme_for_seed(self, seed: int | None) -> GeneralizedXmssScheme: - """ - Return a scheme instance appropriate for the provided seed. - - A deterministic scheme (SeededRand + SeededPrf) is returned when a specific - seed is provided; otherwise the base scheme is used. - """ - if seed is None: - return self.scheme - if seed not in self._schemes_by_seed: - self._schemes_by_seed[seed] = GeneralizedXmssScheme( - config=self.scheme.config, - prf=SeededPrf(self.scheme.config, seed), - hasher=self.scheme.hasher, - merkle_tree=self.scheme.merkle_tree, - encoder=self.scheme.encoder, - rand=SeededRand(self.scheme.config, seed), - ) - - return self._schemes_by_seed[seed] def create_and_store_key_pair( self, validator_index: ValidatorIndex, *, - activation_epoch: int | Slot | Uint64 | None = None, - num_active_epochs: int | Slot | Uint64 | None = None, - seed: int | None = None, + activation_epoch: Optional[Uint64] = None, + num_active_epochs: Optional[Uint64] = None, ) -> KeyPair: """ Generate and store a key pair with explicit control over key generation. @@ -162,40 +93,32 @@ def create_and_store_key_pair( ---------- validator_index : ValidatorIndex The validator for whom a key pair should be generated. - activation_epoch : int | Slot | Uint64, optional + activation_epoch : Uint64, optional First epoch for which the key is valid. Defaults to `default_activation_epoch`. - num_active_epochs : int | Slot | Uint64, optional + num_active_epochs : Uint64, optional Number of consecutive epochs the key should remain active. Defaults to `max_slot + 1` (to include genesis). - seed : int | None, optional - Seed used for deterministic key generation. If None, the base scheme's - randomness is used. """ - activation_epoch_int = _to_int(activation_epoch, self.default_activation_epoch) - num_active_epochs_int = _to_int(num_active_epochs, self.default_num_active_epochs) - key_seed = seed if seed is not None else self.default_seed - - scheme = self._scheme_for_seed(key_seed) + activation_epoch_val = activation_epoch if activation_epoch is not None else self.default_activation_epoch + num_active_epochs_val = num_active_epochs if num_active_epochs is not None else Uint64(self.default_num_active_epochs) cache_key = ( int(validator_index), - activation_epoch_int, - num_active_epochs_int, - key_seed, + int(activation_epoch_val), + int(num_active_epochs_val), ) if cache_key in _KEY_CACHE: key_pair = _KEY_CACHE[cache_key] else: - pk, sk = scheme.key_gen(Uint64(activation_epoch_int), Uint64(num_active_epochs_int)) + pk, sk = self.scheme.key_gen(activation_epoch_val, num_active_epochs_val) key_pair = KeyPair(public=pk, secret=sk) _KEY_CACHE[cache_key] = key_pair self._key_pairs[validator_index] = key_pair self._key_metadata[validator_index] = { - "activation_epoch": activation_epoch_int, - "num_active_epochs": num_active_epochs_int, - "seed": key_seed, + "activation_epoch": int(activation_epoch_val), + "num_active_epochs": int(num_active_epochs_val), } return key_pair @@ -252,15 +175,7 @@ def sign_attestation(self, attestation: Attestation) -> Signature: # Get the current secret key sk = key_pair.secret - metadata = self._key_metadata.get( - validator_id, - { - "seed": self.default_seed, - "activation_epoch": self.default_activation_epoch, - "num_active_epochs": self.default_num_active_epochs, - }, - ) - scheme = self._scheme_for_seed(metadata.get("seed")) + # Map the attestation slot to an XMSS epoch. # @@ -268,10 +183,10 @@ def sign_attestation(self, attestation: Attestation) -> Signature: epoch = attestation.data.slot # Loop until the epoch is inside the prepared interval - prepared_interval = scheme.get_prepared_interval(sk) + prepared_interval = self.scheme.get_prepared_interval(sk) while int(epoch) not in prepared_interval: # Check if we're advancing past the key's total lifetime - activation_interval = scheme.get_activation_interval(sk) + activation_interval = self.scheme.get_activation_interval(sk) if prepared_interval.stop >= activation_interval.stop: raise ValueError( f"Cannot sign for epoch {epoch}: " @@ -279,10 +194,10 @@ def sign_attestation(self, attestation: Attestation) -> Signature: ) # Advance the key and get the new key object - sk = scheme.advance_preparation(sk) + sk = self.scheme.advance_preparation(sk) # Update the prepared interval for the next loop check - prepared_interval = scheme.get_prepared_interval(sk) + prepared_interval = self.scheme.get_prepared_interval(sk) # Update the cached key pair with the new, advanced secret key. # This ensures the *next* call to sign() uses the advanced state. @@ -294,10 +209,10 @@ def sign_attestation(self, attestation: Attestation) -> Signature: message = bytes(hash_tree_root(attestation)) # Generate the XMSS signature using the validator's (now prepared) secret key. - xmss_sig = scheme.sign(sk, epoch, message) + xmss_sig = self.scheme.sign(sk, epoch, message) # Convert to the consensus Signature container (handles padding internally). - return Signature.from_xmss(xmss_sig, scheme) + return Signature.from_xmss(xmss_sig, self.scheme) def get_public_key(self, validator_index: ValidatorIndex) -> PublicKey: """ @@ -353,7 +268,6 @@ def export_test_vectors(self, include_private_keys: bool = False) -> list[dict[s "validator_index": int(validator_index), "activation_epoch": meta.get("activation_epoch"), "num_active_epochs": meta.get("num_active_epochs"), - "seed": meta.get("seed"), "public_key": key_pair.public.to_bytes(self.scheme.config).hex(), } if include_private_keys: diff --git a/tests/lean_spec/subspecs/containers/test_signature.py b/tests/lean_spec/subspecs/containers/test_signature.py new file mode 100644 index 00000000..c9360e27 --- /dev/null +++ b/tests/lean_spec/subspecs/containers/test_signature.py @@ -0,0 +1,123 @@ +"""Tests for consensus Signature container.""" + +import pytest + +from lean_spec.subspecs.containers import Signature +from lean_spec.subspecs.koalabear import Fp +from lean_spec.subspecs.xmss.constants import TEST_CONFIG +from lean_spec.subspecs.xmss.containers import HashTreeOpening +from lean_spec.subspecs.xmss.containers import Signature as XmssSignature +from lean_spec.subspecs.xmss.interface import TEST_SIGNATURE_SCHEME + + +class TestSignatureFromXmss: + """Tests for Signature.from_xmss conversion method.""" + + def test_from_xmss_basic_conversion(self) -> None: + """Test that from_xmss correctly converts an XMSS signature to consensus format.""" + # Create a valid XMSS signature + path = HashTreeOpening( + siblings=[[Fp(value=i) for i in range(TEST_CONFIG.HASH_LEN_FE)]] + * TEST_CONFIG.LOG_LIFETIME + ) + rho = [Fp(value=i) for i in range(TEST_CONFIG.RAND_LEN_FE)] + hashes = [ + [Fp(value=i + j) for i in range(TEST_CONFIG.HASH_LEN_FE)] + for j in range(TEST_CONFIG.DIMENSION) + ] + xmss_sig = XmssSignature(path=path, rho=rho, hashes=hashes) + + # Convert to consensus signature + consensus_sig = Signature.from_xmss(xmss_sig, TEST_SIGNATURE_SCHEME) + + # Verify it's the correct type and length + assert isinstance(consensus_sig, Signature) + assert len(consensus_sig) == Signature.LENGTH + assert len(consensus_sig) == 3100 + + def test_from_xmss_padding(self) -> None: + """Test that from_xmss correctly pads to 3100 bytes.""" + # Create a minimal XMSS signature + path = HashTreeOpening( + siblings=[[Fp(value=0)] * TEST_CONFIG.HASH_LEN_FE] * TEST_CONFIG.LOG_LIFETIME + ) + rho = [Fp(value=0)] * TEST_CONFIG.RAND_LEN_FE + hashes = [[Fp(value=0)] * TEST_CONFIG.HASH_LEN_FE] * TEST_CONFIG.DIMENSION + xmss_sig = XmssSignature(path=path, rho=rho, hashes=hashes) + + consensus_sig = Signature.from_xmss(xmss_sig, TEST_SIGNATURE_SCHEME) + + # The signature should be padded with zeros + raw_xmss = xmss_sig.to_bytes(TEST_CONFIG) + expected_padding = 3100 - len(raw_xmss) + + # Verify the last bytes are zeros (padding) + assert consensus_sig[-expected_padding:] == b"\x00" * expected_padding + + def test_from_xmss_preserves_data(self) -> None: + """Test that from_xmss preserves the XMSS signature data.""" + # Create an XMSS signature with distinct values + path = HashTreeOpening( + siblings=[ + [Fp(value=i * j) for i in range(TEST_CONFIG.HASH_LEN_FE)] + for j in range(TEST_CONFIG.LOG_LIFETIME) + ] + ) + rho = [Fp(value=i * 10) for i in range(TEST_CONFIG.RAND_LEN_FE)] + hashes = [ + [Fp(value=i + j * 100) for i in range(TEST_CONFIG.HASH_LEN_FE)] + for j in range(TEST_CONFIG.DIMENSION) + ] + xmss_sig = XmssSignature(path=path, rho=rho, hashes=hashes) + + # Convert to consensus format + consensus_sig = Signature.from_xmss(xmss_sig, TEST_SIGNATURE_SCHEME) + + # The beginning of the consensus signature should match the XMSS bytes + raw_xmss = xmss_sig.to_bytes(TEST_CONFIG) + assert bytes(consensus_sig)[: len(raw_xmss)] == raw_xmss + + def test_from_xmss_roundtrip_with_verify(self) -> None: + """Test that a signature created via from_xmss can be verified.""" + from lean_spec.subspecs.xmss.interface import TEST_SIGNATURE_SCHEME + from lean_spec.types import Uint64 + + # Generate a test key pair + pk, sk = TEST_SIGNATURE_SCHEME.key_gen(Uint64(0), Uint64(10)) + + # Create a test message (must be exactly 32 bytes) + message = b"test message for signing123456\x00\x00" # 32 bytes + assert len(message) == 32 + epoch = Uint64(0) + + # Sign the message + xmss_sig = TEST_SIGNATURE_SCHEME.sign(sk, epoch, message) + + # Convert to consensus signature + consensus_sig = Signature.from_xmss(xmss_sig, TEST_SIGNATURE_SCHEME) + + # Verify using the consensus signature's verify method + assert consensus_sig.verify(pk, epoch, message, TEST_SIGNATURE_SCHEME) + + def test_from_xmss_different_signatures_produce_different_results(self) -> None: + """Test that different XMSS signatures produce different consensus signatures.""" + # Create two different XMSS signatures + path1 = HashTreeOpening( + siblings=[[Fp(value=i) for i in range(TEST_CONFIG.HASH_LEN_FE)]] + * TEST_CONFIG.LOG_LIFETIME + ) + path2 = HashTreeOpening( + siblings=[[Fp(value=i + 1) for i in range(TEST_CONFIG.HASH_LEN_FE)]] + * TEST_CONFIG.LOG_LIFETIME + ) + rho = [Fp(value=0)] * TEST_CONFIG.RAND_LEN_FE + hashes = [[Fp(value=0)] * TEST_CONFIG.HASH_LEN_FE] * TEST_CONFIG.DIMENSION + + xmss_sig1 = XmssSignature(path=path1, rho=rho, hashes=hashes) + xmss_sig2 = XmssSignature(path=path2, rho=rho, hashes=hashes) + + consensus_sig1 = Signature.from_xmss(xmss_sig1, TEST_SIGNATURE_SCHEME) + consensus_sig2 = Signature.from_xmss(xmss_sig2, TEST_SIGNATURE_SCHEME) + + # Different XMSS signatures should produce different consensus signatures + assert bytes(consensus_sig1) != bytes(consensus_sig2) From c404d1fa593394446c8daa1eed7473839f2b264d Mon Sep 17 00:00:00 2001 From: Vyakart Date: Wed, 19 Nov 2025 15:11:10 +0530 Subject: [PATCH 04/13] refactor: make KeyManager's default_activation_epoch optional and consistently use Uint64. --- .../testing/src/consensus_testing/keys.py | 27 +++++++++++-------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/packages/testing/src/consensus_testing/keys.py b/packages/testing/src/consensus_testing/keys.py index 1e8c47f4..f293445e 100644 --- a/packages/testing/src/consensus_testing/keys.py +++ b/packages/testing/src/consensus_testing/keys.py @@ -7,7 +7,6 @@ from lean_spec.subspecs.containers import Attestation, Signature from lean_spec.subspecs.containers.slot import Slot from lean_spec.subspecs.ssz.hash import hash_tree_root -from lean_spec.subspecs.xmss.constants import XmssConfig from lean_spec.subspecs.xmss.containers import PublicKey, SecretKey from lean_spec.subspecs.xmss.interface import ( TEST_SIGNATURE_SCHEME, @@ -39,13 +38,13 @@ class XmssKeyManager: DEFAULT_MAX_SLOT = Slot(100) """Default maximum slot horizon if not specified.""" - DEFAULT_ACTIVATION_EPOCH = 0 + DEFAULT_ACTIVATION_EPOCH = Uint64(0) def __init__( self, max_slot: Optional[Slot] = None, scheme: GeneralizedXmssScheme = TEST_SIGNATURE_SCHEME, - default_activation_epoch: Uint64 = Uint64(DEFAULT_ACTIVATION_EPOCH), + default_activation_epoch: Optional[Uint64] = None, ) -> None: """ Initialize the key manager. @@ -61,14 +60,18 @@ def __init__( default_activation_epoch : Uint64, optional Activation epoch used when none is provided for key generation. - Notes + Notes: ----- Internally, keys are stored in a single dictionary: `{ValidatorIndex → KeyPair}`. """ self.max_slot = max_slot if max_slot is not None else self.DEFAULT_MAX_SLOT self.scheme = scheme - self.default_activation_epoch = default_activation_epoch + self.default_activation_epoch = ( + default_activation_epoch + if default_activation_epoch is not None + else self.DEFAULT_ACTIVATION_EPOCH + ) self._key_pairs: dict[ValidatorIndex, KeyPair] = {} self._key_metadata: dict[ValidatorIndex, dict[str, Any]] = {} @@ -77,8 +80,6 @@ def default_num_active_epochs(self) -> int: """Default lifetime derived from the configured max_slot.""" return self.max_slot.as_int() + 1 - - def create_and_store_key_pair( self, validator_index: ValidatorIndex, @@ -99,8 +100,14 @@ def create_and_store_key_pair( Number of consecutive epochs the key should remain active. Defaults to `max_slot + 1` (to include genesis). """ - activation_epoch_val = activation_epoch if activation_epoch is not None else self.default_activation_epoch - num_active_epochs_val = num_active_epochs if num_active_epochs is not None else Uint64(self.default_num_active_epochs) + activation_epoch_val = ( + activation_epoch if activation_epoch is not None else self.default_activation_epoch + ) + num_active_epochs_val = ( + num_active_epochs + if num_active_epochs is not None + else Uint64(self.default_num_active_epochs) + ) cache_key = ( int(validator_index), @@ -175,8 +182,6 @@ def sign_attestation(self, attestation: Attestation) -> Signature: # Get the current secret key sk = key_pair.secret - - # Map the attestation slot to an XMSS epoch. # # Each slot gets its own epoch to avoid key reuse. From 309b0b1109ae0bf339a9db7ce7b6fb48c96bd336 Mon Sep 17 00:00:00 2001 From: Vyakart Date: Wed, 19 Nov 2025 20:20:43 +0530 Subject: [PATCH 05/13] Remove XMSS padding and differentiation tests, and fixed signature length assertion. --- .../subspecs/containers/test_signature.py | 40 ------------------- 1 file changed, 40 deletions(-) diff --git a/tests/lean_spec/subspecs/containers/test_signature.py b/tests/lean_spec/subspecs/containers/test_signature.py index c9360e27..406b724d 100644 --- a/tests/lean_spec/subspecs/containers/test_signature.py +++ b/tests/lean_spec/subspecs/containers/test_signature.py @@ -33,26 +33,8 @@ def test_from_xmss_basic_conversion(self) -> None: # Verify it's the correct type and length assert isinstance(consensus_sig, Signature) assert len(consensus_sig) == Signature.LENGTH - assert len(consensus_sig) == 3100 - def test_from_xmss_padding(self) -> None: - """Test that from_xmss correctly pads to 3100 bytes.""" - # Create a minimal XMSS signature - path = HashTreeOpening( - siblings=[[Fp(value=0)] * TEST_CONFIG.HASH_LEN_FE] * TEST_CONFIG.LOG_LIFETIME - ) - rho = [Fp(value=0)] * TEST_CONFIG.RAND_LEN_FE - hashes = [[Fp(value=0)] * TEST_CONFIG.HASH_LEN_FE] * TEST_CONFIG.DIMENSION - xmss_sig = XmssSignature(path=path, rho=rho, hashes=hashes) - - consensus_sig = Signature.from_xmss(xmss_sig, TEST_SIGNATURE_SCHEME) - - # The signature should be padded with zeros - raw_xmss = xmss_sig.to_bytes(TEST_CONFIG) - expected_padding = 3100 - len(raw_xmss) - # Verify the last bytes are zeros (padding) - assert consensus_sig[-expected_padding:] == b"\x00" * expected_padding def test_from_xmss_preserves_data(self) -> None: """Test that from_xmss preserves the XMSS signature data.""" @@ -79,7 +61,6 @@ def test_from_xmss_preserves_data(self) -> None: def test_from_xmss_roundtrip_with_verify(self) -> None: """Test that a signature created via from_xmss can be verified.""" - from lean_spec.subspecs.xmss.interface import TEST_SIGNATURE_SCHEME from lean_spec.types import Uint64 # Generate a test key pair @@ -99,25 +80,4 @@ def test_from_xmss_roundtrip_with_verify(self) -> None: # Verify using the consensus signature's verify method assert consensus_sig.verify(pk, epoch, message, TEST_SIGNATURE_SCHEME) - def test_from_xmss_different_signatures_produce_different_results(self) -> None: - """Test that different XMSS signatures produce different consensus signatures.""" - # Create two different XMSS signatures - path1 = HashTreeOpening( - siblings=[[Fp(value=i) for i in range(TEST_CONFIG.HASH_LEN_FE)]] - * TEST_CONFIG.LOG_LIFETIME - ) - path2 = HashTreeOpening( - siblings=[[Fp(value=i + 1) for i in range(TEST_CONFIG.HASH_LEN_FE)]] - * TEST_CONFIG.LOG_LIFETIME - ) - rho = [Fp(value=0)] * TEST_CONFIG.RAND_LEN_FE - hashes = [[Fp(value=0)] * TEST_CONFIG.HASH_LEN_FE] * TEST_CONFIG.DIMENSION - - xmss_sig1 = XmssSignature(path=path1, rho=rho, hashes=hashes) - xmss_sig2 = XmssSignature(path=path2, rho=rho, hashes=hashes) - - consensus_sig1 = Signature.from_xmss(xmss_sig1, TEST_SIGNATURE_SCHEME) - consensus_sig2 = Signature.from_xmss(xmss_sig2, TEST_SIGNATURE_SCHEME) - # Different XMSS signatures should produce different consensus signatures - assert bytes(consensus_sig1) != bytes(consensus_sig2) From 034d5ab96920f45fc7092d2551ee84d750fca108 Mon Sep 17 00:00:00 2001 From: Vyakart Date: Wed, 19 Nov 2025 20:27:04 +0530 Subject: [PATCH 06/13] chore: Remove extraneous blank lines from test_signature.py --- tests/lean_spec/subspecs/containers/test_signature.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/lean_spec/subspecs/containers/test_signature.py b/tests/lean_spec/subspecs/containers/test_signature.py index 406b724d..c825eccc 100644 --- a/tests/lean_spec/subspecs/containers/test_signature.py +++ b/tests/lean_spec/subspecs/containers/test_signature.py @@ -34,8 +34,6 @@ def test_from_xmss_basic_conversion(self) -> None: assert isinstance(consensus_sig, Signature) assert len(consensus_sig) == Signature.LENGTH - - def test_from_xmss_preserves_data(self) -> None: """Test that from_xmss preserves the XMSS signature data.""" # Create an XMSS signature with distinct values @@ -79,5 +77,3 @@ def test_from_xmss_roundtrip_with_verify(self) -> None: # Verify using the consensus signature's verify method assert consensus_sig.verify(pk, epoch, message, TEST_SIGNATURE_SCHEME) - - From f513830f06ce256a8d2013d256845f62e6b3ebd4 Mon Sep 17 00:00:00 2001 From: Thomas Coratger <60488569+tcoratger@users.noreply.github.com> Date: Wed, 19 Nov 2025 16:29:11 +0100 Subject: [PATCH 07/13] Apply suggestions from code review --- packages/testing/src/consensus_testing/keys.py | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/testing/src/consensus_testing/keys.py b/packages/testing/src/consensus_testing/keys.py index f293445e..711e15fa 100644 --- a/packages/testing/src/consensus_testing/keys.py +++ b/packages/testing/src/consensus_testing/keys.py @@ -39,6 +39,7 @@ class XmssKeyManager: DEFAULT_MAX_SLOT = Slot(100) """Default maximum slot horizon if not specified.""" DEFAULT_ACTIVATION_EPOCH = Uint64(0) + """Default activation epoch if not specified""" def __init__( self, From fddbd934667bc6b7f49a17a421af522b58b55586 Mon Sep 17 00:00:00 2001 From: Vyakart Date: Thu, 20 Nov 2025 12:56:32 +0530 Subject: [PATCH 08/13] refactor: remove redundant constant and rename property in `XmssKeyManager`, correct `Signature` type hint, and remove basic XMSS signature conversion tests. --- .../testing/src/consensus_testing/keys.py | 13 +++-- .../subspecs/containers/signature.py | 2 +- .../subspecs/containers/test_signature.py | 52 +------------------ 3 files changed, 10 insertions(+), 57 deletions(-) diff --git a/packages/testing/src/consensus_testing/keys.py b/packages/testing/src/consensus_testing/keys.py index 711e15fa..84e54cd3 100644 --- a/packages/testing/src/consensus_testing/keys.py +++ b/packages/testing/src/consensus_testing/keys.py @@ -36,10 +36,10 @@ class KeyPair(NamedTuple): class XmssKeyManager: """Lazy key manager for test validators using XMSS signatures.""" + DEFAULT_MAX_SLOT = Slot(100) """Default maximum slot horizon if not specified.""" - DEFAULT_ACTIVATION_EPOCH = Uint64(0) - """Default activation epoch if not specified""" + def __init__( self, @@ -65,19 +65,22 @@ def __init__( ----- Internally, keys are stored in a single dictionary: `{ValidatorIndex → KeyPair}`. + + This class manages stateful XMSS keys for testing, handling the complexity of + epoch updates and key evolution that stateless helpers cannot provide. """ self.max_slot = max_slot if max_slot is not None else self.DEFAULT_MAX_SLOT self.scheme = scheme self.default_activation_epoch = ( default_activation_epoch if default_activation_epoch is not None - else self.DEFAULT_ACTIVATION_EPOCH + else Uint64(0) ) self._key_pairs: dict[ValidatorIndex, KeyPair] = {} self._key_metadata: dict[ValidatorIndex, dict[str, Any]] = {} @property - def default_num_active_epochs(self) -> int: + def default_max_epoch(self) -> int: """Default lifetime derived from the configured max_slot.""" return self.max_slot.as_int() + 1 @@ -107,7 +110,7 @@ def create_and_store_key_pair( num_active_epochs_val = ( num_active_epochs if num_active_epochs is not None - else Uint64(self.default_num_active_epochs) + else Uint64(self.default_max_epoch) ) cache_key = ( diff --git a/src/lean_spec/subspecs/containers/signature.py b/src/lean_spec/subspecs/containers/signature.py index a7cba214..bf65f6df 100644 --- a/src/lean_spec/subspecs/containers/signature.py +++ b/src/lean_spec/subspecs/containers/signature.py @@ -32,7 +32,7 @@ def verify( @classmethod def from_xmss( cls, xmss_signature: XmssSignature, scheme: GeneralizedXmssScheme = TEST_SIGNATURE_SCHEME - ) -> "Signature": + ) -> Signature: """ Create a consensus `Signature` container from an XMSS signature object. diff --git a/tests/lean_spec/subspecs/containers/test_signature.py b/tests/lean_spec/subspecs/containers/test_signature.py index c825eccc..8bb3bff8 100644 --- a/tests/lean_spec/subspecs/containers/test_signature.py +++ b/tests/lean_spec/subspecs/containers/test_signature.py @@ -1,65 +1,15 @@ """Tests for consensus Signature container.""" -import pytest - from lean_spec.subspecs.containers import Signature -from lean_spec.subspecs.koalabear import Fp -from lean_spec.subspecs.xmss.constants import TEST_CONFIG -from lean_spec.subspecs.xmss.containers import HashTreeOpening -from lean_spec.subspecs.xmss.containers import Signature as XmssSignature from lean_spec.subspecs.xmss.interface import TEST_SIGNATURE_SCHEME +from lean_spec.types import Uint64 class TestSignatureFromXmss: """Tests for Signature.from_xmss conversion method.""" - def test_from_xmss_basic_conversion(self) -> None: - """Test that from_xmss correctly converts an XMSS signature to consensus format.""" - # Create a valid XMSS signature - path = HashTreeOpening( - siblings=[[Fp(value=i) for i in range(TEST_CONFIG.HASH_LEN_FE)]] - * TEST_CONFIG.LOG_LIFETIME - ) - rho = [Fp(value=i) for i in range(TEST_CONFIG.RAND_LEN_FE)] - hashes = [ - [Fp(value=i + j) for i in range(TEST_CONFIG.HASH_LEN_FE)] - for j in range(TEST_CONFIG.DIMENSION) - ] - xmss_sig = XmssSignature(path=path, rho=rho, hashes=hashes) - - # Convert to consensus signature - consensus_sig = Signature.from_xmss(xmss_sig, TEST_SIGNATURE_SCHEME) - - # Verify it's the correct type and length - assert isinstance(consensus_sig, Signature) - assert len(consensus_sig) == Signature.LENGTH - - def test_from_xmss_preserves_data(self) -> None: - """Test that from_xmss preserves the XMSS signature data.""" - # Create an XMSS signature with distinct values - path = HashTreeOpening( - siblings=[ - [Fp(value=i * j) for i in range(TEST_CONFIG.HASH_LEN_FE)] - for j in range(TEST_CONFIG.LOG_LIFETIME) - ] - ) - rho = [Fp(value=i * 10) for i in range(TEST_CONFIG.RAND_LEN_FE)] - hashes = [ - [Fp(value=i + j * 100) for i in range(TEST_CONFIG.HASH_LEN_FE)] - for j in range(TEST_CONFIG.DIMENSION) - ] - xmss_sig = XmssSignature(path=path, rho=rho, hashes=hashes) - - # Convert to consensus format - consensus_sig = Signature.from_xmss(xmss_sig, TEST_SIGNATURE_SCHEME) - - # The beginning of the consensus signature should match the XMSS bytes - raw_xmss = xmss_sig.to_bytes(TEST_CONFIG) - assert bytes(consensus_sig)[: len(raw_xmss)] == raw_xmss - def test_from_xmss_roundtrip_with_verify(self) -> None: """Test that a signature created via from_xmss can be verified.""" - from lean_spec.types import Uint64 # Generate a test key pair pk, sk = TEST_SIGNATURE_SCHEME.key_gen(Uint64(0), Uint64(10)) From 0cd8a9bcd0587ccf51d9e7189b9a31fc21f909f6 Mon Sep 17 00:00:00 2001 From: Vyakart Date: Thu, 20 Nov 2025 13:08:45 +0530 Subject: [PATCH 09/13] style: apply ruff formatting to keys.py --- packages/testing/src/consensus_testing/keys.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/packages/testing/src/consensus_testing/keys.py b/packages/testing/src/consensus_testing/keys.py index 84e54cd3..df2af1dc 100644 --- a/packages/testing/src/consensus_testing/keys.py +++ b/packages/testing/src/consensus_testing/keys.py @@ -36,11 +36,9 @@ class KeyPair(NamedTuple): class XmssKeyManager: """Lazy key manager for test validators using XMSS signatures.""" - DEFAULT_MAX_SLOT = Slot(100) """Default maximum slot horizon if not specified.""" - def __init__( self, max_slot: Optional[Slot] = None, @@ -72,9 +70,7 @@ def __init__( self.max_slot = max_slot if max_slot is not None else self.DEFAULT_MAX_SLOT self.scheme = scheme self.default_activation_epoch = ( - default_activation_epoch - if default_activation_epoch is not None - else Uint64(0) + default_activation_epoch if default_activation_epoch is not None else Uint64(0) ) self._key_pairs: dict[ValidatorIndex, KeyPair] = {} self._key_metadata: dict[ValidatorIndex, dict[str, Any]] = {} @@ -108,9 +104,7 @@ def create_and_store_key_pair( activation_epoch if activation_epoch is not None else self.default_activation_epoch ) num_active_epochs_val = ( - num_active_epochs - if num_active_epochs is not None - else Uint64(self.default_max_epoch) + num_active_epochs if num_active_epochs is not None else Uint64(self.default_max_epoch) ) cache_key = ( From 986285c16a01eb3b422077efa619fec709e43fcb Mon Sep 17 00:00:00 2001 From: Vyakart Date: Fri, 21 Nov 2025 15:49:56 +0530 Subject: [PATCH 10/13] refactor: update XmssKeyManager to use activation_epoch parameter and clean up default values; improve Signature class documentation --- .../testing/src/consensus_testing/keys.py | 50 +++++-------------- .../subspecs/containers/signature.py | 4 +- 2 files changed, 14 insertions(+), 40 deletions(-) diff --git a/packages/testing/src/consensus_testing/keys.py b/packages/testing/src/consensus_testing/keys.py index df2af1dc..019f11d6 100644 --- a/packages/testing/src/consensus_testing/keys.py +++ b/packages/testing/src/consensus_testing/keys.py @@ -41,23 +41,23 @@ class XmssKeyManager: def __init__( self, + activation_epoch: Optional[Uint64] = None, max_slot: Optional[Slot] = None, scheme: GeneralizedXmssScheme = TEST_SIGNATURE_SCHEME, - default_activation_epoch: Optional[Uint64] = None, ) -> None: """ Initialize the key manager. Parameters ---------- + activation_epoch : Uint64, optional + Activation epoch used when none is provided for key generation. max_slot : Slot, optional Highest slot number for which keys must remain valid. Defaults to `Slot(100)`. scheme : GeneralizedXmssScheme, optional The XMSS scheme to use. Defaults to `TEST_SIGNATURE_SCHEME`. - default_activation_epoch : Uint64, optional - Activation epoch used when none is provided for key generation. Notes: ----- @@ -69,16 +69,14 @@ def __init__( """ self.max_slot = max_slot if max_slot is not None else self.DEFAULT_MAX_SLOT self.scheme = scheme - self.default_activation_epoch = ( - default_activation_epoch if default_activation_epoch is not None else Uint64(0) - ) + self.activation_epoch = activation_epoch if activation_epoch is not None else Uint64(0) self._key_pairs: dict[ValidatorIndex, KeyPair] = {} self._key_metadata: dict[ValidatorIndex, dict[str, Any]] = {} @property def default_max_epoch(self) -> int: - """Default lifetime derived from the configured max_slot.""" - return self.max_slot.as_int() + 1 + """Default lifetime derived from the class default max_slot.""" + return self.DEFAULT_MAX_SLOT.as_int() + 1 def create_and_store_key_pair( self, @@ -95,13 +93,14 @@ def create_and_store_key_pair( validator_index : ValidatorIndex The validator for whom a key pair should be generated. activation_epoch : Uint64, optional - First epoch for which the key is valid. Defaults to `default_activation_epoch`. + First epoch for which the key is valid. Defaults to the manager's + configured `activation_epoch`. num_active_epochs : Uint64, optional Number of consecutive epochs the key should remain active. - Defaults to `max_slot + 1` (to include genesis). + Defaults to `default_max_epoch` (derived from `DEFAULT_MAX_SLOT` to include genesis). """ activation_epoch_val = ( - activation_epoch if activation_epoch is not None else self.default_activation_epoch + activation_epoch if activation_epoch is not None else self.activation_epoch ) num_active_epochs_val = ( num_active_epochs if num_active_epochs is not None else Uint64(self.default_max_epoch) @@ -125,6 +124,7 @@ def create_and_store_key_pair( "activation_epoch": int(activation_epoch_val), "num_active_epochs": int(num_active_epochs_val), } + # TODO: support multiple keys per validator keyed by activation_epoch. return key_pair def __getitem__(self, validator_index: ValidatorIndex) -> KeyPair: @@ -145,7 +145,7 @@ def __getitem__(self, validator_index: ValidatorIndex) -> KeyPair: ----- - Generates a new key if none exists. - Keys are deterministic for testing (`seed=0`). - - Lifetime = `max_slot + 1` to include the genesis slot. + - Lifetime defaults to `default_max_epoch` to include the genesis slot. """ if validator_index in self._key_pairs: return self._key_pairs[validator_index] @@ -254,29 +254,3 @@ def __contains__(self, validator_index: ValidatorIndex) -> bool: def __len__(self) -> int: """Return the number of validators with generated keys.""" return len(self._key_pairs) - - def export_test_vectors(self, include_private_keys: bool = False) -> list[dict[str, Any]]: - """ - Export generated keys in a JSON-serializable structure for downstream clients. - - Parameters - ---------- - include_private_keys : bool - When True, include the full secret key dump; otherwise only public data. - """ - vectors: list[dict[str, Any]] = [] - for validator_index, key_pair in self._key_pairs.items(): - meta = self._key_metadata.get(validator_index, {}) - entry: dict[str, Any] = { - "validator_index": int(validator_index), - "activation_epoch": meta.get("activation_epoch"), - "num_active_epochs": meta.get("num_active_epochs"), - "public_key": key_pair.public.to_bytes(self.scheme.config).hex(), - } - if include_private_keys: - # Pydantic models are JSON-serializable; keep the raw dump for full fidelity. - entry["secret_key"] = key_pair.secret.model_dump(mode="json") - - vectors.append(entry) - - return vectors diff --git a/src/lean_spec/subspecs/containers/signature.py b/src/lean_spec/subspecs/containers/signature.py index bf65f6df..81ed6179 100644 --- a/src/lean_spec/subspecs/containers/signature.py +++ b/src/lean_spec/subspecs/containers/signature.py @@ -36,8 +36,8 @@ def from_xmss( """ Create a consensus `Signature` container from an XMSS signature object. - Handles padding to the fixed 3100-byte length required by the consensus layer, - delegating all encoding details to the XMSS container itself. + Applies the consensus-layer fixed-length padding, delegating all encoding + details to the XMSS container itself. """ raw = xmss_signature.to_bytes(scheme.config) if len(raw) > cls.LENGTH: From cafdbfbfbac1a037fcbfd04e380511a06902b9c5 Mon Sep 17 00:00:00 2001 From: Vyakart Date: Mon, 24 Nov 2025 13:20:25 +0530 Subject: [PATCH 11/13] refactor: enhance XmssKeyManager to support default activation epoch and seed; update key caching mechanism --- .../testing/src/consensus_testing/keys.py | 120 +++++++++++++++--- 1 file changed, 104 insertions(+), 16 deletions(-) diff --git a/packages/testing/src/consensus_testing/keys.py b/packages/testing/src/consensus_testing/keys.py index 019f11d6..7fb97ceb 100644 --- a/packages/testing/src/consensus_testing/keys.py +++ b/packages/testing/src/consensus_testing/keys.py @@ -25,11 +25,11 @@ class KeyPair(NamedTuple): """The validator's secret key (used for signing).""" -_KEY_CACHE: dict[tuple[int, int, int], KeyPair] = {} +_KEY_CACHE: dict[tuple[int, int, int, int], KeyPair] = {} """ Cache keys across tests to avoid regenerating them for the same validator/lifetime combo. -Key: (validator_index, activation_epoch, num_active_epochs) -> KeyPair +Key: (validator_index, activation_epoch, num_active_epochs, seed) -> KeyPair """ @@ -38,10 +38,17 @@ class XmssKeyManager: DEFAULT_MAX_SLOT = Slot(100) """Default maximum slot horizon if not specified.""" + DEFAULT_ACTIVATION_EPOCH = Uint64(0) + """Default activation epoch when none is provided.""" + DEFAULT_SEED = 0 + """Default deterministic seed when none is provided.""" def __init__( self, - activation_epoch: Optional[Uint64] = None, + activation_epoch: Optional[Uint64 | Slot | int] = None, + *, + default_activation_epoch: Optional[Uint64 | Slot | int] = None, + default_seed: Optional[int] = None, max_slot: Optional[Slot] = None, scheme: GeneralizedXmssScheme = TEST_SIGNATURE_SCHEME, ) -> None: @@ -50,8 +57,12 @@ def __init__( Parameters ---------- - activation_epoch : Uint64, optional + activation_epoch : Uint64 | Slot | int, optional + Deprecated alias for `default_activation_epoch`. + default_activation_epoch : Uint64 | Slot | int, optional Activation epoch used when none is provided for key generation. + default_seed : int, optional + Seed value used when none is provided for key generation. max_slot : Slot, optional Highest slot number for which keys must remain valid. Defaults to `Slot(100)`. @@ -69,21 +80,57 @@ def __init__( """ self.max_slot = max_slot if max_slot is not None else self.DEFAULT_MAX_SLOT self.scheme = scheme - self.activation_epoch = activation_epoch if activation_epoch is not None else Uint64(0) + if activation_epoch is not None and default_activation_epoch is not None: + raise ValueError("Use either activation_epoch or default_activation_epoch, not both.") + effective_activation = ( + default_activation_epoch if default_activation_epoch is not None else activation_epoch + ) + activation_value = ( + self.DEFAULT_ACTIVATION_EPOCH + if effective_activation is None + else self._coerce_uint64(effective_activation) + ) + self._default_activation_epoch = activation_value + self._default_seed = int(default_seed) if default_seed is not None else self.DEFAULT_SEED self._key_pairs: dict[ValidatorIndex, KeyPair] = {} self._key_metadata: dict[ValidatorIndex, dict[str, Any]] = {} + @staticmethod + def _coerce_uint64(value: Uint64 | Slot | int) -> Uint64: + """Convert supported numeric inputs to Uint64.""" + if isinstance(value, Uint64): + return Uint64(int(value)) + if isinstance(value, Slot): + return Uint64(value.as_int()) + return Uint64(int(value)) + @property def default_max_epoch(self) -> int: - """Default lifetime derived from the class default max_slot.""" - return self.DEFAULT_MAX_SLOT.as_int() + 1 + """Default lifetime derived from the manager's configured max_slot.""" + return self.default_num_active_epochs + + @property + def default_num_active_epochs(self) -> int: + """Number of epochs keys stay active when not overridden.""" + return self.max_slot.as_int() + 1 + + @property + def default_activation_epoch(self) -> int: + """Default activation epoch as an int.""" + return int(self._default_activation_epoch) + + @property + def default_seed(self) -> int: + """Default seed used when none is provided.""" + return self._default_seed def create_and_store_key_pair( self, validator_index: ValidatorIndex, *, - activation_epoch: Optional[Uint64] = None, - num_active_epochs: Optional[Uint64] = None, + activation_epoch: Optional[Uint64 | Slot | int] = None, + num_active_epochs: Optional[Uint64 | Slot | int] = None, + seed: Optional[int] = None, ) -> KeyPair: """ Generate and store a key pair with explicit control over key generation. @@ -92,24 +139,32 @@ def create_and_store_key_pair( ---------- validator_index : ValidatorIndex The validator for whom a key pair should be generated. - activation_epoch : Uint64, optional + activation_epoch : Uint64 | Slot | int, optional First epoch for which the key is valid. Defaults to the manager's - configured `activation_epoch`. - num_active_epochs : Uint64, optional + configured `default_activation_epoch`. + num_active_epochs : Uint64 | Slot | int, optional Number of consecutive epochs the key should remain active. - Defaults to `default_max_epoch` (derived from `DEFAULT_MAX_SLOT` to include genesis). + Defaults to `default_num_active_epochs` (derived from `max_slot` to include genesis). + seed : int, optional + Deterministic seed for caching/reuse. Defaults to manager's `default_seed`. """ activation_epoch_val = ( - activation_epoch if activation_epoch is not None else self.activation_epoch + self._coerce_uint64(activation_epoch) + if activation_epoch is not None + else self._default_activation_epoch ) num_active_epochs_val = ( - num_active_epochs if num_active_epochs is not None else Uint64(self.default_max_epoch) + self._coerce_uint64(num_active_epochs) + if num_active_epochs is not None + else self._coerce_uint64(self.default_num_active_epochs) ) + seed_val = int(seed) if seed is not None else self.default_seed cache_key = ( int(validator_index), int(activation_epoch_val), int(num_active_epochs_val), + seed_val, ) if cache_key in _KEY_CACHE: @@ -123,6 +178,7 @@ def create_and_store_key_pair( self._key_metadata[validator_index] = { "activation_epoch": int(activation_epoch_val), "num_active_epochs": int(num_active_epochs_val), + "seed": seed_val, } # TODO: support multiple keys per validator keyed by activation_epoch. return key_pair @@ -145,7 +201,7 @@ def __getitem__(self, validator_index: ValidatorIndex) -> KeyPair: ----- - Generates a new key if none exists. - Keys are deterministic for testing (`seed=0`). - - Lifetime defaults to `default_max_epoch` to include the genesis slot. + - Lifetime defaults to `default_num_active_epochs` to include the genesis slot. """ if validator_index in self._key_pairs: return self._key_pairs[validator_index] @@ -217,6 +273,38 @@ def sign_attestation(self, attestation: Attestation) -> Signature: # Convert to the consensus Signature container (handles padding internally). return Signature.from_xmss(xmss_sig, self.scheme) + def export_test_vectors(self, include_private_keys: bool = False) -> list[dict[str, Any]]: + """ + Export generated keys as dictionaries suitable for JSON test vectors. + + Parameters + ---------- + include_private_keys : bool, optional + When True, include SecretKey contents for debugging fixtures. + + Returns: + ------- + list[dict[str, Any]] + A list of entries keyed by validator_index with metadata and hex keys. + """ + vectors: list[dict[str, Any]] = [] + for validator_index in sorted(self._key_pairs.keys(), key=int): + key_pair = self._key_pairs[validator_index] + metadata = self._key_metadata.get(validator_index, {}) + entry: dict[str, Any] = { + "validator_index": int(validator_index), + "activation_epoch": metadata.get("activation_epoch", self.default_activation_epoch), + "num_active_epochs": metadata.get( + "num_active_epochs", self.default_num_active_epochs + ), + "seed": metadata.get("seed", self.default_seed), + "public_key": key_pair.public.to_bytes(self.scheme.config).hex(), + } + if include_private_keys: + entry["secret_key"] = key_pair.secret.model_dump() + vectors.append(entry) + return vectors + def get_public_key(self, validator_index: ValidatorIndex) -> PublicKey: """ Return the public key for a validator. From 3f12f66d48eed7730288ab1dc67c494317c89e5b Mon Sep 17 00:00:00 2001 From: Vyakart Date: Mon, 24 Nov 2025 13:38:33 +0530 Subject: [PATCH 12/13] style: correct typographical errors in docstrings for XmssKeyManager methods --- packages/testing/src/consensus_testing/keys.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/testing/src/consensus_testing/keys.py b/packages/testing/src/consensus_testing/keys.py index 4df70a61..dfc6e3bd 100644 --- a/packages/testing/src/consensus_testing/keys.py +++ b/packages/testing/src/consensus_testing/keys.py @@ -185,7 +185,7 @@ def create_and_store_key_pair( def __getitem__(self, validator_index: ValidatorIndex) -> KeyPair: """ - Retrieve or lazily generate a validator’s key pair. + Retrieve or lazily generate a validator's key pair. Parameters ---------- @@ -195,7 +195,7 @@ def __getitem__(self, validator_index: ValidatorIndex) -> KeyPair: Returns: ------- KeyPair - The validator’s XMSS key pair. + The validator's XMSS key pair. Notes: ----- @@ -313,7 +313,7 @@ def get_public_key(self, validator_index: ValidatorIndex) -> PublicKey: Returns: ------- PublicKey - The validator’s public key. + The validator's public key. """ return self[validator_index].public From 85a0b8393f711e5f949dc113f4d2d8cd80a56d8b Mon Sep 17 00:00:00 2001 From: vyakart Date: Mon, 24 Nov 2025 15:31:23 +0530 Subject: [PATCH 13/13] style: improve docstring clarity for XmssKeyManager methods --- packages/testing/src/consensus_testing/keys.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/packages/testing/src/consensus_testing/keys.py b/packages/testing/src/consensus_testing/keys.py index dfc6e3bd..65b95cdf 100644 --- a/packages/testing/src/consensus_testing/keys.py +++ b/packages/testing/src/consensus_testing/keys.py @@ -195,7 +195,7 @@ def __getitem__(self, validator_index: ValidatorIndex) -> KeyPair: Returns: ------- KeyPair - The validator's XMSS key pair. + XMSS key pair associated with the validator. Notes: ----- @@ -273,11 +273,14 @@ def sign_attestation(self, attestation: Attestation) -> Signature: # Convert to the consensus Signature container (handles padding internally). return Signature.from_xmss(xmss_sig, self.scheme) - # Ensure the signature meets the consensus spec length (3116 bytes). - # - # This is necessary when using TEST_CONFIG (796 bytes) vs PROD_CONFIG. - # Padding with zeros on the right maintains compatibility. - padded_bytes = signature_bytes.ljust(Signature.LENGTH, b"\x00") + def export_test_vectors(self, include_private_keys: bool = False) -> list[dict[str, Any]]: + """ + Export generated keys as dictionaries suitable for JSON test vectors. + + Parameters + ---------- + include_private_keys : bool, optional + When True, include SecretKey contents for debugging fixtures. Returns: ------- @@ -313,7 +316,7 @@ def get_public_key(self, validator_index: ValidatorIndex) -> PublicKey: Returns: ------- PublicKey - The validator's public key. + Public key for the validator. """ return self[validator_index].public