From 62be4f3f6c06dd8063e3fd851cbcb9ece4f26f61 Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Sat, 20 Dec 2025 23:53:18 +0100 Subject: [PATCH 1/3] ssz: proper error typing --- pyproject.toml | 2 +- src/lean_spec/types/__init__.py | 27 ++ src/lean_spec/types/bitfields.py | 85 +++++-- src/lean_spec/types/boolean.py | 40 ++- src/lean_spec/types/byte_arrays.py | 90 +++++-- src/lean_spec/types/collections.py | 111 +++++++-- src/lean_spec/types/container.py | 54 +++- src/lean_spec/types/exceptions.py | 287 ++++++++++++++++++++++ src/lean_spec/types/uint.py | 45 +++- src/lean_spec/types/union.py | 95 +++++-- tests/lean_spec/types/test_bitfields.py | 35 ++- tests/lean_spec/types/test_boolean.py | 28 ++- tests/lean_spec/types/test_byte_arrays.py | 27 +- tests/lean_spec/types/test_collections.py | 26 +- tests/lean_spec/types/test_uint.py | 38 +-- tests/lean_spec/types/test_union.py | 32 ++- 16 files changed, 839 insertions(+), 183 deletions(-) create mode 100644 src/lean_spec/types/exceptions.py diff --git a/pyproject.toml b/pyproject.toml index ec08c110..2db62b49 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,7 +54,7 @@ docstring-code-format = true [tool.ruff.lint] select = ["E", "F", "B", "W", "I", "A", "N", "D", "C"] fixable = ["I", "B", "E", "F", "W", "D", "C"] -ignore = ["D205", "D203", "D212", "D415", "C901", "A005", "C420"] +ignore = ["D205", "D203", "D212", "D415", "C901", "A005", "C420", "D105", "D107"] [tool.ruff.lint.pydocstyle] convention = "google" diff --git a/src/lean_spec/types/__init__.py b/src/lean_spec/types/__init__.py index cd3e46b0..cfdbc5aa 100644 --- a/src/lean_spec/types/__init__.py +++ b/src/lean_spec/types/__init__.py @@ -6,11 +6,26 @@ from .byte_arrays import ZERO_HASH, Bytes32, Bytes52, Bytes3116 from .collections import SSZList, SSZVector from .container import Container +from .exceptions import ( + SSZDecodeError, + SSZError, + SSZLengthError, + SSZOffsetError, + SSZOverflowError, + SSZSelectorError, + SSZSerializationError, + SSZStreamError, + SSZTypeCoercionError, + SSZTypeDefinitionError, + SSZTypeError, + SSZValueError, +) from .ssz_base import SSZType from .uint import Uint64 from .validator import is_proposer __all__ = [ + # Core types "Uint64", "BasisPoint", "Bytes32", @@ -25,4 +40,16 @@ "SSZType", "Boolean", "Container", + "SSZError", + "SSZTypeError", + "SSZTypeDefinitionError", + "SSZTypeCoercionError", + "SSZValueError", + "SSZOverflowError", + "SSZLengthError", + "SSZSerializationError", + "SSZDecodeError", + "SSZStreamError", + "SSZOffsetError", + "SSZSelectorError", ] diff --git a/src/lean_spec/types/bitfields.py b/src/lean_spec/types/bitfields.py index f14021aa..942edf86 100644 --- a/src/lean_spec/types/bitfields.py +++ b/src/lean_spec/types/bitfields.py @@ -29,6 +29,13 @@ from typing_extensions import Self from .boolean import Boolean +from .exceptions import ( + SSZDecodeError, + SSZLengthError, + SSZStreamError, + SSZTypeCoercionError, + SSZTypeDefinitionError, +) from .ssz_base import SSZModel @@ -55,13 +62,20 @@ class BaseBitvector(SSZModel): def _coerce_and_validate(cls, v: Any) -> tuple[Boolean, ...]: """Validate and convert input data to typed tuple of Booleans.""" if not hasattr(cls, "LENGTH"): - raise TypeError(f"{cls.__name__} must define LENGTH") + raise SSZTypeDefinitionError( + type_name=cls.__name__, + missing_attr="LENGTH", + ) if not isinstance(v, (list, tuple)): v = tuple(v) if len(v) != cls.LENGTH: - raise ValueError(f"{cls.__name__} requires exactly {cls.LENGTH} bits, got {len(v)}") + raise SSZLengthError( + type_name=cls.__name__, + expected=cls.LENGTH, + actual=len(v), + ) return tuple(Boolean(bit) for bit in v) @@ -86,10 +100,18 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: """Read SSZ bytes from a stream and return an instance.""" expected_len = cls.get_byte_length() if scope != expected_len: - raise ValueError(f"{cls.__name__}: expected {expected_len} bytes, got {scope}") + raise SSZDecodeError( + type_name=cls.__name__, + detail=f"expected {expected_len} bytes, got {scope}", + ) data = stream.read(scope) if len(data) != scope: - raise IOError(f"Expected {scope} bytes, got {len(data)}") + raise SSZStreamError( + type_name=cls.__name__, + operation="decoding", + expected_bytes=scope, + actual_bytes=len(data), + ) return cls.decode_bytes(data) def encode_bytes(self) -> bytes: @@ -115,7 +137,11 @@ def decode_bytes(cls, data: bytes) -> Self: """ expected = cls.get_byte_length() if len(data) != expected: - raise ValueError(f"{cls.__name__}: expected {expected} bytes, got {len(data)}") + raise SSZLengthError( + type_name=cls.__name__, + expected=expected, + actual=len(data), + ) bits = tuple(Boolean((data[i // 8] >> (i % 8)) & 1) for i in range(cls.LENGTH)) return cls(data=bits) @@ -144,7 +170,10 @@ class BaseBitlist(SSZModel): def _coerce_and_validate(cls, v: Any) -> tuple[Boolean, ...]: """Validate and convert input to a tuple of Boolean elements.""" if not hasattr(cls, "LIMIT"): - raise TypeError(f"{cls.__name__} must define LIMIT") + raise SSZTypeDefinitionError( + type_name=cls.__name__, + missing_attr="LIMIT", + ) # Handle various input types if isinstance(v, (list, tuple)): @@ -152,11 +181,20 @@ def _coerce_and_validate(cls, v: Any) -> tuple[Boolean, ...]: elif hasattr(v, "__iter__") and not isinstance(v, (str, bytes)): elements = list(v) else: - raise TypeError(f"Bitlist data must be iterable, got {type(v)}") + raise SSZTypeCoercionError( + expected_type="iterable", + actual_type=type(v).__name__, + value=v, + ) # Check limit if len(elements) > cls.LIMIT: - raise ValueError(f"{cls.__name__} cannot exceed {cls.LIMIT} bits, got {len(elements)}") + raise SSZLengthError( + type_name=cls.__name__, + expected=cls.LIMIT, + actual=len(elements), + is_limit=True, + ) return tuple(Boolean(bit) for bit in elements) @@ -197,8 +235,11 @@ def is_fixed_size(cls) -> bool: @classmethod def get_byte_length(cls) -> int: - """Lists are variable-size, so this raises a TypeError.""" - raise TypeError(f"{cls.__name__} is variable-size") + """Lists are variable-size, so this raises an SSZTypeDefinitionError.""" + raise SSZTypeDefinitionError( + type_name=cls.__name__, + detail="variable-size bitlist has no fixed byte length", + ) def serialize(self, stream: IO[bytes]) -> int: """Write SSZ bytes to a binary stream.""" @@ -211,7 +252,12 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: """Read SSZ bytes from a stream and return an instance.""" data = stream.read(scope) if len(data) != scope: - raise IOError(f"Expected {scope} bytes, got {len(data)}") + raise SSZStreamError( + type_name=cls.__name__, + operation="decoding", + expected_bytes=scope, + actual_bytes=len(data), + ) return cls.decode_bytes(data) def encode_bytes(self) -> bytes: @@ -254,7 +300,10 @@ def decode_bytes(cls, data: bytes) -> Self: the last data bit. All bits after the delimiter are assumed to be 0. """ if len(data) == 0: - raise ValueError("Cannot decode empty bytes to Bitlist") + raise SSZDecodeError( + type_name=cls.__name__, + detail="cannot decode empty bytes to Bitlist", + ) # Find the position of the delimiter bit (rightmost 1). delimiter_pos = None @@ -267,12 +316,20 @@ def decode_bytes(cls, data: bytes) -> Self: break if delimiter_pos is None: - raise ValueError("No delimiter bit found in Bitlist data") + raise SSZDecodeError( + type_name=cls.__name__, + detail="no delimiter bit found in Bitlist data", + ) # Extract data bits (everything before the delimiter). num_bits = delimiter_pos if num_bits > cls.LIMIT: - raise ValueError(f"{cls.__name__} decoded length {num_bits} exceeds limit {cls.LIMIT}") + raise SSZLengthError( + type_name=cls.__name__, + expected=cls.LIMIT, + actual=num_bits, + is_limit=True, + ) bits = tuple(Boolean((data[i // 8] >> (i % 8)) & 1) for i in range(num_bits)) return cls(data=bits) diff --git a/src/lean_spec/types/boolean.py b/src/lean_spec/types/boolean.py index e461c73e..481039ac 100644 --- a/src/lean_spec/types/boolean.py +++ b/src/lean_spec/types/boolean.py @@ -8,6 +8,12 @@ from pydantic_core import CoreSchema, core_schema from typing_extensions import Self +from .exceptions import ( + SSZDecodeError, + SSZStreamError, + SSZTypeCoercionError, + SSZValueError, +) from .ssz_base import SSZType @@ -31,14 +37,18 @@ def __new__(cls, value: bool | int) -> Self: Accepts only `True`, `False`, `1`, or `0`. Raises: - TypeError: If `value` is not a bool or int. - ValueError: If `value` is an integer other than 0 or 1. + SSZTypeCoercionError: If `value` is not a bool or int. + SSZDecodeError: If `value` is an integer other than 0 or 1. """ if not isinstance(value, int): - raise TypeError(f"Expected bool or int, got {type(value).__name__}") + raise SSZTypeCoercionError( + expected_type="bool or int", + actual_type=type(value).__name__, + value=value, + ) if value not in (0, 1): - raise ValueError(f"Boolean value must be 0 or 1, not {value}") + raise SSZValueError(f"Boolean value must be 0 or 1, not {value}") return super().__new__(cls, value) @@ -93,9 +103,15 @@ def encode_bytes(self) -> bytes: def decode_bytes(cls, data: bytes) -> Self: """Deserialize a single byte into a Boolean instance.""" if len(data) != 1: - raise ValueError(f"Expected 1 byte for Boolean, got {len(data)}") + raise SSZDecodeError( + type_name="Boolean", + detail=f"expected 1 byte, got {len(data)}", + ) if data[0] not in (0, 1): - raise ValueError(f"Boolean byte must be 0x00 or 0x01, got {data[0]:#04x}") + raise SSZDecodeError( + type_name="Boolean", + detail=f"byte must be 0x00 or 0x01, got {data[0]:#04x}", + ) return cls(data[0]) def serialize(self, stream: IO[bytes]) -> int: @@ -108,10 +124,18 @@ def serialize(self, stream: IO[bytes]) -> int: def deserialize(cls, stream: IO[bytes], scope: int) -> Self: """Deserialize a boolean from a binary stream.""" if scope != 1: - raise ValueError(f"Invalid scope for Boolean: expected 1, got {scope}") + raise SSZDecodeError( + type_name="Boolean", + detail=f"expected scope of 1, got {scope}", + ) data = stream.read(1) if len(data) != 1: - raise IOError("Stream ended prematurely while decoding Boolean") + raise SSZStreamError( + type_name="Boolean", + operation="decoding", + expected_bytes=1, + actual_bytes=len(data), + ) return cls.decode_bytes(data) def _raise_type_error(self, other: Any, op_symbol: str) -> None: diff --git a/src/lean_spec/types/byte_arrays.py b/src/lean_spec/types/byte_arrays.py index 5a360085..a87d64a2 100644 --- a/src/lean_spec/types/byte_arrays.py +++ b/src/lean_spec/types/byte_arrays.py @@ -16,6 +16,12 @@ from pydantic_core import core_schema from typing_extensions import Self +from .exceptions import ( + SSZDecodeError, + SSZLengthError, + SSZStreamError, + SSZTypeDefinitionError, +) from .ssz_base import SSZModel, SSZType @@ -64,14 +70,22 @@ def __new__(cls, value: Any = b"") -> Self: value: Any value coercible to bytes (see `_coerce_to_bytes`). Raises: - ValueError: If the resulting byte length differs from `LENGTH`. + SSZTypeDefinitionError: If the class doesn't define LENGTH. + SSZLengthError: If the resulting byte length differs from `LENGTH`. """ if not hasattr(cls, "LENGTH"): - raise TypeError(f"{cls.__name__} must define LENGTH") + raise SSZTypeDefinitionError( + type_name=cls.__name__, + missing_attr="LENGTH", + ) b = _coerce_to_bytes(value) if len(b) != cls.LENGTH: - raise ValueError(f"{cls.__name__} expects exactly {cls.LENGTH} bytes, got {len(b)}") + raise SSZLengthError( + type_name=cls.__name__, + expected=cls.LENGTH, + actual=len(b), + ) return super().__new__(cls, b) @classmethod @@ -112,16 +126,22 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: For a fixed-size type, `scope` must match `LENGTH`. Raises: - ValueError: if `scope` != `LENGTH`. - IOError: if the stream ends prematurely. + SSZDecodeError: if `scope` != `LENGTH`. + SSZStreamError: if the stream ends prematurely. """ if scope != cls.LENGTH: - raise ValueError( - f"Invalid scope for ByteVector[{cls.LENGTH}]: expected {cls.LENGTH}, got {scope}" + raise SSZDecodeError( + type_name=cls.__name__, + detail=f"expected {cls.LENGTH} bytes, got {scope}", ) data = stream.read(scope) if len(data) != scope: - raise IOError("Stream ended prematurely while decoding ByteVector") + raise SSZStreamError( + type_name=cls.__name__, + operation="decoding", + expected_bytes=scope, + actual_bytes=len(data), + ) return cls(data) def encode_bytes(self) -> bytes: @@ -136,7 +156,11 @@ def decode_bytes(cls, data: bytes) -> Self: For a fixed-size type, the data must be exactly `LENGTH` bytes. """ if len(data) != cls.LENGTH: - raise ValueError(f"{cls.__name__} expects exactly {cls.LENGTH} bytes, got {len(data)}") + raise SSZLengthError( + type_name=cls.__name__, + expected=cls.LENGTH, + actual=len(data), + ) return cls(data) @classmethod @@ -262,11 +286,19 @@ class BaseByteList(SSZModel): def _validate_byte_list_data(cls, v: Any) -> bytes: """Validate and convert input to bytes with limit checking.""" if not hasattr(cls, "LIMIT"): - raise TypeError(f"{cls.__name__} must define LIMIT") + raise SSZTypeDefinitionError( + type_name=cls.__name__, + missing_attr="LIMIT", + ) b = _coerce_to_bytes(v) if len(b) > cls.LIMIT: - raise ValueError(f"ByteList[{cls.LIMIT}] length {len(b)} exceeds limit {cls.LIMIT}") + raise SSZLengthError( + type_name=cls.__name__, + expected=cls.LIMIT, + actual=len(b), + is_limit=True, + ) return b @field_serializer("data", when_used="json") @@ -282,7 +314,10 @@ def is_fixed_size(cls) -> bool: @classmethod def get_byte_length(cls) -> int: """ByteList is variable-size, so this should not be called.""" - raise TypeError(f"{cls.__name__} is variable-size and has no fixed byte length") + raise SSZTypeDefinitionError( + type_name=cls.__name__, + detail="variable-size byte list has no fixed byte length", + ) def serialize(self, stream: IO[bytes]) -> int: """ @@ -303,16 +338,30 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: knows how many bytes belong to this value in its context). Raises: - ValueError: if the decoded length exceeds `LIMIT`. - IOError: if the stream ends prematurely. + SSZDecodeError: if the scope is negative. + SSZLengthError: if the decoded length exceeds `LIMIT`. + SSZStreamError: if the stream ends prematurely. """ if scope < 0: - raise ValueError("Invalid scope for ByteList: negative") + raise SSZDecodeError( + type_name=cls.__name__, + detail="negative scope", + ) if scope > cls.LIMIT: - raise ValueError(f"ByteList[{cls.LIMIT}] scope {scope} exceeds limit") + raise SSZLengthError( + type_name=cls.__name__, + expected=cls.LIMIT, + actual=scope, + is_limit=True, + ) data = stream.read(scope) if len(data) != scope: - raise IOError("Stream ended prematurely while decoding ByteList") + raise SSZStreamError( + type_name=cls.__name__, + operation="decoding", + expected_bytes=scope, + actual_bytes=len(data), + ) return cls(data=data) def encode_bytes(self) -> bytes: @@ -327,7 +376,12 @@ def decode_bytes(cls, data: bytes) -> Self: For variable-size types, the data length must be `<= LIMIT`. """ if len(data) > cls.LIMIT: - raise ValueError(f"ByteList[{cls.LIMIT}] length {len(data)} exceeds limit") + raise SSZLengthError( + type_name=cls.__name__, + expected=cls.LIMIT, + actual=len(data), + is_limit=True, + ) return cls(data=data) def __bytes__(self) -> bytes: diff --git a/src/lean_spec/types/collections.py b/src/lean_spec/types/collections.py index 9c5924d1..6ffe9bbd 100644 --- a/src/lean_spec/types/collections.py +++ b/src/lean_spec/types/collections.py @@ -22,6 +22,13 @@ from lean_spec.types.constants import OFFSET_BYTE_LENGTH from .byte_arrays import BaseBytes +from .exceptions import ( + SSZDecodeError, + SSZLengthError, + SSZOffsetError, + SSZTypeCoercionError, + SSZTypeDefinitionError, +) from .ssz_base import SSZModel, SSZType from .uint import Uint32 @@ -103,7 +110,10 @@ def _serialize_data(self, value: Sequence[T]) -> list[Any]: def _validate_vector_data(cls, v: Any) -> tuple[SSZType, ...]: """Validate and convert input to a typed tuple of exactly LENGTH elements.""" if not hasattr(cls, "ELEMENT_TYPE") or not hasattr(cls, "LENGTH"): - raise TypeError(f"{cls.__name__} must define ELEMENT_TYPE and LENGTH") + raise SSZTypeDefinitionError( + type_name=cls.__name__, + detail="must define ELEMENT_TYPE and LENGTH", + ) if not isinstance(v, (list, tuple)): v = tuple(v) @@ -115,9 +125,10 @@ def _validate_vector_data(cls, v: Any) -> tuple[SSZType, ...]: ) if len(typed_values) != cls.LENGTH: - raise ValueError( - f"{cls.__name__} requires exactly {cls.LENGTH} items, " - f"but {len(typed_values)} were provided." + raise SSZLengthError( + type_name=cls.__name__, + expected=cls.LENGTH, + actual=len(typed_values), ) return typed_values @@ -131,7 +142,10 @@ def is_fixed_size(cls) -> bool: def get_byte_length(cls) -> int: """Get the byte length if the SSZVector is fixed-size.""" if not cls.is_fixed_size(): - raise TypeError(f"{cls.__name__} is not a fixed-size type.") + raise SSZTypeDefinitionError( + type_name=cls.__name__, + detail="variable-size vector has no fixed byte length", + ) return cls.ELEMENT_TYPE.get_byte_length() * cls.LENGTH def serialize(self, stream: IO[bytes]) -> int: @@ -162,9 +176,9 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: if cls.is_fixed_size(): elem_byte_length = cls.get_byte_length() // cls.LENGTH if scope != cls.get_byte_length(): - raise ValueError( - f"Invalid scope for {cls.__name__}: " - f"expected {cls.get_byte_length()}, got {scope}" + raise SSZDecodeError( + type_name=cls.__name__, + detail=f"expected {cls.get_byte_length()} bytes, got {scope}", ) elements = [ cls.ELEMENT_TYPE.deserialize(stream, elem_byte_length) for _ in range(cls.LENGTH) @@ -175,7 +189,11 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: # The first offset tells us where the data starts, which must be after all offsets. first_offset = int(Uint32.deserialize(stream, OFFSET_BYTE_LENGTH)) if first_offset != cls.LENGTH * OFFSET_BYTE_LENGTH: - raise ValueError("Invalid first offset in variable-size vector.") + raise SSZOffsetError( + type_name=cls.__name__, + start_offset=first_offset, + end_offset=cls.LENGTH * OFFSET_BYTE_LENGTH, + ) # Read the remaining offsets and add the total scope as the final boundary. offsets = [first_offset] + [ int(Uint32.deserialize(stream, OFFSET_BYTE_LENGTH)) for _ in range(cls.LENGTH - 1) @@ -185,7 +203,11 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: for i in range(cls.LENGTH): start, end = offsets[i], offsets[i + 1] if start > end: - raise ValueError(f"Invalid offsets: start {start} > end {end}") + raise SSZOffsetError( + type_name=cls.__name__, + start_offset=start, + end_offset=end, + ) elements.append(cls.ELEMENT_TYPE.deserialize(stream, end - start)) return cls(data=elements) @@ -294,7 +316,10 @@ def _serialize_data(self, value: Sequence[T]) -> list[Any]: def _validate_list_data(cls, v: Any) -> tuple[SSZType, ...]: """Validate and convert input to a tuple of SSZType elements.""" if not hasattr(cls, "ELEMENT_TYPE") or not hasattr(cls, "LIMIT"): - raise TypeError(f"{cls.__name__} must define ELEMENT_TYPE and LIMIT") + raise SSZTypeDefinitionError( + type_name=cls.__name__, + detail="must define ELEMENT_TYPE and LIMIT", + ) # Handle various input types if isinstance(v, (list, tuple)): @@ -302,25 +327,34 @@ def _validate_list_data(cls, v: Any) -> tuple[SSZType, ...]: elif hasattr(v, "__iter__") and not isinstance(v, (str, bytes)): elements = list(v) else: - raise TypeError(f"List data must be iterable, got {type(v)}") + raise SSZTypeCoercionError( + expected_type="iterable", + actual_type=type(v).__name__, + value=v, + ) # Check limit if len(elements) > cls.LIMIT: - raise ValueError( - f"{cls.__name__} cannot contain more than {cls.LIMIT} elements, got {len(elements)}" + raise SSZLengthError( + type_name=cls.__name__, + expected=cls.LIMIT, + actual=len(elements), + is_limit=True, ) # Convert and validate each element typed_values = [] - for i, element in enumerate(elements): + for element in elements: if isinstance(element, cls.ELEMENT_TYPE): typed_values.append(element) else: try: typed_values.append(cast(Any, cls.ELEMENT_TYPE)(element)) except Exception as e: - raise ValueError( - f"Element {i} cannot be converted to {cls.ELEMENT_TYPE.__name__}: {e}" + raise SSZTypeCoercionError( + expected_type=cls.ELEMENT_TYPE.__name__, + actual_type=type(element).__name__, + value=element, ) from e return tuple(typed_values) @@ -342,8 +376,11 @@ def is_fixed_size(cls) -> bool: @classmethod def get_byte_length(cls) -> int: - """Lists are variable-size, so this raises a TypeError.""" - raise TypeError(f"{cls.__name__} is variable-size") + """Lists are variable-size, so this raises an SSZTypeDefinitionError.""" + raise SSZTypeDefinitionError( + type_name=cls.__name__, + detail="variable-size list has no fixed byte length", + ) def serialize(self, stream: IO[bytes]) -> int: """Serialize the list to a binary stream.""" @@ -372,11 +409,19 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: # Fixed-size elements: read them back-to-back element_size = cls.ELEMENT_TYPE.get_byte_length() if scope % element_size != 0: - raise ValueError(f"Scope {scope} is not divisible by element size {element_size}") + raise SSZDecodeError( + type_name=cls.__name__, + detail=f"scope {scope} is not divisible by element size {element_size}", + ) num_elements = scope // element_size if num_elements > cls.LIMIT: - raise ValueError(f"Too many elements: {num_elements} > {cls.LIMIT}") + raise SSZLengthError( + type_name=cls.__name__, + expected=cls.LIMIT, + actual=num_elements, + is_limit=True, + ) elements = [ cls.ELEMENT_TYPE.deserialize(stream, element_size) for _ in range(num_elements) @@ -389,16 +434,28 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: # Empty list case return cls(data=[]) if scope < OFFSET_BYTE_LENGTH: - raise ValueError(f"Invalid scope for variable-size list: {scope}") + raise SSZDecodeError( + type_name=cls.__name__, + detail=f"scope {scope} too small for variable-size list", + ) # Read the first offset to determine the number of elements. first_offset = int(Uint32.deserialize(stream, OFFSET_BYTE_LENGTH)) if first_offset > scope or first_offset % OFFSET_BYTE_LENGTH != 0: - raise ValueError("Invalid first offset in list.") + raise SSZOffsetError( + type_name=cls.__name__, + start_offset=first_offset, + end_offset=scope, + ) count = first_offset // OFFSET_BYTE_LENGTH if count > cls.LIMIT: - raise ValueError(f"Decoded list length {count} exceeds limit of {cls.LIMIT}") + raise SSZLengthError( + type_name=cls.__name__, + expected=cls.LIMIT, + actual=count, + is_limit=True, + ) # Read the rest of the offsets. offsets = [first_offset] + [ @@ -411,7 +468,11 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: for i in range(count): start, end = offsets[i], offsets[i + 1] if start > end: - raise ValueError(f"Invalid offsets: start {start} > end {end}") + raise SSZOffsetError( + type_name=cls.__name__, + start_offset=start, + end_offset=end, + ) elements.append(cls.ELEMENT_TYPE.deserialize(stream, end - start)) return cls(data=elements) diff --git a/src/lean_spec/types/container.py b/src/lean_spec/types/container.py index 4caa88be..450dce32 100644 --- a/src/lean_spec/types/container.py +++ b/src/lean_spec/types/container.py @@ -16,6 +16,12 @@ from typing_extensions import Self from .constants import OFFSET_BYTE_LENGTH +from .exceptions import ( + SSZOffsetError, + SSZStreamError, + SSZTypeCoercionError, + SSZTypeDefinitionError, +) from .ssz_base import SSZModel, SSZType from .uint import Uint32 @@ -31,13 +37,14 @@ def _get_ssz_field_type(annotation: Any) -> Type[SSZType]: The SSZType class. Raises: - TypeError: If the annotation is not a valid SSZType class. + SSZTypeCoercionError: If the annotation is not a valid SSZType class. """ # Check if it's a class and is a subclass of SSZType if not (inspect.isclass(annotation) and issubclass(annotation, SSZType)): - raise TypeError( - f"Field annotation {annotation} is not a valid SSZType class. " - f"Container fields must be concrete SSZType subclasses." + raise SSZTypeCoercionError( + expected_type="SSZType subclass", + actual_type=str(annotation), + value=annotation, ) return annotation @@ -98,11 +105,14 @@ def get_byte_length(cls) -> int: Total byte length of all fields summed together. Raises: - TypeError: If called on a variable-size container. + SSZTypeDefinitionError: If called on a variable-size container. """ # Only fixed-size containers have a deterministic byte length if not cls.is_fixed_size(): - raise TypeError(f"{cls.__name__} is variable-size") + raise SSZTypeDefinitionError( + type_name=cls.__name__, + detail="variable-size containers have no fixed byte length", + ) # Sum the byte lengths of all fixed-size fields return sum( @@ -185,8 +195,8 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: New container instance with deserialized values. Raises: - IOError: If stream ends unexpectedly. - ValueError: If offsets are invalid. + SSZStreamError: If stream ends unexpectedly. + SSZOffsetError: If offsets are invalid. """ fields = {} # Collected field values var_fields = [] # (name, type, offset) for variable fields @@ -201,14 +211,24 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: size = field_type.get_byte_length() data = stream.read(size) if len(data) != size: - raise IOError(f"Unexpected EOF reading {field_name}") + raise SSZStreamError( + type_name=cls.__name__, + operation=f"reading field '{field_name}'", + expected_bytes=size, + actual_bytes=len(data), + ) fields[field_name] = field_type.decode_bytes(data) bytes_read += size else: # Read offset pointer for variable field offset_bytes = stream.read(OFFSET_BYTE_LENGTH) if len(offset_bytes) != OFFSET_BYTE_LENGTH: - raise IOError(f"Unexpected EOF reading offset for {field_name}") + raise SSZStreamError( + type_name=cls.__name__, + operation=f"reading offset for field '{field_name}'", + expected_bytes=OFFSET_BYTE_LENGTH, + actual_bytes=len(offset_bytes), + ) offset = int(Uint32.decode_bytes(offset_bytes)) var_fields.append((field_name, field_type, offset)) bytes_read += OFFSET_BYTE_LENGTH @@ -219,7 +239,12 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: var_section_size = scope - bytes_read var_section = stream.read(var_section_size) if len(var_section) != var_section_size: - raise IOError("Unexpected EOF in variable section") + raise SSZStreamError( + type_name=cls.__name__, + operation="reading variable section", + expected_bytes=var_section_size, + actual_bytes=len(var_section), + ) # Extract each variable field using offsets offsets = [offset for _, _, offset in var_fields] + [scope] @@ -231,7 +256,12 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: # Validate offset bounds if rel_start < 0 or rel_start > rel_end: - raise ValueError(f"Invalid offsets for {name}") + raise SSZOffsetError( + type_name=cls.__name__, + field_name=name, + start_offset=start, + end_offset=end, + ) # Deserialize field from its slice field_data = var_section[rel_start:rel_end] diff --git a/src/lean_spec/types/exceptions.py b/src/lean_spec/types/exceptions.py new file mode 100644 index 00000000..673e8d3d --- /dev/null +++ b/src/lean_spec/types/exceptions.py @@ -0,0 +1,287 @@ +"""Exception hierarchy for the SSZ type system.""" + +from __future__ import annotations + +from typing import Any + + +class SSZError(Exception): + """ + Base exception for all SSZ-related errors. + + Attributes: + message: Human-readable error description. + """ + + def __init__(self, message: str) -> None: + self.message = message + super().__init__(message) + + def __repr__(self) -> str: + return f"{self.__class__.__name__}({self.message!r})" + + +class SSZTypeError(SSZError): + """Base class for type-related errors.""" + + +class SSZTypeDefinitionError(SSZTypeError): + """ + Raised when an SSZ type class is incorrectly defined. + + Attributes: + type_name: The name of the type with the definition error. + missing_attr: The missing or invalid attribute name. + detail: Additional context about the error. + """ + + def __init__( + self, + type_name: str, + *, + missing_attr: str | None = None, + detail: str | None = None, + ) -> None: + self.type_name = type_name + self.missing_attr = missing_attr + self.detail = detail + + if missing_attr: + msg = f"{type_name} must define {missing_attr}" + elif detail: + msg = f"{type_name}: {detail}" + else: + msg = f"{type_name} has an invalid type definition" + + super().__init__(msg) + + +class SSZTypeCoercionError(SSZTypeError): + """ + Raised when a value cannot be coerced to the expected SSZ type. + + Attributes: + expected_type: The type that was expected. + actual_type: The actual type of the value. + value: The value that couldn't be coerced (may be truncated for display). + """ + + def __init__( + self, + expected_type: str, + actual_type: str, + value: Any = None, + ) -> None: + self.expected_type = expected_type + self.actual_type = actual_type + self.value = value + + msg = f"Expected {expected_type}, got {actual_type}" + if value is not None: + value_repr = repr(value) + if len(value_repr) > 50: + value_repr = value_repr[:47] + "..." + msg = f"{msg}: {value_repr}" + + super().__init__(msg) + + +class SSZValueError(SSZError): + """ + Base class for value-related errors. + + Raised when a value is invalid for an SSZ operation, even if the type is correct. + """ + + +class SSZOverflowError(SSZValueError): + """ + Raised when a numeric value is outside the valid range. + + Attributes: + value: The value that caused the overflow. + type_name: The SSZ type that couldn't hold the value. + min_value: The minimum allowed value (inclusive). + max_value: The maximum allowed value (inclusive). + """ + + def __init__( + self, + value: int, + type_name: str, + *, + min_value: int = 0, + max_value: int, + ) -> None: + self.value = value + self.type_name = type_name + self.min_value = min_value + self.max_value = max_value + + super().__init__( + f"{value} is out of range for {type_name} (valid range: [{min_value}, {max_value}])" + ) + + +class SSZLengthError(SSZValueError): + """ + Raised when a sequence has incorrect length. + + Attributes: + type_name: The SSZ type with the length constraint. + expected: The expected length (exact for vectors, max for lists). + actual: The actual length received. + is_limit: True if expected is a maximum limit, False if exact. + """ + + def __init__( + self, + type_name: str, + *, + expected: int, + actual: int, + is_limit: bool = False, + ) -> None: + self.type_name = type_name + self.expected = expected + self.actual = actual + self.is_limit = is_limit + + if is_limit: + msg = f"{type_name} cannot exceed {expected} elements, got {actual}" + else: + msg = f"{type_name} requires exactly {expected} elements, got {actual}" + + super().__init__(msg) + + +class SSZSerializationError(SSZError): + """Base class for serialization-related errors.""" + + +class SSZDecodeError(SSZSerializationError): + """ + Raised when decoding SSZ bytes to a value fails. + + Attributes: + type_name: The type being decoded. + detail: Description of what went wrong. + offset: The byte offset where the error occurred (if known). + """ + + def __init__( + self, + type_name: str, + detail: str, + *, + offset: int | None = None, + ) -> None: + self.type_name = type_name + self.detail = detail + self.offset = offset + + msg = f"Failed to decode {type_name}: {detail}" + if offset is not None: + msg = f"{msg} (at byte offset {offset})" + + super().__init__(msg) + + +class SSZStreamError(SSZSerializationError): + """ + Raised when a stream/IO error occurs during SSZ operations. + + Attributes: + type_name: The type being processed when the error occurred. + operation: The operation being performed (e.g., "read", "decode"). + expected_bytes: Number of bytes expected (if applicable). + actual_bytes: Number of bytes received (if applicable). + """ + + def __init__( + self, + type_name: str, + operation: str, + *, + expected_bytes: int | None = None, + actual_bytes: int | None = None, + ) -> None: + self.type_name = type_name + self.operation = operation + self.expected_bytes = expected_bytes + self.actual_bytes = actual_bytes + + if expected_bytes is not None and actual_bytes is not None: + msg = ( + f"Stream error while {operation} {type_name}: " + f"expected {expected_bytes} bytes, got {actual_bytes}" + ) + elif expected_bytes is not None: + msg = ( + f"Stream ended prematurely while {operation} {type_name}: " + f"needed {expected_bytes} bytes" + ) + else: + msg = f"Stream error while {operation} {type_name}" + + super().__init__(msg) + + +class SSZOffsetError(SSZDecodeError): + """ + Raised when SSZ offset parsing fails during variable-size decoding. + + Attributes: + type_name: The container or collection type being decoded. + field_name: The field with the invalid offset (if applicable). + start_offset: The start offset value. + end_offset: The end offset value. + """ + + def __init__( + self, + type_name: str, + *, + field_name: str | None = None, + start_offset: int | None = None, + end_offset: int | None = None, + ) -> None: + self.field_name = field_name + self.start_offset = start_offset + self.end_offset = end_offset + + if field_name and start_offset is not None and end_offset is not None: + detail = ( + f"invalid offsets for field '{field_name}' (start={start_offset}, end={end_offset})" + ) + elif field_name: + detail = f"invalid offset for field '{field_name}'" + elif start_offset is not None and end_offset is not None: + detail = f"invalid offsets: start={start_offset} > end={end_offset}" + else: + detail = "invalid offset structure" + + super().__init__(type_name, detail) + + +class SSZSelectorError(SSZDecodeError): + """ + Raised when a Union selector is invalid. + + Attributes: + type_name: The Union type being decoded. + selector: The invalid selector value. + num_options: The number of valid options. + """ + + def __init__( + self, + type_name: str, + selector: int, + num_options: int, + ) -> None: + self.selector = selector + self.num_options = num_options + + detail = f"selector {selector} out of range for {num_options} options" + super().__init__(type_name, detail) diff --git a/src/lean_spec/types/uint.py b/src/lean_spec/types/uint.py index b3274966..fa3c3b28 100644 --- a/src/lean_spec/types/uint.py +++ b/src/lean_spec/types/uint.py @@ -8,6 +8,12 @@ from pydantic_core import core_schema from typing_extensions import Self +from .exceptions import ( + SSZDecodeError, + SSZOverflowError, + SSZStreamError, + SSZTypeCoercionError, +) from .ssz_base import SSZType @@ -22,16 +28,24 @@ def __new__(cls, value: SupportsInt) -> Self: Create and validate a new Uint instance. Raises: - TypeError: If `value` is not an int (rejects bool, string, float). - OverflowError: If `value` is outside the allowed range [0, 2**BITS - 1]. + SSZTypeCoercionError: If `value` is not an int (rejects bool, string, float). + SSZOverflowError: If `value` is outside the allowed range [0, 2**BITS - 1]. """ # We should accept only ints. if not isinstance(value, int) or isinstance(value, bool): - raise TypeError(f"Expected int, got {type(value).__name__}") + raise SSZTypeCoercionError( + expected_type="int", + actual_type=type(value).__name__, + value=value, + ) int_value = int(value) if not (0 <= int_value < (2**cls.BITS)): - raise OverflowError(f"{int_value} is out of range for {cls.__name__}") + raise SSZOverflowError( + value=int_value, + type_name=cls.__name__, + max_value=2**cls.BITS - 1, + ) return super().__new__(cls, int_value) @classmethod @@ -104,7 +118,7 @@ def decode_bytes(cls, data: bytes) -> Self: data (bytes): The SSZ byte string to deserialize. Raises: - ValueError: If the byte string has an incorrect length. + SSZDecodeError: If the byte string has an incorrect length. Returns: Self: A new instance of the Uint class. @@ -112,9 +126,9 @@ def decode_bytes(cls, data: bytes) -> Self: # Ensure the input data has the correct number of bytes. expected_length = cls.get_byte_length() if len(data) != expected_length: - raise ValueError( - f"Invalid byte length for {cls.__name__}: " - f"expected {expected_length}, got {len(data)}" + raise SSZDecodeError( + type_name=cls.__name__, + detail=f"expected {expected_length} bytes, got {len(data)}", ) # The `from_bytes` class method from `int` is used to convert the data. return cls(int.from_bytes(data, "little")) @@ -146,7 +160,8 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: scope (int): The number of bytes available to read for this object. Raises: - ValueError: If the scope does not match the type's byte length. + SSZDecodeError: If the scope does not match the type's byte length. + SSZStreamError: If the stream ends prematurely. Returns: Self: A new instance of the Uint class. @@ -154,14 +169,20 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: # For a fixed-size type, the scope must exactly match the byte length. byte_length = cls.get_byte_length() if scope != byte_length: - raise ValueError( - f"Invalid scope for {cls.__name__}: expected {byte_length}, got {scope}" + raise SSZDecodeError( + type_name=cls.__name__, + detail=f"invalid scope: expected {byte_length} bytes, got {scope}", ) # Read the required number of bytes from the stream. data = stream.read(byte_length) # Ensure the correct number of bytes was read. if len(data) != byte_length: - raise IOError(f"Stream ended prematurely while decoding {cls.__name__}") + raise SSZStreamError( + type_name=cls.__name__, + operation="decoding", + expected_bytes=byte_length, + actual_bytes=len(data), + ) # Decode the bytes into a new instance. return cls.decode_bytes(data) diff --git a/src/lean_spec/types/union.py b/src/lean_spec/types/union.py index 13571b49..5c231cf5 100644 --- a/src/lean_spec/types/union.py +++ b/src/lean_spec/types/union.py @@ -16,6 +16,13 @@ from pydantic import model_validator from typing_extensions import Self +from .exceptions import ( + SSZDecodeError, + SSZSelectorError, + SSZStreamError, + SSZTypeCoercionError, + SSZTypeDefinitionError, +) from .ssz_base import SSZModel, SSZType # Constants for Union implementation @@ -103,26 +110,42 @@ def _validate_union_data(cls, data: Any) -> dict[str, Any]: """Validate selector and value together.""" # Check required class attributes and get options if not hasattr(cls, "OPTIONS") or not isinstance(cls.OPTIONS, tuple): - raise TypeError(f"{cls.__name__} must define OPTIONS as a tuple of SSZ types") + raise SSZTypeDefinitionError( + type_name=cls.__name__, + detail="must define OPTIONS as a tuple of SSZ types", + ) options, options_count = cls.OPTIONS, len(cls.OPTIONS) # Validate OPTIONS constraints if options_count == 0: - raise TypeError(f"{cls.__name__} OPTIONS cannot be empty") + raise SSZTypeDefinitionError( + type_name=cls.__name__, + detail="OPTIONS cannot be empty", + ) if options_count > MAX_UNION_OPTIONS: - raise TypeError( - f"{cls.__name__} has {options_count} options, but maximum is {MAX_UNION_OPTIONS}" + raise SSZTypeDefinitionError( + type_name=cls.__name__, + detail=f"has {options_count} options, but maximum is {MAX_UNION_OPTIONS}", ) if options[0] is None and options_count == 1: - raise TypeError(f"{cls.__name__} cannot have None as the only option") + raise SSZTypeDefinitionError( + type_name=cls.__name__, + detail="cannot have None as the only option", + ) # Validate None placement (only at index 0) and types for i, opt in enumerate(options): if opt is None and i != 0: - raise TypeError(f"{cls.__name__} can only have None at index 0, found at index {i}") + raise SSZTypeDefinitionError( + type_name=cls.__name__, + detail=f"can only have None at index 0, found at index {i}", + ) elif opt is not None and not isinstance(opt, type): - raise TypeError(f"{cls.__name__} option {i} must be a type, got {type(opt)}") + raise SSZTypeDefinitionError( + type_name=cls.__name__, + detail=f"option {i} must be a type, got {type(opt)}", + ) # Extract selector and value from input selector = data.get("selector") @@ -130,12 +153,20 @@ def _validate_union_data(cls, data: Any) -> dict[str, Any]: # Validate selector if not isinstance(selector, int) or not 0 <= selector < options_count: - raise ValueError(f"Invalid selector {selector} for {options_count} options") + raise SSZSelectorError( + type_name=cls.__name__, + selector=selector if isinstance(selector, int) else -1, + num_options=options_count, + ) # Handle None option if (selected_type := options[selector]) is None: if value is not None: - raise TypeError("Selected option is None, therefore value must be None") + raise SSZTypeCoercionError( + expected_type="None", + actual_type=type(value).__name__, + value=value, + ) return {"selector": selector, "value": None} # Handle non-None option - coerce value if needed @@ -146,8 +177,10 @@ def _validate_union_data(cls, data: Any) -> dict[str, Any]: coerced_value = cast(Any, selected_type)(value) return {"selector": selector, "value": coerced_value} except Exception as e: - raise TypeError( - f"Cannot coerce {type(value).__name__} to {selected_type.__name__}: {e}" + raise SSZTypeCoercionError( + expected_type=selected_type.__name__, + actual_type=type(value).__name__, + value=value, ) from e @property @@ -168,7 +201,10 @@ def is_fixed_size(cls) -> bool: @classmethod def get_byte_length(cls) -> int: """Union types are variable-size and don't have fixed length.""" - raise TypeError(f"{cls.__name__} is variable-size") + raise SSZTypeDefinitionError( + type_name=cls.__name__, + detail="variable-size union has no fixed byte length", + ) def serialize(self, stream: IO[bytes]) -> int: """Serialize this Union to a byte stream in SSZ format.""" @@ -183,40 +219,63 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: """Deserialize a Union from a byte stream using SSZ format.""" # Validate scope for selector byte if scope < SELECTOR_BYTE_SIZE: - raise ValueError("Scope too small for Union selector") + raise SSZDecodeError( + type_name=cls.__name__, + detail="scope too small for Union selector", + ) # Read selector byte selector_bytes = stream.read(SELECTOR_BYTE_SIZE) if len(selector_bytes) != SELECTOR_BYTE_SIZE: - raise IOError("Stream ended reading Union selector") + raise SSZStreamError( + type_name=cls.__name__, + operation="reading selector", + expected_bytes=SELECTOR_BYTE_SIZE, + actual_bytes=len(selector_bytes), + ) selector = int.from_bytes(selector_bytes, byteorder="little") remaining_bytes = scope - SELECTOR_BYTE_SIZE # Validate selector range if not 0 <= selector < len(cls.OPTIONS): - raise ValueError(f"Selector {selector} out of range for {len(cls.OPTIONS)} options") + raise SSZSelectorError( + type_name=cls.__name__, + selector=selector, + num_options=len(cls.OPTIONS), + ) selected_type = cls.OPTIONS[selector] # Handle None option if selected_type is None: if remaining_bytes != 0: - raise ValueError("Invalid encoding: None arm must have no payload bytes") + raise SSZDecodeError( + type_name=cls.__name__, + detail="None arm must have no payload bytes", + ) return cls(selector=selector, value=None) # Handle non-None option if selected_type.is_fixed_size() and hasattr(selected_type, "get_byte_length"): required_bytes = selected_type.get_byte_length() if remaining_bytes < required_bytes: - raise IOError(f"Need {required_bytes} bytes, got {remaining_bytes}") + raise SSZStreamError( + type_name=cls.__name__, + operation=f"deserializing {selected_type.__name__}", + expected_bytes=required_bytes, + actual_bytes=remaining_bytes, + ) # Deserialize value try: value = selected_type.deserialize(stream, remaining_bytes) return cls(selector=selector, value=value) except Exception as e: - raise IOError(f"Failed to deserialize {selected_type.__name__}: {e}") from e + raise SSZDecodeError( + type_name=cls.__name__, + detail=f"failed to deserialize {selected_type.__name__}: {e}", + ) from e def encode_bytes(self) -> bytes: """Encode this Union to bytes.""" diff --git a/tests/lean_spec/types/test_bitfields.py b/tests/lean_spec/types/test_bitfields.py index 554c6255..5066a9ab 100644 --- a/tests/lean_spec/types/test_bitfields.py +++ b/tests/lean_spec/types/test_bitfields.py @@ -9,6 +9,15 @@ from lean_spec.types.bitfields import BaseBitlist, BaseBitvector from lean_spec.types.boolean import Boolean +from lean_spec.types.exceptions import ( + SSZDecodeError, + SSZLengthError, + SSZStreamError, + SSZTypeDefinitionError, +) + +# Type alias for errors that can be SSZLengthError or wrapped in ValidationError +LengthOrValidationError = (SSZLengthError, ValidationError) # Define bitfield types at module level for reuse and model classes @@ -54,7 +63,7 @@ class Bitvector16(BaseBitvector): def test_instantiate_raw_type_raises_error(self) -> None: """Tests that the raw, non-specialized BaseBitvector cannot be instantiated.""" - with pytest.raises(TypeError, match="must define LENGTH"): + with pytest.raises(SSZTypeDefinitionError, match="must define LENGTH"): BaseBitvector(data=[]) def test_instantiation_success(self) -> None: @@ -74,7 +83,7 @@ def test_instantiation_success(self) -> None: ) def test_instantiation_with_wrong_length_raises_error(self, values: list[Boolean]) -> None: """Tests that providing the wrong number of items during instantiation fails.""" - with pytest.raises(ValueError, match="requires exactly 4 bits"): + with pytest.raises(LengthOrValidationError): Bitvector4(data=values) def test_pydantic_validation_accepts_valid_list(self) -> None: @@ -93,7 +102,7 @@ def test_pydantic_validation_accepts_valid_list(self) -> None: ) def test_pydantic_validation_rejects_invalid_values(self, invalid_value: Any) -> None: """Tests that Pydantic validation rejects lists of the wrong length.""" - with pytest.raises(ValidationError): + with pytest.raises(LengthOrValidationError): Bitvector4Model(value=invalid_value) def test_bitvector_is_immutable(self) -> None: @@ -125,7 +134,7 @@ class Bitlist16(BaseBitlist): def test_instantiate_raw_type_raises_error(self) -> None: """Tests that the raw, non-specialized BaseBitlist cannot be instantiated.""" - with pytest.raises(TypeError, match="must define LIMIT"): + with pytest.raises(SSZTypeDefinitionError, match="must define LIMIT"): BaseBitlist(data=[]) def test_instantiation_success(self) -> None: @@ -141,7 +150,7 @@ def test_instantiation_over_limit_raises_error(self) -> None: class Bitlist4(BaseBitlist): LIMIT = 4 - with pytest.raises(ValueError, match="cannot exceed 4 bits"): + with pytest.raises(LengthOrValidationError): Bitlist4(data=[Boolean(b) for b in [True, False, True, False, True]]) def test_pydantic_validation_accepts_valid_list(self) -> None: @@ -159,7 +168,7 @@ def test_pydantic_validation_accepts_valid_list(self) -> None: ) def test_pydantic_validation_rejects_invalid_values(self, invalid_value: Any) -> None: """Tests that Pydantic validation rejects lists that exceed the limit.""" - with pytest.raises(ValidationError): + with pytest.raises(LengthOrValidationError): Bitlist8Model(value=invalid_value) def test_add_with_list(self) -> None: @@ -197,7 +206,7 @@ class Bitlist4(BaseBitlist): LIMIT = 4 bitlist = Bitlist4(data=[Boolean(True), Boolean(False), Boolean(True)]) - with pytest.raises(ValueError, match="cannot exceed 4 bits"): + with pytest.raises(LengthOrValidationError): bitlist + [Boolean(False), Boolean(True)] @@ -267,7 +276,7 @@ def test_bitvector_decode_invalid_length(self) -> None: class Bitvector8(BaseBitvector): LENGTH = 8 - with pytest.raises(ValueError, match="expected 1 bytes, got 2"): + with pytest.raises(SSZLengthError, match="requires exactly 1"): Bitvector8.decode_bytes(b"\x01\x02") # Expects 1 byte, gets 2 def test_bitlist_decode_invalid_data(self) -> None: @@ -276,7 +285,7 @@ def test_bitlist_decode_invalid_data(self) -> None: class Bitlist8(BaseBitlist): LIMIT = 8 - with pytest.raises(ValueError, match="Cannot decode empty bytes"): + with pytest.raises(SSZDecodeError, match="cannot decode empty bytes"): Bitlist8.decode_bytes(b"") @@ -295,7 +304,7 @@ class Bitlist10(BaseBitlist): LIMIT = 10 assert Bitlist10.is_fixed_size() is False - with pytest.raises(TypeError): + with pytest.raises(SSZTypeDefinitionError): Bitlist10.get_byte_length() def test_bitvector_deserialize_invalid_scope(self) -> None: @@ -303,7 +312,7 @@ class Bitvector8(BaseBitvector): LENGTH = 8 stream = io.BytesIO(b"\xff") - with pytest.raises(ValueError, match="expected 1 bytes, got 2"): + with pytest.raises(SSZDecodeError, match="expected 1 bytes, got 2"): Bitvector8.deserialize(stream, scope=2) def test_bitvector_deserialize_premature_end(self) -> None: @@ -311,7 +320,7 @@ class Bitvector16(BaseBitvector): LENGTH = 16 stream = io.BytesIO(b"\xff") # Only 1 byte, expects 2 - with pytest.raises(IOError, match="Expected 2 bytes, got 1"): + with pytest.raises(SSZStreamError, match="expected 2 bytes, got 1"): Bitvector16.deserialize(stream, scope=2) def test_bitlist_deserialize_premature_end(self) -> None: @@ -319,7 +328,7 @@ class Bitlist16(BaseBitlist): LIMIT = 16 stream = io.BytesIO(b"\xff") # Only 1 byte - with pytest.raises(IOError, match="Expected 2 bytes, got 1"): + with pytest.raises(SSZStreamError, match="expected 2 bytes, got 1"): Bitlist16.deserialize(stream, scope=2) # Scope says to read 2 @pytest.mark.parametrize( diff --git a/tests/lean_spec/types/test_boolean.py b/tests/lean_spec/types/test_boolean.py index 7e6168ba..89536c30 100644 --- a/tests/lean_spec/types/test_boolean.py +++ b/tests/lean_spec/types/test_boolean.py @@ -7,6 +7,12 @@ from pydantic import BaseModel, ValidationError from lean_spec.types.boolean import Boolean +from lean_spec.types.exceptions import ( + SSZDecodeError, + SSZStreamError, + SSZTypeCoercionError, + SSZValueError, +) class BooleanModel(BaseModel): @@ -39,15 +45,15 @@ def test_instantiation_from_valid_types(valid_value: bool | int) -> None: @pytest.mark.parametrize("invalid_int", [-1, 2, 100]) def test_instantiation_from_invalid_int_raises_error(invalid_int: int) -> None: - """Tests that instantiating with an int other than 0 or 1 raises ValueError.""" - with pytest.raises(ValueError, match="Boolean value must be 0 or 1"): + """Tests that instantiating with an int other than 0 or 1 raises SSZValueError.""" + with pytest.raises(SSZValueError, match="Boolean value must be 0 or 1"): Boolean(invalid_int) @pytest.mark.parametrize("invalid_type", [1.0, "True", b"\x01", None]) def test_instantiation_from_invalid_types_raises_error(invalid_type: Any) -> None: - """Tests that instantiating with non-bool/non-int types raises a TypeError.""" - with pytest.raises(TypeError, match="Expected bool or int"): + """Tests that instantiating with non-bool/non-int types raises SSZTypeCoercionError.""" + with pytest.raises(SSZTypeCoercionError, match="Expected bool or int"): Boolean(invalid_type) @@ -210,16 +216,16 @@ def test_encode_decode_roundtrip(self, value: bool, expected_bytes: bytes) -> No def test_decode_invalid_length(self) -> None: """Tests that decode_bytes fails with incorrect byte length.""" - with pytest.raises(ValueError, match="Expected 1 byte"): + with pytest.raises(SSZDecodeError, match="expected 1 byte"): Boolean.decode_bytes(b"") - with pytest.raises(ValueError, match="Expected 1 byte"): + with pytest.raises(SSZDecodeError, match="expected 1 byte"): Boolean.decode_bytes(b"\x00\x01") def test_decode_invalid_value(self) -> None: """Tests that decode_bytes fails with an invalid byte value.""" - with pytest.raises(ValueError, match="must be 0x00 or 0x01"): + with pytest.raises(SSZDecodeError, match="must be 0x00 or 0x01"): Boolean.decode_bytes(b"\x02") - with pytest.raises(ValueError, match="must be 0x00 or 0x01"): + with pytest.raises(SSZDecodeError, match="must be 0x00 or 0x01"): Boolean.decode_bytes(b"\xff") @pytest.mark.parametrize("value", [True, False]) @@ -241,15 +247,15 @@ def test_serialize_deserialize_roundtrip(self, value: bool) -> None: def test_deserialize_invalid_scope(self) -> None: """Tests that deserialize fails with an incorrect scope.""" stream = io.BytesIO(b"\x01") - with pytest.raises(ValueError, match="Invalid scope for Boolean"): + with pytest.raises(SSZDecodeError, match="expected scope of 1"): Boolean.deserialize(stream, scope=0) stream.seek(0) - with pytest.raises(ValueError, match="Invalid scope for Boolean"): + with pytest.raises(SSZDecodeError, match="expected scope of 1"): Boolean.deserialize(stream, scope=2) def test_deserialize_premature_stream_end(self) -> None: """Tests that deserialize fails if the stream ends prematurely.""" stream = io.BytesIO(b"") # Empty stream - with pytest.raises(IOError, match="Stream ended prematurely"): + with pytest.raises(SSZStreamError, match="expected 1 bytes, got 0"): Boolean.deserialize(stream, scope=1) diff --git a/tests/lean_spec/types/test_byte_arrays.py b/tests/lean_spec/types/test_byte_arrays.py index 8888f4e4..46fe6f2b 100644 --- a/tests/lean_spec/types/test_byte_arrays.py +++ b/tests/lean_spec/types/test_byte_arrays.py @@ -19,6 +19,11 @@ BaseBytes, BaseByteList, ) +from lean_spec.types.exceptions import ( + SSZDecodeError, + SSZLengthError, + SSZStreamError, +) def sha256(b: bytes) -> bytes: @@ -61,11 +66,11 @@ def test_bytevector_coercion(value: Any, expected: bytes) -> None: def test_bytevector_wrong_length_raises() -> None: - with pytest.raises(ValueError): + with pytest.raises(SSZLengthError): Bytes4(b"\x00\x01\x02") # 3 != 4 - with pytest.raises(ValueError): + with pytest.raises(SSZLengthError): Bytes4([0, 1, 2]) # 3 != 4 - with pytest.raises(ValueError): + with pytest.raises(SSZLengthError): Bytes4("000102") # 3 != 4 (hex nibbles -> 3 bytes) @@ -89,7 +94,7 @@ class ByteList5(BaseByteList): def test_bytelist_over_limit_raises() -> None: # Test with ByteList64 that has limit 64 - with pytest.raises(ValueError): + with pytest.raises(SSZLengthError): ByteList64(data=b"\x00" * 65) # Over the limit @@ -170,7 +175,7 @@ def test_encode_decode_roundtrip_vector(Typ: Type[BaseBytes], payload: bytes) -> def test_vector_deserialize_scope_mismatch_raises() -> None: v = Bytes4(b"\x00\x01\x02\x03") buf = io.BytesIO(v.encode_bytes()) - with pytest.raises(ValueError): + with pytest.raises(SSZDecodeError, match="expected 4 bytes, got 3"): Bytes4.deserialize(buf, 3) # wrong scope @@ -204,7 +209,7 @@ class TestByteList2(BaseByteList): LIMIT = 2 buf = io.BytesIO(b"\x00\x01\x02") - with pytest.raises(ValueError): + with pytest.raises(SSZLengthError): TestByteList2.deserialize(buf, 3) @@ -213,7 +218,7 @@ class TestByteList10(BaseByteList): LIMIT = 10 buf = io.BytesIO(b"\x00\x01") - with pytest.raises(IOError): + with pytest.raises(SSZStreamError): TestByteList10.deserialize(buf, 3) # stream too short @@ -249,11 +254,11 @@ def test_pydantic_accepts_various_inputs_for_vectors() -> None: def test_pydantic_validates_vector_lengths() -> None: - with pytest.raises(ValueError): + with pytest.raises(SSZLengthError): ModelVectors(root=Bytes32(b"\x11" * 31), key=Bytes4(b"\x00\x01\x02\x03")) # too short - with pytest.raises(ValueError): + with pytest.raises(SSZLengthError): ModelVectors(root=Bytes32(b"\x11" * 33), key=Bytes4(b"\x00\x01\x02\x03")) # too long - with pytest.raises(ValueError): + with pytest.raises(SSZLengthError): ModelVectors(root=Bytes32(b"\x11" * 32), key=Bytes4(b"\x00\x01\x02")) # key too short @@ -277,7 +282,7 @@ def test_pydantic_accepts_and_serializes_bytelist() -> None: def test_pydantic_bytelist_limit_enforced() -> None: - with pytest.raises(ValueError): + with pytest.raises(SSZLengthError): ModelLists(payload=ByteList16(data=bytes(range(17)))) # over limit diff --git a/tests/lean_spec/types/test_collections.py b/tests/lean_spec/types/test_collections.py index 4d98c03b..b85edfcc 100644 --- a/tests/lean_spec/types/test_collections.py +++ b/tests/lean_spec/types/test_collections.py @@ -10,8 +10,12 @@ from lean_spec.types.boolean import Boolean from lean_spec.types.collections import SSZList, SSZVector from lean_spec.types.container import Container +from lean_spec.types.exceptions import SSZLengthError, SSZTypeCoercionError, SSZTypeDefinitionError from lean_spec.types.uint import Uint8, Uint16, Uint32, Uint256 +# Type alias for errors that can be SSZLengthError or wrapped in ValidationError +LengthOrValidationError = (SSZLengthError, ValidationError) + # Define some List types that are needed for Container definitions class Uint16List4(SSZList): @@ -234,9 +238,9 @@ def test_instantiation_success(self) -> None: def test_instantiation_with_wrong_length_raises_error(self) -> None: """Tests that providing the wrong number of items during instantiation fails.""" vec_type = Uint8Vector4 - with pytest.raises(ValueError, match="requires exactly 4 items"): + with pytest.raises(LengthOrValidationError): vec_type(data=[Uint8(1), Uint8(2), Uint8(3)]) # Too few - with pytest.raises(ValueError, match="requires exactly 4 items"): + with pytest.raises(LengthOrValidationError): vec_type(data=[Uint8(1), Uint8(2), Uint8(3), Uint8(4), Uint8(5)]) # Too many def test_pydantic_validation(self) -> None: @@ -246,11 +250,11 @@ def test_pydantic_validation(self) -> None: assert isinstance(instance.value, Uint8Vector2) assert list(instance.value) == [Uint8(10), Uint8(20)] # Test invalid data - with pytest.raises(ValidationError): + with pytest.raises(LengthOrValidationError): Uint8Vector2Model(value={"data": [10]}) # type: ignore[arg-type] - with pytest.raises(ValidationError): + with pytest.raises(LengthOrValidationError): Uint8Vector2Model(value={"data": [10, 20, 30]}) # type: ignore[arg-type] - with pytest.raises(TypeError): + with pytest.raises(SSZTypeCoercionError): Uint8Vector2Model(value={"data": [10, "bad"]}) # type: ignore[arg-type] def test_vector_is_immutable(self) -> None: @@ -281,13 +285,13 @@ def test_class_getitem_creates_specialized_type(self) -> None: def test_instantiate_raw_type_raises_error(self) -> None: """Tests that the raw, non-specialized SSZList cannot be instantiated.""" - with pytest.raises(TypeError, match="must define ELEMENT_TYPE and LIMIT"): + with pytest.raises(SSZTypeDefinitionError, match="must define ELEMENT_TYPE and LIMIT"): SSZList(data=[]) def test_instantiation_over_limit_raises_error(self) -> None: """Tests that providing more items than the limit during instantiation fails.""" list_type = Uint8List4 - with pytest.raises(ValueError, match="cannot contain more than 4 elements"): + with pytest.raises(LengthOrValidationError): list_type(data=[Uint8(1), Uint8(2), Uint8(3), Uint8(4), Uint8(5)]) def test_pydantic_validation(self) -> None: @@ -297,19 +301,19 @@ def test_pydantic_validation(self) -> None: assert isinstance(instance.value, Uint8List4) assert list(instance.value) == [Uint8(10), Uint8(20)] # Test invalid data - list too long - with pytest.raises(ValidationError): + with pytest.raises(LengthOrValidationError): Uint8List4Model( value=Uint8List4(data=[Uint8(10), Uint8(20), Uint8(30), Uint8(40), Uint8(50)]) ) def test_append_at_limit_raises_error(self) -> None: """Tests that creating a list at limit +1 fails during construction.""" - with pytest.raises(ValueError, match="cannot contain more than 4 elements"): + with pytest.raises(LengthOrValidationError): BooleanList4(data=[Boolean(True)] * 5) def test_extend_over_limit_raises_error(self) -> None: """Tests that creating a list over the limit fails during construction.""" - with pytest.raises(ValueError, match="cannot contain more than 4 elements"): + with pytest.raises(LengthOrValidationError): BooleanList4( data=[Boolean(True), Boolean(False), Boolean(True), Boolean(False), Boolean(True)] ) @@ -332,7 +336,7 @@ def test_add_with_sszlist(self) -> None: def test_add_exceeding_limit_raises_error(self) -> None: """Tests that concatenating beyond the limit raises an error.""" list1 = Uint8List4(data=[Uint8(1), Uint8(2), Uint8(3)]) - with pytest.raises(ValueError, match="cannot contain more than 4 elements"): + with pytest.raises(LengthOrValidationError): list1 + [4, 5] diff --git a/tests/lean_spec/types/test_uint.py b/tests/lean_spec/types/test_uint.py index e973154b..f22bd851 100644 --- a/tests/lean_spec/types/test_uint.py +++ b/tests/lean_spec/types/test_uint.py @@ -6,6 +6,12 @@ import pytest from pydantic import BaseModel, ValidationError +from lean_spec.types.exceptions import ( + SSZDecodeError, + SSZOverflowError, + SSZStreamError, + SSZTypeCoercionError, +) from lean_spec.types.uint import ( BaseUint, Uint8, @@ -92,9 +98,9 @@ def test_pydantic_strict_mode_rejects_invalid_types( def test_instantiation_from_invalid_types_raises_error( uint_class: Type[BaseUint], invalid_value: Any, expected_type_name: str ) -> None: - """Tests that instantiating with non-integer types raises a TypeError.""" + """Tests that instantiating with non-integer types raises SSZTypeCoercionError.""" expected_msg = f"Expected int, got {expected_type_name}" - with pytest.raises(TypeError, match=expected_msg): + with pytest.raises(SSZTypeCoercionError, match=expected_msg): uint_class(invalid_value) @@ -109,16 +115,16 @@ def test_instantiation_and_type(uint_class: Type[BaseUint]) -> None: @pytest.mark.parametrize("uint_class", ALL_UINT_TYPES) def test_instantiation_negative(uint_class: Type[BaseUint]) -> None: - """Tests that instantiating with a negative number raises OverflowError.""" - with pytest.raises(OverflowError): + """Tests that instantiating with a negative number raises SSZOverflowError.""" + with pytest.raises(SSZOverflowError): uint_class(-5) @pytest.mark.parametrize("uint_class", ALL_UINT_TYPES) def test_instantiation_too_large(uint_class: Type[BaseUint]) -> None: - """Tests that instantiating with a value >= MAX raises OverflowError.""" + """Tests that instantiating with a value >= MAX raises SSZOverflowError.""" max_value = 2**uint_class.BITS - with pytest.raises(OverflowError): + with pytest.raises(SSZOverflowError): uint_class(max_value) @@ -140,17 +146,17 @@ def test_arithmetic_operators(uint_class: Type[BaseUint]) -> None: # Addition assert a + b == uint_class(a_val + b_val) - with pytest.raises(OverflowError): + with pytest.raises(SSZOverflowError): _ = max_val + b # Subtraction assert a - b == uint_class(a_val - b_val) - with pytest.raises(OverflowError): + with pytest.raises(SSZOverflowError): _ = b - a # Multiplication assert a * b == uint_class(a_val * b_val) - with pytest.raises(OverflowError): + with pytest.raises(SSZOverflowError): _ = max_val * b # Floor Division @@ -162,7 +168,7 @@ def test_arithmetic_operators(uint_class: Type[BaseUint]) -> None: # Exponentiation assert uint_class(b_val) ** 4 == uint_class(b_val**4) if uint_class.BITS <= 16: # Pow gets too big quickly - with pytest.raises(OverflowError): + with pytest.raises(SSZOverflowError): _ = a ** int(b) @@ -399,10 +405,10 @@ def test_encode_decode_roundtrip( @pytest.mark.parametrize("uint_class", ALL_UINT_TYPES) def test_decode_bytes_invalid_length(self, uint_class: Type[BaseUint]) -> None: - """Tests that `decode_bytes` raises a ValueError for data of the wrong length.""" + """Tests that `decode_bytes` raises an SSZDecodeError for data of the wrong length.""" # Create byte string that is one byte too short. invalid_data = b"\x00" * (uint_class.get_byte_length() - 1) - with pytest.raises(ValueError, match="Invalid byte length"): + with pytest.raises(SSZDecodeError, match="expected .* bytes, got"): uint_class.decode_bytes(invalid_data) @pytest.mark.parametrize("uint_class", ALL_UINT_TYPES) @@ -426,17 +432,17 @@ def test_serialize_deserialize_stream_roundtrip(self, uint_class: Type[BaseUint] @pytest.mark.parametrize("uint_class", ALL_UINT_TYPES) def test_deserialize_invalid_scope(self, uint_class: Type[BaseUint]) -> None: - """Tests that `deserialize` raises a ValueError if the scope is incorrect.""" + """Tests that `deserialize` raises an SSZDecodeError if the scope is incorrect.""" stream = io.BytesIO(b"\x00" * uint_class.get_byte_length()) invalid_scope = uint_class.get_byte_length() - 1 - with pytest.raises(ValueError, match="Invalid scope"): + with pytest.raises(SSZDecodeError, match="invalid scope"): uint_class.deserialize(stream, scope=invalid_scope) @pytest.mark.parametrize("uint_class", ALL_UINT_TYPES) def test_deserialize_stream_too_short(self, uint_class: Type[BaseUint]) -> None: - """Tests that `deserialize` raises an IOError if the stream ends prematurely.""" + """Tests that `deserialize` raises an SSZStreamError if the stream ends prematurely.""" byte_length = uint_class.get_byte_length() # Create a stream that is shorter than what the type requires. stream = io.BytesIO(b"\x00" * (byte_length - 1)) - with pytest.raises(IOError, match="Stream ended prematurely"): + with pytest.raises(SSZStreamError, match="expected .* bytes, got"): uint_class.deserialize(stream, scope=byte_length) diff --git a/tests/lean_spec/types/test_union.py b/tests/lean_spec/types/test_union.py index b7ab6ac1..a5bd58a7 100644 --- a/tests/lean_spec/types/test_union.py +++ b/tests/lean_spec/types/test_union.py @@ -9,6 +9,12 @@ from lean_spec.types.collections import SSZList, SSZVector from lean_spec.types.container import Container +from lean_spec.types.exceptions import ( + SSZDecodeError, + SSZSelectorError, + SSZTypeCoercionError, + SSZTypeDefinitionError, +) from lean_spec.types.ssz_base import SSZType from lean_spec.types.uint import Uint8, Uint16, Uint32 from lean_spec.types.union import SSZUnion @@ -112,14 +118,14 @@ def test_constructor_success() -> None: def test_constructor_errors() -> None: """Test Union construction error cases.""" # Invalid selector (out of range) - with pytest.raises(ValueError, match="Invalid selector"): + with pytest.raises(SSZSelectorError, match="out of range"): OptionalNumericUnion(selector=3, value=None) # None value for None option should work OptionalNumericUnion(selector=0, value=None) # Non-None value for None option should fail - with pytest.raises(TypeError, match="value must be None"): + with pytest.raises(SSZTypeCoercionError, match="Expected None"): OptionalNumericUnion(selector=0, value=Uint16(1)) @@ -138,22 +144,22 @@ def test_pydantic_validation_ok() -> None: def test_pydantic_validation_errors() -> None: """Test Pydantic validation error cases.""" # Test invalid selector directly - with pytest.raises(ValueError, match="Invalid selector"): + with pytest.raises(SSZSelectorError, match="out of range"): OptionalNumericUnion(selector=9, value=0) # Test invalid value for None option directly - with pytest.raises(TypeError, match="value must be None"): + with pytest.raises(SSZTypeCoercionError, match="Expected None"): OptionalNumericUnion(selector=0, value=1) # Test with Pydantic model wrapper - should catch underlying errors model = create_model("M", v=(OptionalNumericUnion, ...)) # Invalid selector in model context - with pytest.raises((ValidationError, ValueError)): + with pytest.raises((ValidationError, SSZSelectorError)): model(v={"selector": 9, "value": 0}) # Invalid value for None option in model context - with pytest.raises((ValidationError, TypeError)): + with pytest.raises((ValidationError, SSZTypeCoercionError)): model(v={"selector": 0, "value": 1}) @@ -192,15 +198,15 @@ def test_union_with_nested_composites_roundtrip() -> None: def test_deserialize_errors() -> None: """Test deserialization error cases.""" # Too small scope - with pytest.raises(ValueError, match="Scope too small"): + with pytest.raises(SSZDecodeError, match="scope too small"): SimpleUnion.deserialize(io.BytesIO(b""), 0) # Invalid selector - with pytest.raises(ValueError, match="out of range"): + with pytest.raises(SSZSelectorError, match="out of range"): SimpleUnion.deserialize(io.BytesIO(b"\x09"), 1) # None option with payload - with pytest.raises(ValueError, match="no payload bytes"): + with pytest.raises(SSZDecodeError, match="no payload bytes"): OptionalNumericUnion.deserialize(io.BytesIO(b"\x00\xff"), 2) @@ -255,7 +261,7 @@ def test_is_fixed_size_helper() -> None: def test_get_byte_length_raises() -> None: """Test get_byte_length() raises for variable-size types.""" - with pytest.raises(TypeError, match="variable-size"): + with pytest.raises(SSZTypeDefinitionError, match="variable-size"): NumericUnion.get_byte_length() @@ -270,7 +276,7 @@ class ValidUnion(SSZUnion): assert instance.selector == 0 # Invalid union with None not at index 0 should fail during validation - with pytest.raises(TypeError, match="None at index 0"): + with pytest.raises(SSZTypeDefinitionError, match="None at index 0"): class InvalidUnion1(SSZUnion): OPTIONS = (Uint16, None) @@ -281,7 +287,7 @@ class InvalidUnion1(SSZUnion): class NotSSZ: pass - with pytest.raises(TypeError, match="takes no arguments"): + with pytest.raises(SSZTypeCoercionError): class InvalidUnion2(SSZUnion): OPTIONS = (cast(PyType[SSZType], NotSSZ),) @@ -297,7 +303,7 @@ def test_union_boundary_cases() -> None: assert u.value == Uint16(42) # None-only union should fail validation - with pytest.raises(TypeError, match="only option"): + with pytest.raises(SSZTypeDefinitionError, match="only option"): class NoneOnlyUnion(SSZUnion): OPTIONS = (None,) From 70147e77dfa99bf0fed78331417f9ae054946aae Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Sat, 20 Dec 2025 23:56:02 +0100 Subject: [PATCH 2/3] cleanup --- pyproject.toml | 2 +- src/lean_spec/types/exceptions.py | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 2db62b49..ec08c110 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -54,7 +54,7 @@ docstring-code-format = true [tool.ruff.lint] select = ["E", "F", "B", "W", "I", "A", "N", "D", "C"] fixable = ["I", "B", "E", "F", "W", "D", "C"] -ignore = ["D205", "D203", "D212", "D415", "C901", "A005", "C420", "D105", "D107"] +ignore = ["D205", "D203", "D212", "D415", "C901", "A005", "C420"] [tool.ruff.lint.pydocstyle] convention = "google" diff --git a/src/lean_spec/types/exceptions.py b/src/lean_spec/types/exceptions.py index 673e8d3d..178c651e 100644 --- a/src/lean_spec/types/exceptions.py +++ b/src/lean_spec/types/exceptions.py @@ -14,10 +14,12 @@ class SSZError(Exception): """ def __init__(self, message: str) -> None: + """Initialize the exception with a message.""" self.message = message super().__init__(message) def __repr__(self) -> str: + """Return a string representation of the exception.""" return f"{self.__class__.__name__}({self.message!r})" @@ -42,6 +44,7 @@ def __init__( missing_attr: str | None = None, detail: str | None = None, ) -> None: + """Initialize the exception with type definition error details.""" self.type_name = type_name self.missing_attr = missing_attr self.detail = detail @@ -72,6 +75,7 @@ def __init__( actual_type: str, value: Any = None, ) -> None: + """Initialize the exception with type coercion error details.""" self.expected_type = expected_type self.actual_type = actual_type self.value = value @@ -113,6 +117,7 @@ def __init__( min_value: int = 0, max_value: int, ) -> None: + """Initialize the exception with overflow error details.""" self.value = value self.type_name = type_name self.min_value = min_value @@ -142,6 +147,7 @@ def __init__( actual: int, is_limit: bool = False, ) -> None: + """Initialize the exception with length error details.""" self.type_name = type_name self.expected = expected self.actual = actual @@ -176,6 +182,7 @@ def __init__( *, offset: int | None = None, ) -> None: + """Initialize the exception with decode error details.""" self.type_name = type_name self.detail = detail self.offset = offset @@ -206,6 +213,7 @@ def __init__( expected_bytes: int | None = None, actual_bytes: int | None = None, ) -> None: + """Initialize the exception with stream error details.""" self.type_name = type_name self.operation = operation self.expected_bytes = expected_bytes @@ -246,6 +254,7 @@ def __init__( start_offset: int | None = None, end_offset: int | None = None, ) -> None: + """Initialize the exception with offset error details.""" self.field_name = field_name self.start_offset = start_offset self.end_offset = end_offset @@ -280,6 +289,7 @@ def __init__( selector: int, num_options: int, ) -> None: + """Initialize the exception with selector error details.""" self.selector = selector self.num_options = num_options From eaf01821a25091df4808040480c85b26f4d9a792 Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Sun, 21 Dec 2025 00:12:09 +0100 Subject: [PATCH 3/3] simplify error types --- src/lean_spec/types/__init__.py | 17 +- src/lean_spec/types/bitfields.py | 86 ++----- src/lean_spec/types/boolean.py | 35 +-- src/lean_spec/types/byte_arrays.py | 79 ++---- src/lean_spec/types/collections.py | 109 +++----- src/lean_spec/types/container.py | 48 +--- src/lean_spec/types/exceptions.py | 288 +--------------------- src/lean_spec/types/uint.py | 39 +-- src/lean_spec/types/union.py | 98 ++------ tests/lean_spec/types/test_bitfields.py | 37 ++- tests/lean_spec/types/test_boolean.py | 25 +- tests/lean_spec/types/test_byte_arrays.py | 28 +-- tests/lean_spec/types/test_collections.py | 28 +-- tests/lean_spec/types/test_uint.py | 39 ++- tests/lean_spec/types/test_union.py | 33 ++- 15 files changed, 203 insertions(+), 786 deletions(-) diff --git a/src/lean_spec/types/__init__.py b/src/lean_spec/types/__init__.py index cfdbc5aa..0182d312 100644 --- a/src/lean_spec/types/__init__.py +++ b/src/lean_spec/types/__init__.py @@ -7,16 +7,8 @@ from .collections import SSZList, SSZVector from .container import Container from .exceptions import ( - SSZDecodeError, SSZError, - SSZLengthError, - SSZOffsetError, - SSZOverflowError, - SSZSelectorError, SSZSerializationError, - SSZStreamError, - SSZTypeCoercionError, - SSZTypeDefinitionError, SSZTypeError, SSZValueError, ) @@ -40,16 +32,9 @@ "SSZType", "Boolean", "Container", + # Exceptions "SSZError", "SSZTypeError", - "SSZTypeDefinitionError", - "SSZTypeCoercionError", "SSZValueError", - "SSZOverflowError", - "SSZLengthError", "SSZSerializationError", - "SSZDecodeError", - "SSZStreamError", - "SSZOffsetError", - "SSZSelectorError", ] diff --git a/src/lean_spec/types/bitfields.py b/src/lean_spec/types/bitfields.py index 942edf86..cb9b8b1b 100644 --- a/src/lean_spec/types/bitfields.py +++ b/src/lean_spec/types/bitfields.py @@ -29,13 +29,7 @@ from typing_extensions import Self from .boolean import Boolean -from .exceptions import ( - SSZDecodeError, - SSZLengthError, - SSZStreamError, - SSZTypeCoercionError, - SSZTypeDefinitionError, -) +from .exceptions import SSZSerializationError, SSZTypeError, SSZValueError from .ssz_base import SSZModel @@ -62,19 +56,14 @@ class BaseBitvector(SSZModel): def _coerce_and_validate(cls, v: Any) -> tuple[Boolean, ...]: """Validate and convert input data to typed tuple of Booleans.""" if not hasattr(cls, "LENGTH"): - raise SSZTypeDefinitionError( - type_name=cls.__name__, - missing_attr="LENGTH", - ) + raise SSZTypeError(f"{cls.__name__} must define LENGTH") if not isinstance(v, (list, tuple)): v = tuple(v) if len(v) != cls.LENGTH: - raise SSZLengthError( - type_name=cls.__name__, - expected=cls.LENGTH, - actual=len(v), + raise SSZValueError( + f"{cls.__name__} requires exactly {cls.LENGTH} elements, got {len(v)}" ) return tuple(Boolean(bit) for bit in v) @@ -100,18 +89,12 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: """Read SSZ bytes from a stream and return an instance.""" expected_len = cls.get_byte_length() if scope != expected_len: - raise SSZDecodeError( - type_name=cls.__name__, - detail=f"expected {expected_len} bytes, got {scope}", + raise SSZSerializationError( + f"{cls.__name__}: expected {expected_len} bytes, got {scope}" ) data = stream.read(scope) if len(data) != scope: - raise SSZStreamError( - type_name=cls.__name__, - operation="decoding", - expected_bytes=scope, - actual_bytes=len(data), - ) + raise SSZSerializationError(f"{cls.__name__}: expected {scope} bytes, got {len(data)}") return cls.decode_bytes(data) def encode_bytes(self) -> bytes: @@ -137,11 +120,7 @@ def decode_bytes(cls, data: bytes) -> Self: """ expected = cls.get_byte_length() if len(data) != expected: - raise SSZLengthError( - type_name=cls.__name__, - expected=expected, - actual=len(data), - ) + raise SSZValueError(f"{cls.__name__}: expected {expected} bytes, got {len(data)}") bits = tuple(Boolean((data[i // 8] >> (i % 8)) & 1) for i in range(cls.LENGTH)) return cls(data=bits) @@ -170,10 +149,7 @@ class BaseBitlist(SSZModel): def _coerce_and_validate(cls, v: Any) -> tuple[Boolean, ...]: """Validate and convert input to a tuple of Boolean elements.""" if not hasattr(cls, "LIMIT"): - raise SSZTypeDefinitionError( - type_name=cls.__name__, - missing_attr="LIMIT", - ) + raise SSZTypeError(f"{cls.__name__} must define LIMIT") # Handle various input types if isinstance(v, (list, tuple)): @@ -181,20 +157,11 @@ def _coerce_and_validate(cls, v: Any) -> tuple[Boolean, ...]: elif hasattr(v, "__iter__") and not isinstance(v, (str, bytes)): elements = list(v) else: - raise SSZTypeCoercionError( - expected_type="iterable", - actual_type=type(v).__name__, - value=v, - ) + raise SSZTypeError(f"Expected iterable, got {type(v).__name__}") # Check limit if len(elements) > cls.LIMIT: - raise SSZLengthError( - type_name=cls.__name__, - expected=cls.LIMIT, - actual=len(elements), - is_limit=True, - ) + raise SSZValueError(f"{cls.__name__} exceeds limit of {cls.LIMIT}, got {len(elements)}") return tuple(Boolean(bit) for bit in elements) @@ -235,11 +202,8 @@ def is_fixed_size(cls) -> bool: @classmethod def get_byte_length(cls) -> int: - """Lists are variable-size, so this raises an SSZTypeDefinitionError.""" - raise SSZTypeDefinitionError( - type_name=cls.__name__, - detail="variable-size bitlist has no fixed byte length", - ) + """Lists are variable-size, so this raises an SSZTypeError.""" + raise SSZTypeError(f"{cls.__name__}: variable-size bitlist has no fixed byte length") def serialize(self, stream: IO[bytes]) -> int: """Write SSZ bytes to a binary stream.""" @@ -252,12 +216,7 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: """Read SSZ bytes from a stream and return an instance.""" data = stream.read(scope) if len(data) != scope: - raise SSZStreamError( - type_name=cls.__name__, - operation="decoding", - expected_bytes=scope, - actual_bytes=len(data), - ) + raise SSZSerializationError(f"{cls.__name__}: expected {scope} bytes, got {len(data)}") return cls.decode_bytes(data) def encode_bytes(self) -> bytes: @@ -300,10 +259,7 @@ def decode_bytes(cls, data: bytes) -> Self: the last data bit. All bits after the delimiter are assumed to be 0. """ if len(data) == 0: - raise SSZDecodeError( - type_name=cls.__name__, - detail="cannot decode empty bytes to Bitlist", - ) + raise SSZSerializationError(f"{cls.__name__}: cannot decode empty bytes") # Find the position of the delimiter bit (rightmost 1). delimiter_pos = None @@ -316,20 +272,12 @@ def decode_bytes(cls, data: bytes) -> Self: break if delimiter_pos is None: - raise SSZDecodeError( - type_name=cls.__name__, - detail="no delimiter bit found in Bitlist data", - ) + raise SSZSerializationError(f"{cls.__name__}: no delimiter bit found") # Extract data bits (everything before the delimiter). num_bits = delimiter_pos if num_bits > cls.LIMIT: - raise SSZLengthError( - type_name=cls.__name__, - expected=cls.LIMIT, - actual=num_bits, - is_limit=True, - ) + raise SSZValueError(f"{cls.__name__} exceeds limit of {cls.LIMIT}, got {num_bits}") bits = tuple(Boolean((data[i // 8] >> (i % 8)) & 1) for i in range(num_bits)) return cls(data=bits) diff --git a/src/lean_spec/types/boolean.py b/src/lean_spec/types/boolean.py index 481039ac..82012e59 100644 --- a/src/lean_spec/types/boolean.py +++ b/src/lean_spec/types/boolean.py @@ -8,12 +8,7 @@ from pydantic_core import CoreSchema, core_schema from typing_extensions import Self -from .exceptions import ( - SSZDecodeError, - SSZStreamError, - SSZTypeCoercionError, - SSZValueError, -) +from .exceptions import SSZSerializationError, SSZTypeError, SSZValueError from .ssz_base import SSZType @@ -41,11 +36,7 @@ def __new__(cls, value: bool | int) -> Self: SSZDecodeError: If `value` is an integer other than 0 or 1. """ if not isinstance(value, int): - raise SSZTypeCoercionError( - expected_type="bool or int", - actual_type=type(value).__name__, - value=value, - ) + raise SSZTypeError(f"Expected bool or int, got {type(value).__name__}") if value not in (0, 1): raise SSZValueError(f"Boolean value must be 0 or 1, not {value}") @@ -103,15 +94,9 @@ def encode_bytes(self) -> bytes: def decode_bytes(cls, data: bytes) -> Self: """Deserialize a single byte into a Boolean instance.""" if len(data) != 1: - raise SSZDecodeError( - type_name="Boolean", - detail=f"expected 1 byte, got {len(data)}", - ) + raise SSZSerializationError(f"Boolean: expected 1 byte, got {len(data)}") if data[0] not in (0, 1): - raise SSZDecodeError( - type_name="Boolean", - detail=f"byte must be 0x00 or 0x01, got {data[0]:#04x}", - ) + raise SSZSerializationError(f"Boolean: byte must be 0x00 or 0x01, got {data[0]:#04x}") return cls(data[0]) def serialize(self, stream: IO[bytes]) -> int: @@ -124,18 +109,10 @@ def serialize(self, stream: IO[bytes]) -> int: def deserialize(cls, stream: IO[bytes], scope: int) -> Self: """Deserialize a boolean from a binary stream.""" if scope != 1: - raise SSZDecodeError( - type_name="Boolean", - detail=f"expected scope of 1, got {scope}", - ) + raise SSZSerializationError(f"Boolean: expected scope of 1, got {scope}") data = stream.read(1) if len(data) != 1: - raise SSZStreamError( - type_name="Boolean", - operation="decoding", - expected_bytes=1, - actual_bytes=len(data), - ) + raise SSZSerializationError(f"Boolean: expected 1 byte, got {len(data)}") return cls.decode_bytes(data) def _raise_type_error(self, other: Any, op_symbol: str) -> None: diff --git a/src/lean_spec/types/byte_arrays.py b/src/lean_spec/types/byte_arrays.py index a87d64a2..83273624 100644 --- a/src/lean_spec/types/byte_arrays.py +++ b/src/lean_spec/types/byte_arrays.py @@ -16,12 +16,7 @@ from pydantic_core import core_schema from typing_extensions import Self -from .exceptions import ( - SSZDecodeError, - SSZLengthError, - SSZStreamError, - SSZTypeDefinitionError, -) +from .exceptions import SSZSerializationError, SSZTypeError, SSZValueError from .ssz_base import SSZModel, SSZType @@ -74,18 +69,11 @@ def __new__(cls, value: Any = b"") -> Self: SSZLengthError: If the resulting byte length differs from `LENGTH`. """ if not hasattr(cls, "LENGTH"): - raise SSZTypeDefinitionError( - type_name=cls.__name__, - missing_attr="LENGTH", - ) + raise SSZTypeError(f"{cls.__name__} must define LENGTH") b = _coerce_to_bytes(value) if len(b) != cls.LENGTH: - raise SSZLengthError( - type_name=cls.__name__, - expected=cls.LENGTH, - actual=len(b), - ) + raise SSZValueError(f"{cls.__name__} requires exactly {cls.LENGTH} bytes, got {len(b)}") return super().__new__(cls, b) @classmethod @@ -130,18 +118,10 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: SSZStreamError: if the stream ends prematurely. """ if scope != cls.LENGTH: - raise SSZDecodeError( - type_name=cls.__name__, - detail=f"expected {cls.LENGTH} bytes, got {scope}", - ) + raise SSZSerializationError(f"{cls.__name__}: expected {cls.LENGTH} bytes, got {scope}") data = stream.read(scope) if len(data) != scope: - raise SSZStreamError( - type_name=cls.__name__, - operation="decoding", - expected_bytes=scope, - actual_bytes=len(data), - ) + raise SSZSerializationError(f"{cls.__name__}: expected {scope} bytes, got {len(data)}") return cls(data) def encode_bytes(self) -> bytes: @@ -156,10 +136,8 @@ def decode_bytes(cls, data: bytes) -> Self: For a fixed-size type, the data must be exactly `LENGTH` bytes. """ if len(data) != cls.LENGTH: - raise SSZLengthError( - type_name=cls.__name__, - expected=cls.LENGTH, - actual=len(data), + raise SSZValueError( + f"{cls.__name__} requires exactly {cls.LENGTH} bytes, got {len(data)}" ) return cls(data) @@ -286,19 +264,11 @@ class BaseByteList(SSZModel): def _validate_byte_list_data(cls, v: Any) -> bytes: """Validate and convert input to bytes with limit checking.""" if not hasattr(cls, "LIMIT"): - raise SSZTypeDefinitionError( - type_name=cls.__name__, - missing_attr="LIMIT", - ) + raise SSZTypeError(f"{cls.__name__} must define LIMIT") b = _coerce_to_bytes(v) if len(b) > cls.LIMIT: - raise SSZLengthError( - type_name=cls.__name__, - expected=cls.LIMIT, - actual=len(b), - is_limit=True, - ) + raise SSZValueError(f"{cls.__name__} exceeds limit of {cls.LIMIT}, got {len(b)}") return b @field_serializer("data", when_used="json") @@ -314,10 +284,7 @@ def is_fixed_size(cls) -> bool: @classmethod def get_byte_length(cls) -> int: """ByteList is variable-size, so this should not be called.""" - raise SSZTypeDefinitionError( - type_name=cls.__name__, - detail="variable-size byte list has no fixed byte length", - ) + raise SSZTypeError(f"{cls.__name__}: variable-size byte list has no fixed byte length") def serialize(self, stream: IO[bytes]) -> int: """ @@ -343,25 +310,12 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: SSZStreamError: if the stream ends prematurely. """ if scope < 0: - raise SSZDecodeError( - type_name=cls.__name__, - detail="negative scope", - ) + raise SSZSerializationError(f"{cls.__name__}: negative scope") if scope > cls.LIMIT: - raise SSZLengthError( - type_name=cls.__name__, - expected=cls.LIMIT, - actual=scope, - is_limit=True, - ) + raise SSZValueError(f"{cls.__name__} exceeds limit of {cls.LIMIT}, got {scope}") data = stream.read(scope) if len(data) != scope: - raise SSZStreamError( - type_name=cls.__name__, - operation="decoding", - expected_bytes=scope, - actual_bytes=len(data), - ) + raise SSZSerializationError(f"{cls.__name__}: expected {scope} bytes, got {len(data)}") return cls(data=data) def encode_bytes(self) -> bytes: @@ -376,12 +330,7 @@ def decode_bytes(cls, data: bytes) -> Self: For variable-size types, the data length must be `<= LIMIT`. """ if len(data) > cls.LIMIT: - raise SSZLengthError( - type_name=cls.__name__, - expected=cls.LIMIT, - actual=len(data), - is_limit=True, - ) + raise SSZValueError(f"{cls.__name__} exceeds limit of {cls.LIMIT}, got {len(data)}") return cls(data=data) def __bytes__(self) -> bytes: diff --git a/src/lean_spec/types/collections.py b/src/lean_spec/types/collections.py index 6ffe9bbd..d9d43a46 100644 --- a/src/lean_spec/types/collections.py +++ b/src/lean_spec/types/collections.py @@ -22,13 +22,7 @@ from lean_spec.types.constants import OFFSET_BYTE_LENGTH from .byte_arrays import BaseBytes -from .exceptions import ( - SSZDecodeError, - SSZLengthError, - SSZOffsetError, - SSZTypeCoercionError, - SSZTypeDefinitionError, -) +from .exceptions import SSZSerializationError, SSZTypeError, SSZValueError from .ssz_base import SSZModel, SSZType from .uint import Uint32 @@ -110,10 +104,7 @@ def _serialize_data(self, value: Sequence[T]) -> list[Any]: def _validate_vector_data(cls, v: Any) -> tuple[SSZType, ...]: """Validate and convert input to a typed tuple of exactly LENGTH elements.""" if not hasattr(cls, "ELEMENT_TYPE") or not hasattr(cls, "LENGTH"): - raise SSZTypeDefinitionError( - type_name=cls.__name__, - detail="must define ELEMENT_TYPE and LENGTH", - ) + raise SSZTypeError(f"{cls.__name__} must define ELEMENT_TYPE and LENGTH") if not isinstance(v, (list, tuple)): v = tuple(v) @@ -125,10 +116,8 @@ def _validate_vector_data(cls, v: Any) -> tuple[SSZType, ...]: ) if len(typed_values) != cls.LENGTH: - raise SSZLengthError( - type_name=cls.__name__, - expected=cls.LENGTH, - actual=len(typed_values), + raise SSZValueError( + f"{cls.__name__} requires exactly {cls.LENGTH} elements, got {len(typed_values)}" ) return typed_values @@ -142,10 +131,7 @@ def is_fixed_size(cls) -> bool: def get_byte_length(cls) -> int: """Get the byte length if the SSZVector is fixed-size.""" if not cls.is_fixed_size(): - raise SSZTypeDefinitionError( - type_name=cls.__name__, - detail="variable-size vector has no fixed byte length", - ) + raise SSZTypeError(f"{cls.__name__}: variable-size vector has no fixed byte length") return cls.ELEMENT_TYPE.get_byte_length() * cls.LENGTH def serialize(self, stream: IO[bytes]) -> int: @@ -176,9 +162,8 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: if cls.is_fixed_size(): elem_byte_length = cls.get_byte_length() // cls.LENGTH if scope != cls.get_byte_length(): - raise SSZDecodeError( - type_name=cls.__name__, - detail=f"expected {cls.get_byte_length()} bytes, got {scope}", + raise SSZSerializationError( + f"{cls.__name__}: expected {cls.get_byte_length()} bytes, got {scope}" ) elements = [ cls.ELEMENT_TYPE.deserialize(stream, elem_byte_length) for _ in range(cls.LENGTH) @@ -189,10 +174,9 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: # The first offset tells us where the data starts, which must be after all offsets. first_offset = int(Uint32.deserialize(stream, OFFSET_BYTE_LENGTH)) if first_offset != cls.LENGTH * OFFSET_BYTE_LENGTH: - raise SSZOffsetError( - type_name=cls.__name__, - start_offset=first_offset, - end_offset=cls.LENGTH * OFFSET_BYTE_LENGTH, + expected = cls.LENGTH * OFFSET_BYTE_LENGTH + raise SSZSerializationError( + f"{cls.__name__}: invalid offset {first_offset}, expected {expected}" ) # Read the remaining offsets and add the total scope as the final boundary. offsets = [first_offset] + [ @@ -203,10 +187,8 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: for i in range(cls.LENGTH): start, end = offsets[i], offsets[i + 1] if start > end: - raise SSZOffsetError( - type_name=cls.__name__, - start_offset=start, - end_offset=end, + raise SSZSerializationError( + f"{cls.__name__}: invalid offsets start={start} > end={end}" ) elements.append(cls.ELEMENT_TYPE.deserialize(stream, end - start)) return cls(data=elements) @@ -316,10 +298,7 @@ def _serialize_data(self, value: Sequence[T]) -> list[Any]: def _validate_list_data(cls, v: Any) -> tuple[SSZType, ...]: """Validate and convert input to a tuple of SSZType elements.""" if not hasattr(cls, "ELEMENT_TYPE") or not hasattr(cls, "LIMIT"): - raise SSZTypeDefinitionError( - type_name=cls.__name__, - detail="must define ELEMENT_TYPE and LIMIT", - ) + raise SSZTypeError(f"{cls.__name__} must define ELEMENT_TYPE and LIMIT") # Handle various input types if isinstance(v, (list, tuple)): @@ -327,20 +306,11 @@ def _validate_list_data(cls, v: Any) -> tuple[SSZType, ...]: elif hasattr(v, "__iter__") and not isinstance(v, (str, bytes)): elements = list(v) else: - raise SSZTypeCoercionError( - expected_type="iterable", - actual_type=type(v).__name__, - value=v, - ) + raise SSZTypeError(f"Expected iterable, got {type(v).__name__}") # Check limit if len(elements) > cls.LIMIT: - raise SSZLengthError( - type_name=cls.__name__, - expected=cls.LIMIT, - actual=len(elements), - is_limit=True, - ) + raise SSZValueError(f"{cls.__name__} exceeds limit of {cls.LIMIT}, got {len(elements)}") # Convert and validate each element typed_values = [] @@ -351,10 +321,8 @@ def _validate_list_data(cls, v: Any) -> tuple[SSZType, ...]: try: typed_values.append(cast(Any, cls.ELEMENT_TYPE)(element)) except Exception as e: - raise SSZTypeCoercionError( - expected_type=cls.ELEMENT_TYPE.__name__, - actual_type=type(element).__name__, - value=element, + raise SSZTypeError( + f"Expected {cls.ELEMENT_TYPE.__name__}, got {type(element).__name__}" ) from e return tuple(typed_values) @@ -376,11 +344,8 @@ def is_fixed_size(cls) -> bool: @classmethod def get_byte_length(cls) -> int: - """Lists are variable-size, so this raises an SSZTypeDefinitionError.""" - raise SSZTypeDefinitionError( - type_name=cls.__name__, - detail="variable-size list has no fixed byte length", - ) + """Lists are variable-size, so this raises an SSZTypeError.""" + raise SSZTypeError(f"{cls.__name__}: variable-size list has no fixed byte length") def serialize(self, stream: IO[bytes]) -> int: """Serialize the list to a binary stream.""" @@ -409,18 +374,14 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: # Fixed-size elements: read them back-to-back element_size = cls.ELEMENT_TYPE.get_byte_length() if scope % element_size != 0: - raise SSZDecodeError( - type_name=cls.__name__, - detail=f"scope {scope} is not divisible by element size {element_size}", + raise SSZSerializationError( + f"{cls.__name__}: scope {scope} not divisible by element size {element_size}" ) num_elements = scope // element_size if num_elements > cls.LIMIT: - raise SSZLengthError( - type_name=cls.__name__, - expected=cls.LIMIT, - actual=num_elements, - is_limit=True, + raise SSZValueError( + f"{cls.__name__} exceeds limit of {cls.LIMIT}, got {num_elements}" ) elements = [ @@ -434,28 +395,18 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: # Empty list case return cls(data=[]) if scope < OFFSET_BYTE_LENGTH: - raise SSZDecodeError( - type_name=cls.__name__, - detail=f"scope {scope} too small for variable-size list", + raise SSZSerializationError( + f"{cls.__name__}: scope {scope} too small for variable-size list" ) # Read the first offset to determine the number of elements. first_offset = int(Uint32.deserialize(stream, OFFSET_BYTE_LENGTH)) if first_offset > scope or first_offset % OFFSET_BYTE_LENGTH != 0: - raise SSZOffsetError( - type_name=cls.__name__, - start_offset=first_offset, - end_offset=scope, - ) + raise SSZSerializationError(f"{cls.__name__}: invalid offset {first_offset}") count = first_offset // OFFSET_BYTE_LENGTH if count > cls.LIMIT: - raise SSZLengthError( - type_name=cls.__name__, - expected=cls.LIMIT, - actual=count, - is_limit=True, - ) + raise SSZValueError(f"{cls.__name__} exceeds limit of {cls.LIMIT}, got {count}") # Read the rest of the offsets. offsets = [first_offset] + [ @@ -468,10 +419,8 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: for i in range(count): start, end = offsets[i], offsets[i + 1] if start > end: - raise SSZOffsetError( - type_name=cls.__name__, - start_offset=start, - end_offset=end, + raise SSZSerializationError( + f"{cls.__name__}: invalid offsets start={start} > end={end}" ) elements.append(cls.ELEMENT_TYPE.deserialize(stream, end - start)) diff --git a/src/lean_spec/types/container.py b/src/lean_spec/types/container.py index 450dce32..c09d594a 100644 --- a/src/lean_spec/types/container.py +++ b/src/lean_spec/types/container.py @@ -16,12 +16,7 @@ from typing_extensions import Self from .constants import OFFSET_BYTE_LENGTH -from .exceptions import ( - SSZOffsetError, - SSZStreamError, - SSZTypeCoercionError, - SSZTypeDefinitionError, -) +from .exceptions import SSZSerializationError, SSZTypeError from .ssz_base import SSZModel, SSZType from .uint import Uint32 @@ -41,11 +36,7 @@ def _get_ssz_field_type(annotation: Any) -> Type[SSZType]: """ # Check if it's a class and is a subclass of SSZType if not (inspect.isclass(annotation) and issubclass(annotation, SSZType)): - raise SSZTypeCoercionError( - expected_type="SSZType subclass", - actual_type=str(annotation), - value=annotation, - ) + raise SSZTypeError(f"Expected SSZType subclass, got {annotation}") return annotation @@ -109,10 +100,7 @@ def get_byte_length(cls) -> int: """ # Only fixed-size containers have a deterministic byte length if not cls.is_fixed_size(): - raise SSZTypeDefinitionError( - type_name=cls.__name__, - detail="variable-size containers have no fixed byte length", - ) + raise SSZTypeError(f"{cls.__name__}: variable-size container has no fixed byte length") # Sum the byte lengths of all fixed-size fields return sum( @@ -211,11 +199,8 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: size = field_type.get_byte_length() data = stream.read(size) if len(data) != size: - raise SSZStreamError( - type_name=cls.__name__, - operation=f"reading field '{field_name}'", - expected_bytes=size, - actual_bytes=len(data), + raise SSZSerializationError( + f"{cls.__name__}.{field_name}: expected {size} bytes, got {len(data)}" ) fields[field_name] = field_type.decode_bytes(data) bytes_read += size @@ -223,11 +208,9 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: # Read offset pointer for variable field offset_bytes = stream.read(OFFSET_BYTE_LENGTH) if len(offset_bytes) != OFFSET_BYTE_LENGTH: - raise SSZStreamError( - type_name=cls.__name__, - operation=f"reading offset for field '{field_name}'", - expected_bytes=OFFSET_BYTE_LENGTH, - actual_bytes=len(offset_bytes), + raise SSZSerializationError( + f"{cls.__name__}.{field_name}: " + f"expected {OFFSET_BYTE_LENGTH} offset bytes, got {len(offset_bytes)}" ) offset = int(Uint32.decode_bytes(offset_bytes)) var_fields.append((field_name, field_type, offset)) @@ -239,11 +222,9 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: var_section_size = scope - bytes_read var_section = stream.read(var_section_size) if len(var_section) != var_section_size: - raise SSZStreamError( - type_name=cls.__name__, - operation="reading variable section", - expected_bytes=var_section_size, - actual_bytes=len(var_section), + raise SSZSerializationError( + f"{cls.__name__}: " + f"expected {var_section_size} variable bytes, got {len(var_section)}" ) # Extract each variable field using offsets @@ -256,11 +237,8 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: # Validate offset bounds if rel_start < 0 or rel_start > rel_end: - raise SSZOffsetError( - type_name=cls.__name__, - field_name=name, - start_offset=start, - end_offset=end, + raise SSZSerializationError( + f"{cls.__name__}.{name}: invalid offsets start={start}, end={end}" ) # Deserialize field from its slice diff --git a/src/lean_spec/types/exceptions.py b/src/lean_spec/types/exceptions.py index 178c651e..c1fbe607 100644 --- a/src/lean_spec/types/exceptions.py +++ b/src/lean_spec/types/exceptions.py @@ -1,297 +1,17 @@ """Exception hierarchy for the SSZ type system.""" -from __future__ import annotations - -from typing import Any - class SSZError(Exception): - """ - Base exception for all SSZ-related errors. - - Attributes: - message: Human-readable error description. - """ - - def __init__(self, message: str) -> None: - """Initialize the exception with a message.""" - self.message = message - super().__init__(message) - - def __repr__(self) -> str: - """Return a string representation of the exception.""" - return f"{self.__class__.__name__}({self.message!r})" + """Base exception for all SSZ-related errors.""" class SSZTypeError(SSZError): - """Base class for type-related errors.""" - - -class SSZTypeDefinitionError(SSZTypeError): - """ - Raised when an SSZ type class is incorrectly defined. - - Attributes: - type_name: The name of the type with the definition error. - missing_attr: The missing or invalid attribute name. - detail: Additional context about the error. - """ - - def __init__( - self, - type_name: str, - *, - missing_attr: str | None = None, - detail: str | None = None, - ) -> None: - """Initialize the exception with type definition error details.""" - self.type_name = type_name - self.missing_attr = missing_attr - self.detail = detail - - if missing_attr: - msg = f"{type_name} must define {missing_attr}" - elif detail: - msg = f"{type_name}: {detail}" - else: - msg = f"{type_name} has an invalid type definition" - - super().__init__(msg) - - -class SSZTypeCoercionError(SSZTypeError): - """ - Raised when a value cannot be coerced to the expected SSZ type. - - Attributes: - expected_type: The type that was expected. - actual_type: The actual type of the value. - value: The value that couldn't be coerced (may be truncated for display). - """ - - def __init__( - self, - expected_type: str, - actual_type: str, - value: Any = None, - ) -> None: - """Initialize the exception with type coercion error details.""" - self.expected_type = expected_type - self.actual_type = actual_type - self.value = value - - msg = f"Expected {expected_type}, got {actual_type}" - if value is not None: - value_repr = repr(value) - if len(value_repr) > 50: - value_repr = value_repr[:47] + "..." - msg = f"{msg}: {value_repr}" - - super().__init__(msg) + """Raised for type-related errors (coercion, definition, invalid types).""" class SSZValueError(SSZError): - """ - Base class for value-related errors. - - Raised when a value is invalid for an SSZ operation, even if the type is correct. - """ - - -class SSZOverflowError(SSZValueError): - """ - Raised when a numeric value is outside the valid range. - - Attributes: - value: The value that caused the overflow. - type_name: The SSZ type that couldn't hold the value. - min_value: The minimum allowed value (inclusive). - max_value: The maximum allowed value (inclusive). - """ - - def __init__( - self, - value: int, - type_name: str, - *, - min_value: int = 0, - max_value: int, - ) -> None: - """Initialize the exception with overflow error details.""" - self.value = value - self.type_name = type_name - self.min_value = min_value - self.max_value = max_value - - super().__init__( - f"{value} is out of range for {type_name} (valid range: [{min_value}, {max_value}])" - ) - - -class SSZLengthError(SSZValueError): - """ - Raised when a sequence has incorrect length. - - Attributes: - type_name: The SSZ type with the length constraint. - expected: The expected length (exact for vectors, max for lists). - actual: The actual length received. - is_limit: True if expected is a maximum limit, False if exact. - """ - - def __init__( - self, - type_name: str, - *, - expected: int, - actual: int, - is_limit: bool = False, - ) -> None: - """Initialize the exception with length error details.""" - self.type_name = type_name - self.expected = expected - self.actual = actual - self.is_limit = is_limit - - if is_limit: - msg = f"{type_name} cannot exceed {expected} elements, got {actual}" - else: - msg = f"{type_name} requires exactly {expected} elements, got {actual}" - - super().__init__(msg) + """Raised for value-related errors (overflow, length, bounds).""" class SSZSerializationError(SSZError): - """Base class for serialization-related errors.""" - - -class SSZDecodeError(SSZSerializationError): - """ - Raised when decoding SSZ bytes to a value fails. - - Attributes: - type_name: The type being decoded. - detail: Description of what went wrong. - offset: The byte offset where the error occurred (if known). - """ - - def __init__( - self, - type_name: str, - detail: str, - *, - offset: int | None = None, - ) -> None: - """Initialize the exception with decode error details.""" - self.type_name = type_name - self.detail = detail - self.offset = offset - - msg = f"Failed to decode {type_name}: {detail}" - if offset is not None: - msg = f"{msg} (at byte offset {offset})" - - super().__init__(msg) - - -class SSZStreamError(SSZSerializationError): - """ - Raised when a stream/IO error occurs during SSZ operations. - - Attributes: - type_name: The type being processed when the error occurred. - operation: The operation being performed (e.g., "read", "decode"). - expected_bytes: Number of bytes expected (if applicable). - actual_bytes: Number of bytes received (if applicable). - """ - - def __init__( - self, - type_name: str, - operation: str, - *, - expected_bytes: int | None = None, - actual_bytes: int | None = None, - ) -> None: - """Initialize the exception with stream error details.""" - self.type_name = type_name - self.operation = operation - self.expected_bytes = expected_bytes - self.actual_bytes = actual_bytes - - if expected_bytes is not None and actual_bytes is not None: - msg = ( - f"Stream error while {operation} {type_name}: " - f"expected {expected_bytes} bytes, got {actual_bytes}" - ) - elif expected_bytes is not None: - msg = ( - f"Stream ended prematurely while {operation} {type_name}: " - f"needed {expected_bytes} bytes" - ) - else: - msg = f"Stream error while {operation} {type_name}" - - super().__init__(msg) - - -class SSZOffsetError(SSZDecodeError): - """ - Raised when SSZ offset parsing fails during variable-size decoding. - - Attributes: - type_name: The container or collection type being decoded. - field_name: The field with the invalid offset (if applicable). - start_offset: The start offset value. - end_offset: The end offset value. - """ - - def __init__( - self, - type_name: str, - *, - field_name: str | None = None, - start_offset: int | None = None, - end_offset: int | None = None, - ) -> None: - """Initialize the exception with offset error details.""" - self.field_name = field_name - self.start_offset = start_offset - self.end_offset = end_offset - - if field_name and start_offset is not None and end_offset is not None: - detail = ( - f"invalid offsets for field '{field_name}' (start={start_offset}, end={end_offset})" - ) - elif field_name: - detail = f"invalid offset for field '{field_name}'" - elif start_offset is not None and end_offset is not None: - detail = f"invalid offsets: start={start_offset} > end={end_offset}" - else: - detail = "invalid offset structure" - - super().__init__(type_name, detail) - - -class SSZSelectorError(SSZDecodeError): - """ - Raised when a Union selector is invalid. - - Attributes: - type_name: The Union type being decoded. - selector: The invalid selector value. - num_options: The number of valid options. - """ - - def __init__( - self, - type_name: str, - selector: int, - num_options: int, - ) -> None: - """Initialize the exception with selector error details.""" - self.selector = selector - self.num_options = num_options - - detail = f"selector {selector} out of range for {num_options} options" - super().__init__(type_name, detail) + """Raised for serialization errors (encoding, decoding, stream issues).""" diff --git a/src/lean_spec/types/uint.py b/src/lean_spec/types/uint.py index fa3c3b28..1acef0f1 100644 --- a/src/lean_spec/types/uint.py +++ b/src/lean_spec/types/uint.py @@ -8,12 +8,7 @@ from pydantic_core import core_schema from typing_extensions import Self -from .exceptions import ( - SSZDecodeError, - SSZOverflowError, - SSZStreamError, - SSZTypeCoercionError, -) +from .exceptions import SSZSerializationError, SSZTypeError, SSZValueError from .ssz_base import SSZType @@ -33,19 +28,12 @@ def __new__(cls, value: SupportsInt) -> Self: """ # We should accept only ints. if not isinstance(value, int) or isinstance(value, bool): - raise SSZTypeCoercionError( - expected_type="int", - actual_type=type(value).__name__, - value=value, - ) + raise SSZTypeError(f"Expected int, got {type(value).__name__}") int_value = int(value) - if not (0 <= int_value < (2**cls.BITS)): - raise SSZOverflowError( - value=int_value, - type_name=cls.__name__, - max_value=2**cls.BITS - 1, - ) + max_value = 2**cls.BITS - 1 + if not (0 <= int_value <= max_value): + raise SSZValueError(f"{int_value} out of range for {cls.__name__} [0, {max_value}]") return super().__new__(cls, int_value) @classmethod @@ -126,9 +114,8 @@ def decode_bytes(cls, data: bytes) -> Self: # Ensure the input data has the correct number of bytes. expected_length = cls.get_byte_length() if len(data) != expected_length: - raise SSZDecodeError( - type_name=cls.__name__, - detail=f"expected {expected_length} bytes, got {len(data)}", + raise SSZSerializationError( + f"{cls.__name__}: expected {expected_length} bytes, got {len(data)}" ) # The `from_bytes` class method from `int` is used to convert the data. return cls(int.from_bytes(data, "little")) @@ -169,19 +156,15 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: # For a fixed-size type, the scope must exactly match the byte length. byte_length = cls.get_byte_length() if scope != byte_length: - raise SSZDecodeError( - type_name=cls.__name__, - detail=f"invalid scope: expected {byte_length} bytes, got {scope}", + raise SSZSerializationError( + f"{cls.__name__}: invalid scope, expected {byte_length} bytes, got {scope}" ) # Read the required number of bytes from the stream. data = stream.read(byte_length) # Ensure the correct number of bytes was read. if len(data) != byte_length: - raise SSZStreamError( - type_name=cls.__name__, - operation="decoding", - expected_bytes=byte_length, - actual_bytes=len(data), + raise SSZSerializationError( + f"{cls.__name__}: expected {byte_length} bytes, got {len(data)}" ) # Decode the bytes into a new instance. return cls.decode_bytes(data) diff --git a/src/lean_spec/types/union.py b/src/lean_spec/types/union.py index 5c231cf5..c8f46824 100644 --- a/src/lean_spec/types/union.py +++ b/src/lean_spec/types/union.py @@ -16,13 +16,7 @@ from pydantic import model_validator from typing_extensions import Self -from .exceptions import ( - SSZDecodeError, - SSZSelectorError, - SSZStreamError, - SSZTypeCoercionError, - SSZTypeDefinitionError, -) +from .exceptions import SSZSerializationError, SSZTypeError, SSZValueError from .ssz_base import SSZModel, SSZType # Constants for Union implementation @@ -110,42 +104,26 @@ def _validate_union_data(cls, data: Any) -> dict[str, Any]: """Validate selector and value together.""" # Check required class attributes and get options if not hasattr(cls, "OPTIONS") or not isinstance(cls.OPTIONS, tuple): - raise SSZTypeDefinitionError( - type_name=cls.__name__, - detail="must define OPTIONS as a tuple of SSZ types", - ) + raise SSZTypeError(f"{cls.__name__} must define OPTIONS as a tuple of SSZ types") options, options_count = cls.OPTIONS, len(cls.OPTIONS) # Validate OPTIONS constraints if options_count == 0: - raise SSZTypeDefinitionError( - type_name=cls.__name__, - detail="OPTIONS cannot be empty", - ) + raise SSZTypeError(f"{cls.__name__}: OPTIONS cannot be empty") if options_count > MAX_UNION_OPTIONS: - raise SSZTypeDefinitionError( - type_name=cls.__name__, - detail=f"has {options_count} options, but maximum is {MAX_UNION_OPTIONS}", + raise SSZTypeError( + f"{cls.__name__}: has {options_count} options, max is {MAX_UNION_OPTIONS}" ) if options[0] is None and options_count == 1: - raise SSZTypeDefinitionError( - type_name=cls.__name__, - detail="cannot have None as the only option", - ) + raise SSZTypeError(f"{cls.__name__}: cannot have None as the only option") # Validate None placement (only at index 0) and types for i, opt in enumerate(options): if opt is None and i != 0: - raise SSZTypeDefinitionError( - type_name=cls.__name__, - detail=f"can only have None at index 0, found at index {i}", - ) + raise SSZTypeError(f"{cls.__name__}: None only allowed at index 0, found at {i}") elif opt is not None and not isinstance(opt, type): - raise SSZTypeDefinitionError( - type_name=cls.__name__, - detail=f"option {i} must be a type, got {type(opt)}", - ) + raise SSZTypeError(f"{cls.__name__}: option {i} must be a type, got {type(opt)}") # Extract selector and value from input selector = data.get("selector") @@ -153,20 +131,13 @@ def _validate_union_data(cls, data: Any) -> dict[str, Any]: # Validate selector if not isinstance(selector, int) or not 0 <= selector < options_count: - raise SSZSelectorError( - type_name=cls.__name__, - selector=selector if isinstance(selector, int) else -1, - num_options=options_count, - ) + sel = selector if isinstance(selector, int) else -1 + raise SSZValueError(f"{cls.__name__}: selector {sel} out of range [0, {options_count})") # Handle None option if (selected_type := options[selector]) is None: if value is not None: - raise SSZTypeCoercionError( - expected_type="None", - actual_type=type(value).__name__, - value=value, - ) + raise SSZTypeError(f"Expected None, got {type(value).__name__}") return {"selector": selector, "value": None} # Handle non-None option - coerce value if needed @@ -177,10 +148,8 @@ def _validate_union_data(cls, data: Any) -> dict[str, Any]: coerced_value = cast(Any, selected_type)(value) return {"selector": selector, "value": coerced_value} except Exception as e: - raise SSZTypeCoercionError( - expected_type=selected_type.__name__, - actual_type=type(value).__name__, - value=value, + raise SSZTypeError( + f"Expected {selected_type.__name__}, got {type(value).__name__}" ) from e @property @@ -201,10 +170,7 @@ def is_fixed_size(cls) -> bool: @classmethod def get_byte_length(cls) -> int: """Union types are variable-size and don't have fixed length.""" - raise SSZTypeDefinitionError( - type_name=cls.__name__, - detail="variable-size union has no fixed byte length", - ) + raise SSZTypeError(f"{cls.__name__}: variable-size union has no fixed byte length") def serialize(self, stream: IO[bytes]) -> int: """Serialize this Union to a byte stream in SSZ format.""" @@ -219,19 +185,14 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: """Deserialize a Union from a byte stream using SSZ format.""" # Validate scope for selector byte if scope < SELECTOR_BYTE_SIZE: - raise SSZDecodeError( - type_name=cls.__name__, - detail="scope too small for Union selector", - ) + raise SSZSerializationError(f"{cls.__name__}: scope too small for selector") # Read selector byte selector_bytes = stream.read(SELECTOR_BYTE_SIZE) if len(selector_bytes) != SELECTOR_BYTE_SIZE: - raise SSZStreamError( - type_name=cls.__name__, - operation="reading selector", - expected_bytes=SELECTOR_BYTE_SIZE, - actual_bytes=len(selector_bytes), + raise SSZSerializationError( + f"{cls.__name__}: " + f"expected {SELECTOR_BYTE_SIZE} selector bytes, got {len(selector_bytes)}" ) selector = int.from_bytes(selector_bytes, byteorder="little") @@ -239,10 +200,8 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: # Validate selector range if not 0 <= selector < len(cls.OPTIONS): - raise SSZSelectorError( - type_name=cls.__name__, - selector=selector, - num_options=len(cls.OPTIONS), + raise SSZValueError( + f"{cls.__name__}: selector {selector} out of range [0, {len(cls.OPTIONS)})" ) selected_type = cls.OPTIONS[selector] @@ -250,21 +209,15 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: # Handle None option if selected_type is None: if remaining_bytes != 0: - raise SSZDecodeError( - type_name=cls.__name__, - detail="None arm must have no payload bytes", - ) + raise SSZSerializationError(f"{cls.__name__}: None arm must have no payload bytes") return cls(selector=selector, value=None) # Handle non-None option if selected_type.is_fixed_size() and hasattr(selected_type, "get_byte_length"): required_bytes = selected_type.get_byte_length() if remaining_bytes < required_bytes: - raise SSZStreamError( - type_name=cls.__name__, - operation=f"deserializing {selected_type.__name__}", - expected_bytes=required_bytes, - actual_bytes=remaining_bytes, + raise SSZSerializationError( + f"{cls.__name__}: expected {required_bytes} bytes, got {remaining_bytes}" ) # Deserialize value @@ -272,9 +225,8 @@ def deserialize(cls, stream: IO[bytes], scope: int) -> Self: value = selected_type.deserialize(stream, remaining_bytes) return cls(selector=selector, value=value) except Exception as e: - raise SSZDecodeError( - type_name=cls.__name__, - detail=f"failed to deserialize {selected_type.__name__}: {e}", + raise SSZSerializationError( + f"{cls.__name__}: failed to deserialize {selected_type.__name__}: {e}" ) from e def encode_bytes(self) -> bytes: diff --git a/tests/lean_spec/types/test_bitfields.py b/tests/lean_spec/types/test_bitfields.py index 5066a9ab..e30f0f37 100644 --- a/tests/lean_spec/types/test_bitfields.py +++ b/tests/lean_spec/types/test_bitfields.py @@ -9,15 +9,10 @@ from lean_spec.types.bitfields import BaseBitlist, BaseBitvector from lean_spec.types.boolean import Boolean -from lean_spec.types.exceptions import ( - SSZDecodeError, - SSZLengthError, - SSZStreamError, - SSZTypeDefinitionError, -) +from lean_spec.types.exceptions import SSZSerializationError, SSZTypeError, SSZValueError -# Type alias for errors that can be SSZLengthError or wrapped in ValidationError -LengthOrValidationError = (SSZLengthError, ValidationError) +# Type alias for errors that can be SSZValueError or wrapped in ValidationError +ValueOrValidationError = (SSZValueError, ValidationError) # Define bitfield types at module level for reuse and model classes @@ -63,7 +58,7 @@ class Bitvector16(BaseBitvector): def test_instantiate_raw_type_raises_error(self) -> None: """Tests that the raw, non-specialized BaseBitvector cannot be instantiated.""" - with pytest.raises(SSZTypeDefinitionError, match="must define LENGTH"): + with pytest.raises(SSZTypeError, match="must define LENGTH"): BaseBitvector(data=[]) def test_instantiation_success(self) -> None: @@ -83,7 +78,7 @@ def test_instantiation_success(self) -> None: ) def test_instantiation_with_wrong_length_raises_error(self, values: list[Boolean]) -> None: """Tests that providing the wrong number of items during instantiation fails.""" - with pytest.raises(LengthOrValidationError): + with pytest.raises(ValueOrValidationError): Bitvector4(data=values) def test_pydantic_validation_accepts_valid_list(self) -> None: @@ -102,7 +97,7 @@ def test_pydantic_validation_accepts_valid_list(self) -> None: ) def test_pydantic_validation_rejects_invalid_values(self, invalid_value: Any) -> None: """Tests that Pydantic validation rejects lists of the wrong length.""" - with pytest.raises(LengthOrValidationError): + with pytest.raises(ValueOrValidationError): Bitvector4Model(value=invalid_value) def test_bitvector_is_immutable(self) -> None: @@ -134,7 +129,7 @@ class Bitlist16(BaseBitlist): def test_instantiate_raw_type_raises_error(self) -> None: """Tests that the raw, non-specialized BaseBitlist cannot be instantiated.""" - with pytest.raises(SSZTypeDefinitionError, match="must define LIMIT"): + with pytest.raises(SSZTypeError, match="must define LIMIT"): BaseBitlist(data=[]) def test_instantiation_success(self) -> None: @@ -150,7 +145,7 @@ def test_instantiation_over_limit_raises_error(self) -> None: class Bitlist4(BaseBitlist): LIMIT = 4 - with pytest.raises(LengthOrValidationError): + with pytest.raises(ValueOrValidationError): Bitlist4(data=[Boolean(b) for b in [True, False, True, False, True]]) def test_pydantic_validation_accepts_valid_list(self) -> None: @@ -168,7 +163,7 @@ def test_pydantic_validation_accepts_valid_list(self) -> None: ) def test_pydantic_validation_rejects_invalid_values(self, invalid_value: Any) -> None: """Tests that Pydantic validation rejects lists that exceed the limit.""" - with pytest.raises(LengthOrValidationError): + with pytest.raises(ValueOrValidationError): Bitlist8Model(value=invalid_value) def test_add_with_list(self) -> None: @@ -206,7 +201,7 @@ class Bitlist4(BaseBitlist): LIMIT = 4 bitlist = Bitlist4(data=[Boolean(True), Boolean(False), Boolean(True)]) - with pytest.raises(LengthOrValidationError): + with pytest.raises(ValueOrValidationError): bitlist + [Boolean(False), Boolean(True)] @@ -276,7 +271,7 @@ def test_bitvector_decode_invalid_length(self) -> None: class Bitvector8(BaseBitvector): LENGTH = 8 - with pytest.raises(SSZLengthError, match="requires exactly 1"): + with pytest.raises(SSZValueError, match="expected 1 bytes, got 2"): Bitvector8.decode_bytes(b"\x01\x02") # Expects 1 byte, gets 2 def test_bitlist_decode_invalid_data(self) -> None: @@ -285,7 +280,7 @@ def test_bitlist_decode_invalid_data(self) -> None: class Bitlist8(BaseBitlist): LIMIT = 8 - with pytest.raises(SSZDecodeError, match="cannot decode empty bytes"): + with pytest.raises(SSZSerializationError, match="cannot decode empty bytes"): Bitlist8.decode_bytes(b"") @@ -304,7 +299,7 @@ class Bitlist10(BaseBitlist): LIMIT = 10 assert Bitlist10.is_fixed_size() is False - with pytest.raises(SSZTypeDefinitionError): + with pytest.raises(SSZTypeError): Bitlist10.get_byte_length() def test_bitvector_deserialize_invalid_scope(self) -> None: @@ -312,7 +307,7 @@ class Bitvector8(BaseBitvector): LENGTH = 8 stream = io.BytesIO(b"\xff") - with pytest.raises(SSZDecodeError, match="expected 1 bytes, got 2"): + with pytest.raises(SSZSerializationError, match="expected 1 bytes, got 2"): Bitvector8.deserialize(stream, scope=2) def test_bitvector_deserialize_premature_end(self) -> None: @@ -320,7 +315,7 @@ class Bitvector16(BaseBitvector): LENGTH = 16 stream = io.BytesIO(b"\xff") # Only 1 byte, expects 2 - with pytest.raises(SSZStreamError, match="expected 2 bytes, got 1"): + with pytest.raises(SSZSerializationError, match="expected 2 bytes, got 1"): Bitvector16.deserialize(stream, scope=2) def test_bitlist_deserialize_premature_end(self) -> None: @@ -328,7 +323,7 @@ class Bitlist16(BaseBitlist): LIMIT = 16 stream = io.BytesIO(b"\xff") # Only 1 byte - with pytest.raises(SSZStreamError, match="expected 2 bytes, got 1"): + with pytest.raises(SSZSerializationError, match="expected 2 bytes, got 1"): Bitlist16.deserialize(stream, scope=2) # Scope says to read 2 @pytest.mark.parametrize( diff --git a/tests/lean_spec/types/test_boolean.py b/tests/lean_spec/types/test_boolean.py index 89536c30..e9c17a4c 100644 --- a/tests/lean_spec/types/test_boolean.py +++ b/tests/lean_spec/types/test_boolean.py @@ -7,12 +7,7 @@ from pydantic import BaseModel, ValidationError from lean_spec.types.boolean import Boolean -from lean_spec.types.exceptions import ( - SSZDecodeError, - SSZStreamError, - SSZTypeCoercionError, - SSZValueError, -) +from lean_spec.types.exceptions import SSZSerializationError, SSZTypeError, SSZValueError class BooleanModel(BaseModel): @@ -52,8 +47,8 @@ def test_instantiation_from_invalid_int_raises_error(invalid_int: int) -> None: @pytest.mark.parametrize("invalid_type", [1.0, "True", b"\x01", None]) def test_instantiation_from_invalid_types_raises_error(invalid_type: Any) -> None: - """Tests that instantiating with non-bool/non-int types raises SSZTypeCoercionError.""" - with pytest.raises(SSZTypeCoercionError, match="Expected bool or int"): + """Tests that instantiating with non-bool/non-int types raises SSZTypeError.""" + with pytest.raises(SSZTypeError, match="Expected bool or int"): Boolean(invalid_type) @@ -216,16 +211,16 @@ def test_encode_decode_roundtrip(self, value: bool, expected_bytes: bytes) -> No def test_decode_invalid_length(self) -> None: """Tests that decode_bytes fails with incorrect byte length.""" - with pytest.raises(SSZDecodeError, match="expected 1 byte"): + with pytest.raises(SSZSerializationError, match="expected 1 byte"): Boolean.decode_bytes(b"") - with pytest.raises(SSZDecodeError, match="expected 1 byte"): + with pytest.raises(SSZSerializationError, match="expected 1 byte"): Boolean.decode_bytes(b"\x00\x01") def test_decode_invalid_value(self) -> None: """Tests that decode_bytes fails with an invalid byte value.""" - with pytest.raises(SSZDecodeError, match="must be 0x00 or 0x01"): + with pytest.raises(SSZSerializationError, match="must be 0x00 or 0x01"): Boolean.decode_bytes(b"\x02") - with pytest.raises(SSZDecodeError, match="must be 0x00 or 0x01"): + with pytest.raises(SSZSerializationError, match="must be 0x00 or 0x01"): Boolean.decode_bytes(b"\xff") @pytest.mark.parametrize("value", [True, False]) @@ -247,15 +242,15 @@ def test_serialize_deserialize_roundtrip(self, value: bool) -> None: def test_deserialize_invalid_scope(self) -> None: """Tests that deserialize fails with an incorrect scope.""" stream = io.BytesIO(b"\x01") - with pytest.raises(SSZDecodeError, match="expected scope of 1"): + with pytest.raises(SSZSerializationError, match="expected scope of 1"): Boolean.deserialize(stream, scope=0) stream.seek(0) - with pytest.raises(SSZDecodeError, match="expected scope of 1"): + with pytest.raises(SSZSerializationError, match="expected scope of 1"): Boolean.deserialize(stream, scope=2) def test_deserialize_premature_stream_end(self) -> None: """Tests that deserialize fails if the stream ends prematurely.""" stream = io.BytesIO(b"") # Empty stream - with pytest.raises(SSZStreamError, match="expected 1 bytes, got 0"): + with pytest.raises(SSZSerializationError, match="expected 1 byte, got 0"): Boolean.deserialize(stream, scope=1) diff --git a/tests/lean_spec/types/test_byte_arrays.py b/tests/lean_spec/types/test_byte_arrays.py index 46fe6f2b..5f9cb6cd 100644 --- a/tests/lean_spec/types/test_byte_arrays.py +++ b/tests/lean_spec/types/test_byte_arrays.py @@ -19,11 +19,7 @@ BaseBytes, BaseByteList, ) -from lean_spec.types.exceptions import ( - SSZDecodeError, - SSZLengthError, - SSZStreamError, -) +from lean_spec.types.exceptions import SSZSerializationError, SSZTypeError, SSZValueError def sha256(b: bytes) -> bytes: @@ -66,11 +62,11 @@ def test_bytevector_coercion(value: Any, expected: bytes) -> None: def test_bytevector_wrong_length_raises() -> None: - with pytest.raises(SSZLengthError): + with pytest.raises(SSZValueError): Bytes4(b"\x00\x01\x02") # 3 != 4 - with pytest.raises(SSZLengthError): + with pytest.raises(SSZValueError): Bytes4([0, 1, 2]) # 3 != 4 - with pytest.raises(SSZLengthError): + with pytest.raises(SSZValueError): Bytes4("000102") # 3 != 4 (hex nibbles -> 3 bytes) @@ -94,7 +90,7 @@ class ByteList5(BaseByteList): def test_bytelist_over_limit_raises() -> None: # Test with ByteList64 that has limit 64 - with pytest.raises(SSZLengthError): + with pytest.raises(SSZValueError): ByteList64(data=b"\x00" * 65) # Over the limit @@ -175,7 +171,7 @@ def test_encode_decode_roundtrip_vector(Typ: Type[BaseBytes], payload: bytes) -> def test_vector_deserialize_scope_mismatch_raises() -> None: v = Bytes4(b"\x00\x01\x02\x03") buf = io.BytesIO(v.encode_bytes()) - with pytest.raises(SSZDecodeError, match="expected 4 bytes, got 3"): + with pytest.raises(SSZSerializationError, match="expected 4 bytes, got 3"): Bytes4.deserialize(buf, 3) # wrong scope @@ -209,7 +205,7 @@ class TestByteList2(BaseByteList): LIMIT = 2 buf = io.BytesIO(b"\x00\x01\x02") - with pytest.raises(SSZLengthError): + with pytest.raises(SSZValueError): TestByteList2.deserialize(buf, 3) @@ -218,7 +214,7 @@ class TestByteList10(BaseByteList): LIMIT = 10 buf = io.BytesIO(b"\x00\x01") - with pytest.raises(SSZStreamError): + with pytest.raises(SSZSerializationError): TestByteList10.deserialize(buf, 3) # stream too short @@ -254,11 +250,11 @@ def test_pydantic_accepts_various_inputs_for_vectors() -> None: def test_pydantic_validates_vector_lengths() -> None: - with pytest.raises(SSZLengthError): + with pytest.raises(SSZValueError): ModelVectors(root=Bytes32(b"\x11" * 31), key=Bytes4(b"\x00\x01\x02\x03")) # too short - with pytest.raises(SSZLengthError): + with pytest.raises(SSZValueError): ModelVectors(root=Bytes32(b"\x11" * 33), key=Bytes4(b"\x00\x01\x02\x03")) # too long - with pytest.raises(SSZLengthError): + with pytest.raises(SSZValueError): ModelVectors(root=Bytes32(b"\x11" * 32), key=Bytes4(b"\x00\x01\x02")) # key too short @@ -282,7 +278,7 @@ def test_pydantic_accepts_and_serializes_bytelist() -> None: def test_pydantic_bytelist_limit_enforced() -> None: - with pytest.raises(SSZLengthError): + with pytest.raises(SSZValueError): ModelLists(payload=ByteList16(data=bytes(range(17)))) # over limit diff --git a/tests/lean_spec/types/test_collections.py b/tests/lean_spec/types/test_collections.py index b85edfcc..6529fb9c 100644 --- a/tests/lean_spec/types/test_collections.py +++ b/tests/lean_spec/types/test_collections.py @@ -10,11 +10,11 @@ from lean_spec.types.boolean import Boolean from lean_spec.types.collections import SSZList, SSZVector from lean_spec.types.container import Container -from lean_spec.types.exceptions import SSZLengthError, SSZTypeCoercionError, SSZTypeDefinitionError +from lean_spec.types.exceptions import SSZSerializationError, SSZTypeError, SSZValueError from lean_spec.types.uint import Uint8, Uint16, Uint32, Uint256 -# Type alias for errors that can be SSZLengthError or wrapped in ValidationError -LengthOrValidationError = (SSZLengthError, ValidationError) +# Type alias for errors that can be SSZValueError or wrapped in ValidationError +ValueOrValidationError = (SSZValueError, ValidationError) # Define some List types that are needed for Container definitions @@ -238,9 +238,9 @@ def test_instantiation_success(self) -> None: def test_instantiation_with_wrong_length_raises_error(self) -> None: """Tests that providing the wrong number of items during instantiation fails.""" vec_type = Uint8Vector4 - with pytest.raises(LengthOrValidationError): + with pytest.raises(ValueOrValidationError): vec_type(data=[Uint8(1), Uint8(2), Uint8(3)]) # Too few - with pytest.raises(LengthOrValidationError): + with pytest.raises(ValueOrValidationError): vec_type(data=[Uint8(1), Uint8(2), Uint8(3), Uint8(4), Uint8(5)]) # Too many def test_pydantic_validation(self) -> None: @@ -250,11 +250,11 @@ def test_pydantic_validation(self) -> None: assert isinstance(instance.value, Uint8Vector2) assert list(instance.value) == [Uint8(10), Uint8(20)] # Test invalid data - with pytest.raises(LengthOrValidationError): + with pytest.raises(ValueOrValidationError): Uint8Vector2Model(value={"data": [10]}) # type: ignore[arg-type] - with pytest.raises(LengthOrValidationError): + with pytest.raises(ValueOrValidationError): Uint8Vector2Model(value={"data": [10, 20, 30]}) # type: ignore[arg-type] - with pytest.raises(SSZTypeCoercionError): + with pytest.raises(SSZTypeError): Uint8Vector2Model(value={"data": [10, "bad"]}) # type: ignore[arg-type] def test_vector_is_immutable(self) -> None: @@ -285,13 +285,13 @@ def test_class_getitem_creates_specialized_type(self) -> None: def test_instantiate_raw_type_raises_error(self) -> None: """Tests that the raw, non-specialized SSZList cannot be instantiated.""" - with pytest.raises(SSZTypeDefinitionError, match="must define ELEMENT_TYPE and LIMIT"): + with pytest.raises(SSZTypeError, match="must define ELEMENT_TYPE and LIMIT"): SSZList(data=[]) def test_instantiation_over_limit_raises_error(self) -> None: """Tests that providing more items than the limit during instantiation fails.""" list_type = Uint8List4 - with pytest.raises(LengthOrValidationError): + with pytest.raises(ValueOrValidationError): list_type(data=[Uint8(1), Uint8(2), Uint8(3), Uint8(4), Uint8(5)]) def test_pydantic_validation(self) -> None: @@ -301,19 +301,19 @@ def test_pydantic_validation(self) -> None: assert isinstance(instance.value, Uint8List4) assert list(instance.value) == [Uint8(10), Uint8(20)] # Test invalid data - list too long - with pytest.raises(LengthOrValidationError): + with pytest.raises(ValueOrValidationError): Uint8List4Model( value=Uint8List4(data=[Uint8(10), Uint8(20), Uint8(30), Uint8(40), Uint8(50)]) ) def test_append_at_limit_raises_error(self) -> None: """Tests that creating a list at limit +1 fails during construction.""" - with pytest.raises(LengthOrValidationError): + with pytest.raises(ValueOrValidationError): BooleanList4(data=[Boolean(True)] * 5) def test_extend_over_limit_raises_error(self) -> None: """Tests that creating a list over the limit fails during construction.""" - with pytest.raises(LengthOrValidationError): + with pytest.raises(ValueOrValidationError): BooleanList4( data=[Boolean(True), Boolean(False), Boolean(True), Boolean(False), Boolean(True)] ) @@ -336,7 +336,7 @@ def test_add_with_sszlist(self) -> None: def test_add_exceeding_limit_raises_error(self) -> None: """Tests that concatenating beyond the limit raises an error.""" list1 = Uint8List4(data=[Uint8(1), Uint8(2), Uint8(3)]) - with pytest.raises(LengthOrValidationError): + with pytest.raises(ValueOrValidationError): list1 + [4, 5] diff --git a/tests/lean_spec/types/test_uint.py b/tests/lean_spec/types/test_uint.py index f22bd851..5f8f37ae 100644 --- a/tests/lean_spec/types/test_uint.py +++ b/tests/lean_spec/types/test_uint.py @@ -6,12 +6,7 @@ import pytest from pydantic import BaseModel, ValidationError -from lean_spec.types.exceptions import ( - SSZDecodeError, - SSZOverflowError, - SSZStreamError, - SSZTypeCoercionError, -) +from lean_spec.types.exceptions import SSZSerializationError, SSZTypeError, SSZValueError from lean_spec.types.uint import ( BaseUint, Uint8, @@ -98,9 +93,9 @@ def test_pydantic_strict_mode_rejects_invalid_types( def test_instantiation_from_invalid_types_raises_error( uint_class: Type[BaseUint], invalid_value: Any, expected_type_name: str ) -> None: - """Tests that instantiating with non-integer types raises SSZTypeCoercionError.""" + """Tests that instantiating with non-integer types raises SSZTypeError.""" expected_msg = f"Expected int, got {expected_type_name}" - with pytest.raises(SSZTypeCoercionError, match=expected_msg): + with pytest.raises(SSZTypeError, match=expected_msg): uint_class(invalid_value) @@ -115,16 +110,16 @@ def test_instantiation_and_type(uint_class: Type[BaseUint]) -> None: @pytest.mark.parametrize("uint_class", ALL_UINT_TYPES) def test_instantiation_negative(uint_class: Type[BaseUint]) -> None: - """Tests that instantiating with a negative number raises SSZOverflowError.""" - with pytest.raises(SSZOverflowError): + """Tests that instantiating with a negative number raises SSZValueError.""" + with pytest.raises(SSZValueError): uint_class(-5) @pytest.mark.parametrize("uint_class", ALL_UINT_TYPES) def test_instantiation_too_large(uint_class: Type[BaseUint]) -> None: - """Tests that instantiating with a value >= MAX raises SSZOverflowError.""" + """Tests that instantiating with a value >= MAX raises SSZValueError.""" max_value = 2**uint_class.BITS - with pytest.raises(SSZOverflowError): + with pytest.raises(SSZValueError): uint_class(max_value) @@ -146,17 +141,17 @@ def test_arithmetic_operators(uint_class: Type[BaseUint]) -> None: # Addition assert a + b == uint_class(a_val + b_val) - with pytest.raises(SSZOverflowError): + with pytest.raises(SSZValueError): _ = max_val + b # Subtraction assert a - b == uint_class(a_val - b_val) - with pytest.raises(SSZOverflowError): + with pytest.raises(SSZValueError): _ = b - a # Multiplication assert a * b == uint_class(a_val * b_val) - with pytest.raises(SSZOverflowError): + with pytest.raises(SSZValueError): _ = max_val * b # Floor Division @@ -168,7 +163,7 @@ def test_arithmetic_operators(uint_class: Type[BaseUint]) -> None: # Exponentiation assert uint_class(b_val) ** 4 == uint_class(b_val**4) if uint_class.BITS <= 16: # Pow gets too big quickly - with pytest.raises(SSZOverflowError): + with pytest.raises(SSZValueError): _ = a ** int(b) @@ -405,10 +400,10 @@ def test_encode_decode_roundtrip( @pytest.mark.parametrize("uint_class", ALL_UINT_TYPES) def test_decode_bytes_invalid_length(self, uint_class: Type[BaseUint]) -> None: - """Tests that `decode_bytes` raises an SSZDecodeError for data of the wrong length.""" + """Tests that `decode_bytes` raises SSZSerializationError for wrong length data.""" # Create byte string that is one byte too short. invalid_data = b"\x00" * (uint_class.get_byte_length() - 1) - with pytest.raises(SSZDecodeError, match="expected .* bytes, got"): + with pytest.raises(SSZSerializationError, match="expected .* bytes, got"): uint_class.decode_bytes(invalid_data) @pytest.mark.parametrize("uint_class", ALL_UINT_TYPES) @@ -432,17 +427,17 @@ def test_serialize_deserialize_stream_roundtrip(self, uint_class: Type[BaseUint] @pytest.mark.parametrize("uint_class", ALL_UINT_TYPES) def test_deserialize_invalid_scope(self, uint_class: Type[BaseUint]) -> None: - """Tests that `deserialize` raises an SSZDecodeError if the scope is incorrect.""" + """Tests that `deserialize` raises an SSZSerializationError if the scope is incorrect.""" stream = io.BytesIO(b"\x00" * uint_class.get_byte_length()) invalid_scope = uint_class.get_byte_length() - 1 - with pytest.raises(SSZDecodeError, match="invalid scope"): + with pytest.raises(SSZSerializationError, match="invalid scope"): uint_class.deserialize(stream, scope=invalid_scope) @pytest.mark.parametrize("uint_class", ALL_UINT_TYPES) def test_deserialize_stream_too_short(self, uint_class: Type[BaseUint]) -> None: - """Tests that `deserialize` raises an SSZStreamError if the stream ends prematurely.""" + """Tests that `deserialize` raises SSZSerializationError if stream ends prematurely.""" byte_length = uint_class.get_byte_length() # Create a stream that is shorter than what the type requires. stream = io.BytesIO(b"\x00" * (byte_length - 1)) - with pytest.raises(SSZStreamError, match="expected .* bytes, got"): + with pytest.raises(SSZSerializationError, match="expected .* bytes, got"): uint_class.deserialize(stream, scope=byte_length) diff --git a/tests/lean_spec/types/test_union.py b/tests/lean_spec/types/test_union.py index a5bd58a7..59b010dd 100644 --- a/tests/lean_spec/types/test_union.py +++ b/tests/lean_spec/types/test_union.py @@ -9,12 +9,7 @@ from lean_spec.types.collections import SSZList, SSZVector from lean_spec.types.container import Container -from lean_spec.types.exceptions import ( - SSZDecodeError, - SSZSelectorError, - SSZTypeCoercionError, - SSZTypeDefinitionError, -) +from lean_spec.types.exceptions import SSZSerializationError, SSZTypeError, SSZValueError from lean_spec.types.ssz_base import SSZType from lean_spec.types.uint import Uint8, Uint16, Uint32 from lean_spec.types.union import SSZUnion @@ -118,14 +113,14 @@ def test_constructor_success() -> None: def test_constructor_errors() -> None: """Test Union construction error cases.""" # Invalid selector (out of range) - with pytest.raises(SSZSelectorError, match="out of range"): + with pytest.raises(SSZValueError, match="out of range"): OptionalNumericUnion(selector=3, value=None) # None value for None option should work OptionalNumericUnion(selector=0, value=None) # Non-None value for None option should fail - with pytest.raises(SSZTypeCoercionError, match="Expected None"): + with pytest.raises(SSZTypeError, match="Expected None"): OptionalNumericUnion(selector=0, value=Uint16(1)) @@ -144,22 +139,22 @@ def test_pydantic_validation_ok() -> None: def test_pydantic_validation_errors() -> None: """Test Pydantic validation error cases.""" # Test invalid selector directly - with pytest.raises(SSZSelectorError, match="out of range"): + with pytest.raises(SSZValueError, match="out of range"): OptionalNumericUnion(selector=9, value=0) # Test invalid value for None option directly - with pytest.raises(SSZTypeCoercionError, match="Expected None"): + with pytest.raises(SSZTypeError, match="Expected None"): OptionalNumericUnion(selector=0, value=1) # Test with Pydantic model wrapper - should catch underlying errors model = create_model("M", v=(OptionalNumericUnion, ...)) # Invalid selector in model context - with pytest.raises((ValidationError, SSZSelectorError)): + with pytest.raises((ValidationError, SSZValueError)): model(v={"selector": 9, "value": 0}) # Invalid value for None option in model context - with pytest.raises((ValidationError, SSZTypeCoercionError)): + with pytest.raises((ValidationError, SSZTypeError)): model(v={"selector": 0, "value": 1}) @@ -198,15 +193,15 @@ def test_union_with_nested_composites_roundtrip() -> None: def test_deserialize_errors() -> None: """Test deserialization error cases.""" # Too small scope - with pytest.raises(SSZDecodeError, match="scope too small"): + with pytest.raises(SSZSerializationError, match="scope too small"): SimpleUnion.deserialize(io.BytesIO(b""), 0) # Invalid selector - with pytest.raises(SSZSelectorError, match="out of range"): + with pytest.raises(SSZValueError, match="out of range"): SimpleUnion.deserialize(io.BytesIO(b"\x09"), 1) # None option with payload - with pytest.raises(SSZDecodeError, match="no payload bytes"): + with pytest.raises(SSZSerializationError, match="no payload bytes"): OptionalNumericUnion.deserialize(io.BytesIO(b"\x00\xff"), 2) @@ -261,7 +256,7 @@ def test_is_fixed_size_helper() -> None: def test_get_byte_length_raises() -> None: """Test get_byte_length() raises for variable-size types.""" - with pytest.raises(SSZTypeDefinitionError, match="variable-size"): + with pytest.raises(SSZTypeError, match="variable-size"): NumericUnion.get_byte_length() @@ -276,7 +271,7 @@ class ValidUnion(SSZUnion): assert instance.selector == 0 # Invalid union with None not at index 0 should fail during validation - with pytest.raises(SSZTypeDefinitionError, match="None at index 0"): + with pytest.raises(SSZTypeError, match="None only allowed at index 0"): class InvalidUnion1(SSZUnion): OPTIONS = (Uint16, None) @@ -287,7 +282,7 @@ class InvalidUnion1(SSZUnion): class NotSSZ: pass - with pytest.raises(SSZTypeCoercionError): + with pytest.raises(SSZTypeError): class InvalidUnion2(SSZUnion): OPTIONS = (cast(PyType[SSZType], NotSSZ),) @@ -303,7 +298,7 @@ def test_union_boundary_cases() -> None: assert u.value == Uint16(42) # None-only union should fail validation - with pytest.raises(SSZTypeDefinitionError, match="only option"): + with pytest.raises(SSZTypeError, match="only option"): class NoneOnlyUnion(SSZUnion): OPTIONS = (None,)