From e5cb8390890fa4674bfe6d582b76dddd4d1f716d Mon Sep 17 00:00:00 2001 From: Raoul Schaffranek Date: Sun, 22 Mar 2026 23:09:32 +0100 Subject: [PATCH] First version of the annotator --- README.md | 113 ++-- src/annotator/__init__.py | 3 + src/annotator/__main__.py | 91 +++ src/annotator/annotate.py | 184 ++++++ src/annotator/ast_walker.py | 190 +++++++ src/annotator/bytecode.py | 90 +++ src/annotator/opcodes.py | 97 ++++ src/annotator/program.py | 137 +++++ src/annotator/types.py | 191 +++++++ src/annotator/variables.py | 115 ++++ src/tests/conftest.py | 256 +++++++++ src/tests/ethdebug_dsl.py | 530 ++++++++++++++++++ src/tests/test_annotate_solc.py | 966 ++++++++++++++++++++++++++++++++ 13 files changed, 2927 insertions(+), 36 deletions(-) create mode 100644 src/annotator/__init__.py create mode 100644 src/annotator/__main__.py create mode 100644 src/annotator/annotate.py create mode 100644 src/annotator/ast_walker.py create mode 100644 src/annotator/bytecode.py create mode 100644 src/annotator/opcodes.py create mode 100644 src/annotator/program.py create mode 100644 src/annotator/types.py create mode 100644 src/annotator/variables.py create mode 100644 src/tests/conftest.py create mode 100644 src/tests/ethdebug_dsl.py create mode 100644 src/tests/test_annotate_solc.py diff --git a/README.md b/README.md index 3c6544f..4422cf2 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,28 @@ -# EthDebug.py +
-EthDebug.py is a library offering debugging primitives that are commonly used by developer tools, such as breakpoint-style debuggers, testing frameworks, or static analyzers/linters. Notably, it includes a complete debugger-side implementation of the EthDebug format. The main function is reading Solidity runtime information (like local variables) from a running Ethereum Virtual Machine. +# 🐞 EthDebug.py + +> A Python library of debugging primitives for Ethereum developer tools — breakpoint debuggers, testing frameworks, and static analyzers. + +[![Discord](https://img.shields.io/badge/discord-join-7289da)](https://discord.gg/CurfmXNtbN) +[![License](https://img.shields.io/badge/license-BSD--3-orange)](LICENSE) + +
+ +EthDebug.py includes a complete debugger-side implementation of the [EthDebug format](https://ethdebug.github.io/format/). Its core capability is reading Solidity runtime information (such as local variables) from a running Ethereum Virtual Machine. + +**What you can do with EthDebug.py:** -Things you can do with EthDebug.py: - Read the value of a Solidity variable at a paused machine state - See which variables are in scope at a specific source location -- Provide better log and error messages by replacing unreadable EVM details with their human-readable Solidity counterparts +- Replace unreadable EVM details with their human-readable Solidity counterparts in logs and error messages +- Work in Progress: Generate ethdebug data for existing compilation units + +--- -This library is agnostic of any specific virtual machine implementation and compiler. The following diagram shows the relationship between the different components: +## Architecture + +This library is agnostic of any specific virtual machine implementation and compiler. The diagram below shows how the components relate: ```mermaid flowchart TD @@ -31,55 +46,81 @@ flowchart TD Dereference-->View-->Debugger ``` +--- + ## Goals and Non-Goals -- Improve ecosystem-wide developer experience by providing a rich set of debugging primitives -- Provide feedback on the specification and implementation of the EthDebug format -- Assist compilers when implementing the counterpart of the EthDebug protocol -- It is explicitly beyond the scope of this project to develop a fully-featured stand-alone debugger. For a debugger that uses this library, see [Simbolik](https://simbolik.runtimeverification.com/). In fact, this library used to be a part of Simbolik but has since been extracted into its own project. 
+| | | +|---|---| +| ✅ | Improve ecosystem-wide developer experience by providing a rich set of debugging primitives | +| ✅ | Provide feedback on the specification and implementation of the EthDebug format | +| ✅ | Assist compilers when implementing the counterpart of the EthDebug protocol | +| ❌ | Develop a fully-featured stand-alone debugger — see [Simbolik](https://simbolik.runtimeverification.com/) for that (this library was originally extracted from Simbolik) | + +--- ## API Docs -The [Project Structure](#project-structure) section provides a high-level overview of the provided modules. Inside each module, you'll find extensive pydoc comments detailing how the module is meant to be used. +The [Project Structure](#project-structure) section provides a high-level overview of the provided modules. Inside each module you'll find extensive pydoc comments detailing how it is meant to be used. -For examples of how to use the library for a specific task, the tests generally offer a good starting point. +For concrete usage examples, the tests are a good starting point. ### Project Structure -- `src/ethdebug/format` \ - This module contains parsers and generators for all EthDebug schemas. The module structure closely follows the sub-schema hierarchy. These models are auto-generated directly from the spec and kept up to date as the spec evolves. -- `src/ethdebug/evaluate.py` \ - This module contains data structures and algorithms for evaluating pointers in the context of a paused machine state. Notice that "evaluating" here is not the same as "dereferencing." -- `src/ethdebug/dereference` \ - This module offers a complete pointer dereferencing algorithm. This algorithm is a rewrite of the TypeScript reference implementation in Python. It has support for all pointer regions, collections, expressions, and templates. -- `src/ethdebug/cursor.py` \ - This module defines the result of dereferencing a pointer. 
-- `src/ethdebug/data.py` \ - The data module defines low-level primitives to convert between different data representations, such as converting between raw bytes and unsigned integers. -- `src/ethdebug/machine.py` \ - This module defines abstract protocols `Machine`, `MachineTrace`, and `MachineState`. EthDebug.py aims to be agnostic of any specific EVM implementation. Users of the library must implement these protocols themselves. -- `tests` contains all sorts of automated tests. Some tests are ported from the reference implementation to ensure consistency. Other tests are specifically developed to test the integration with the Solidity compiler. +| Module | Description | +|---|---| +| `src/ethdebug/format` | Parsers and generators for all EthDebug schemas. Structure mirrors the sub-schema hierarchy. Auto-generated from the spec and kept in sync as it evolves. | +| `src/ethdebug/evaluate.py` | Data structures and algorithms for evaluating pointers in the context of a paused machine state. Note: "evaluating" is distinct from "dereferencing." | +| `src/ethdebug/dereference` | Complete pointer dereferencing algorithm. A Python rewrite of the TypeScript reference implementation, with support for all pointer regions, collections, expressions, and templates. | +| `src/ethdebug/cursor.py` | Defines the result of dereferencing a pointer. | +| `src/ethdebug/data.py` | Low-level primitives for converting between data representations (e.g. raw bytes ↔ unsigned integers). | +| `src/ethdebug/machine.py` | Abstract protocols `Machine`, `MachineTrace`, and `MachineState`. Users of the library implement these to integrate their own EVM. | +| `tests/` | Automated tests. Some are ported from the reference implementation to ensure consistency; others test integration with the Solidity compiler. | + +--- + +## EthDebug Annotation Tool + +> **Work in Progress:** The annotator is under active development and not yet feature-complete. 
Expect gaps in coverage, breaking changes, and incomplete output. + +The solc compiler does not yet generate EthDebug data, but the annotation tool can be used to add it to existing solc output. This is useful for testing and prototyping while compiler support is in progress. +It can also be used as a backwards-compatibility layer for tools that want to support EthDebug but rely on solc output. + +```bash +# Compile +solc --standard-json < input.json > output.json + +# Annotate +python -m annotator output.json -o annotated.json + +# Pipeline +solc --standard-json < input.json | python -m annotator > annotated.json +``` + +Run `python -m annotator --help` for full CLI options. + +--- ## For Contributors and Maintainers ### Regenerating the Validators -The data models used for parsing and validating the EthDebug format are generated from the JSON schema using the `generate_model.py` script. The files should be regenerated when the JSON schema files change or when the `datamodel-code-generator` library is updated. - -~~~bash -uv run python ./generate_model.py -~~~ +The data models for parsing and validating the EthDebug format are generated from the JSON schema using `generate_model.py`. Regenerate them whenever the JSON schema files change or `datamodel-code-generator` is updated: -The `datamodel-code-generator` library we use to generate the validators has some custom changes to make it work with the EthDebug JSON schema files. The library is therefore embedded as a subtree in the `datamodel-code-generator` directory. To update the library, you can run the following command: +```bash +uv run python ./generate_model.py +``` -~~~bash -git subtree pull --prefix=datamodel-code-generator git@github.com:koxudaxi/datamodel-code-generator.git main --squash -~~~ +> **Note:** The `datamodel-code-generator` library has custom patches to work with the EthDebug JSON schema files and is embedded as a subtree in the `datamodel-code-generator/` directory.
To update it: +> +> ```bash +> git subtree pull --prefix=datamodel-code-generator git@github.com:koxudaxi/datamodel-code-generator.git main --squash +> ``` -### Using solc to Generate Standard JSON Output Files +### Generating Test Fixtures with `solc` -~~~bash +```bash pushd tests && solc --standard-json mega_playground/input.json > mega_playground/output.json && popd pushd tests && solc --standard-json abstract_and_interface/input.json --pretty-json > abstract_and_interface/output.json && popd pushd tests && solc --standard-json standard_yul_debug_info_ethdebug_compatible_output/input.json > standard_yul_debug_info_ethdebug_compatible_output/output.json --allow-paths . && popd -~~~ +``` diff --git a/src/annotator/__init__.py b/src/annotator/__init__.py new file mode 100644 index 0000000..3850a38 --- /dev/null +++ b/src/annotator/__init__.py @@ -0,0 +1,3 @@ +from .annotate import annotate, check_optimizer_disabled + +__all__ = ["annotate", "check_optimizer_disabled"] diff --git a/src/annotator/__main__.py b/src/annotator/__main__.py new file mode 100644 index 0000000..bbfe2b9 --- /dev/null +++ b/src/annotator/__main__.py @@ -0,0 +1,91 @@ +""" +__main__.py + +CLI entry point: python -m annotator +""" + +from __future__ import annotations + +import argparse +import json +import sys +from typing import Optional + +from .annotate import annotate, check_optimizer_disabled + + +def main() -> None: + parser = argparse.ArgumentParser( + description=( + "Annotate solc standard JSON output with ethdebug format data.\n" + "\n" + "The compilation must have been performed with the optimizer disabled.\n" + "For full annotation, request the following output fields:\n" + " ast, metadata, storageLayout,\n" + " evm.bytecode.object, evm.bytecode.sourceMap,\n" + " evm.deployedBytecode.object, evm.deployedBytecode.sourceMap" + ), + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + "output_file", + nargs="?", + default="-", + help="Path to solc standard 
JSON output file, or '-' for stdin (default: stdin)", + ) + parser.add_argument( + "-o", + "--output", + help="Annotated output file path (default: stdout)", + ) + parser.add_argument( + "-i", + "--input-json", + dest="input_json", + help=( + "Path to the original solc standard JSON *input* file. " + "Used to include source file contents in the ethdebug Info object." + ), + ) + parser.add_argument( + "-s", + "--sources-dir", + dest="sources_dirs", + action="append", + default=[], + metavar="DIR", + help=( + "Directory to search for source files (can be repeated). " + "Used to include source file contents when --input-json is not provided." + ), + ) + args = parser.parse_args() + + if args.output_file == "-": + solc_output = json.load(sys.stdin) + else: + with open(args.output_file) as f: + solc_output = json.load(f) + + input_json: Optional[dict] = None + if args.input_json: + with open(args.input_json) as f: + input_json = json.load(f) + + check_optimizer_disabled(solc_output) + annotated = annotate( + solc_output, + source_dirs=args.sources_dirs or None, + input_json=input_json, + ) + + text = json.dumps(annotated, indent=2) + if args.output: + with open(args.output, "w") as f: + f.write(text) + else: + print(text) + + +if __name__ == "__main__": + main() diff --git a/src/annotator/annotate.py b/src/annotator/annotate.py new file mode 100644 index 0000000..89ba094 --- /dev/null +++ b/src/annotator/annotate.py @@ -0,0 +1,184 @@ +""" +annotate.py + +Core annotation logic: validates optimizer settings and annotates +solc standard JSON output with ethdebug data. 
import hashlib
import json
import os
from typing import Optional


def check_optimizer_disabled(solc_output: dict) -> None:
    """Reject compilations whose metadata reports an enabled optimizer.

    Every contract under ``solc_output["contracts"]`` is inspected.  Contracts
    that carry no ``metadata`` string, or whose metadata is not valid JSON,
    are skipped rather than treated as errors.

    Args:
        solc_output: Parsed solc standard JSON output.

    Raises:
        RuntimeError: If ``settings.optimizer.enabled`` is truthy in the
            metadata of any contract.
    """
    for filename, contracts in solc_output.get("contracts", {}).items():
        for contract_name, contract_data in contracts.items():
            metadata_str = contract_data.get("metadata")
            if not metadata_str:
                continue
            try:
                metadata = json.loads(metadata_str)
            except json.JSONDecodeError:
                continue
            optimizer = metadata.get("settings", {}).get("optimizer", {})
            if optimizer.get("enabled", False):
                raise RuntimeError(
                    f"Optimizer is enabled for {filename}:{contract_name}. "
                    "ethdebug annotation requires compilation with optimizer disabled."
                )


def _read_source_content(
    path: str,
    source_dirs: Optional[list[str]] = None,
    input_json: Optional[dict] = None,
) -> Optional[str]:
    """Resolve the text of a source file, or return None if it is unavailable.

    Lookup order:

    1. ``input_json["sources"][path]["content"]`` (no disk access),
    2. ``path`` joined onto each entry of ``source_dirs``, in order,
    3. ``path`` relative to the current working directory.

    Source contents are optional in the result, so a candidate file that
    cannot be read *or cannot be decoded as UTF-8* is skipped instead of
    aborting the annotation run.

    Args:
        path: Source path exactly as it appears as a key in the solc output.
        source_dirs: Extra base directories searched before ``"."``.
        input_json: The original solc standard JSON input, if available.

    Returns:
        The file contents, or ``None`` when no candidate could be read.
    """
    # The original solc input is the cheapest and most reliable origin.
    if input_json:
        src_entry = input_json.get("sources", {}).get(path)
        if src_entry and isinstance(src_entry.get("content"), str):
            return src_entry["content"]

    search_dirs: list[str] = [*(source_dirs or []), "."]
    for base in search_dirs:
        candidate = os.path.join(base, path)
        if not os.path.isfile(candidate):
            continue
        try:
            with open(candidate, encoding="utf-8") as f:
                return f.read()
        except (OSError, UnicodeDecodeError):
            # Best effort: unreadable or non-UTF-8 file, try the next base.
            continue

    return None


def annotate(
    solc_output: dict,
    source_dirs: Optional[list[str]] = None,
    input_json: Optional[dict] = None,
) -> dict:
    """Annotate a solc standard JSON output dict with ethdebug data in-place.

    For every contract with non-empty creation and/or deployed bytecode an
    ethdebug program is built and stored under
    ``evm.bytecode.ethdebug`` / ``evm.deployedBytecode.ethdebug``.  A
    top-level ``"ethdebug"`` info object (compilation id, compiler, sources
    and the list of all programs) is added as well.

    Args:
        solc_output: Parsed solc standard JSON output; mutated in place.
        source_dirs: Directories searched for source files on disk.
        input_json: Original solc standard JSON input, used to embed source
            contents.

    Returns:
        The same ``solc_output`` object that was passed in.
    """
    # Sibling-module helpers are imported lazily so that this section can be
    # imported without the rest of the package being importable.
    from .ast_walker import _ASTWalker, _src_to_range
    from .program import _get_compiler_version, build_program
    from .variables import build_storage_map

    sources = solc_output.get("sources", {})

    # Source ID map: AST file_id (int) → ethdebug source id (same int for solc)
    source_id_map: dict[int, int] = {}
    sources_list: list[dict] = []
    for path, src_data in sources.items():
        fid = src_data.get("id", 0)
        source_id_map[fid] = fid
        contents = _read_source_content(path, source_dirs, input_json)
        src_entry: dict = {"id": fid, "path": path, "language": "Solidity"}
        if contents is not None:
            src_entry["contents"] = contents
        sources_list.append(src_entry)

    # Walk all ASTs to collect variables, structs, enums
    walker = _ASTWalker()
    for src_data in sources.values():
        ast = src_data.get("ast")
        if ast:
            walker.walk(ast)

    # Compilation ID: deterministic hash of the (not yet annotated) output.
    comp_hash = hashlib.sha256(
        json.dumps(solc_output, sort_keys=True, default=str).encode()
    ).hexdigest()[:16]
    compilation_id = f"__{comp_hash}"

    compiler_version = _get_compiler_version(solc_output)

    # Contract source ranges.  Two contracts in different files may share a
    # name, so the primary index is (source path, contract name); the
    # name-only index is kept as a fallback (last definition wins).
    definitions_by_location: dict[tuple, dict] = {}
    definitions_by_name: dict[str, dict] = {}
    for path, src_data in sources.items():
        ast = src_data.get("ast")
        if not ast:
            continue
        for node in ast.get("nodes", []):
            if node.get("nodeType") != "ContractDefinition":
                continue
            src = _src_to_range(node.get("src"))
            if not src:
                continue
            cname = node.get("name")
            sid = source_id_map.get(src["file_id"], src["file_id"])
            definition = {
                "source": {"id": sid},
                "range": {"offset": src["offset"], "length": src["length"]},
            }
            definitions_by_location[(path, cname)] = definition
            definitions_by_name[cname] = definition

    programs: list[dict] = []

    for filename, contracts in solc_output.get("contracts", {}).items():
        for contract_name, contract_data in contracts.items():
            storage_map: Optional[dict] = None
            sl = contract_data.get("storageLayout")
            if sl:
                storage_map = build_storage_map(sl)

            state_vars = walker.contract_state_vars.get(contract_name, [])
            contract_def = definitions_by_location.get((filename, contract_name))
            if contract_def is None:
                contract_def = definitions_by_name.get(contract_name)

            evm = contract_data.get("evm", {})

            # Creation bytecode first, then deployed bytecode.
            for section, environment in (
                ("bytecode", "create"),
                ("deployedBytecode", "call"),
            ):
                code_obj = evm.get(section, {})
                code_hex = code_obj.get("object", "")
                if not code_hex or code_hex == "0x":
                    continue
                prog = build_program(
                    contract_name=contract_name,
                    contract_definition=contract_def,
                    environment=environment,
                    bytecode_hex=code_hex,
                    source_map_str=code_obj.get("sourceMap", ""),
                    state_vars=state_vars,
                    storage_map=storage_map,
                    source_id_map=source_id_map,
                    walker=walker,
                )
                programs.append(prog)
                # A non-empty "object" means code_obj is the dict stored in
                # solc_output, so this writes into the output itself.
                code_obj["ethdebug"] = prog

    # Top-level ethdebug Info object
    solc_output["ethdebug"] = {
        "compilation": {
            "id": compilation_id,
            "compiler": {"name": "solc", "version": compiler_version},
            "sources": sources_list,
        },
        "programs": programs,
    }

    return solc_output


# ---------------------------------------------------------------------------
# src/annotator/ast_walker.py
# Solidity AST traversal: collects variables, structs, and enums.
# ---------------------------------------------------------------------------
import re
from typing import Optional


def _src_to_range(src: Optional[str]) -> Optional[dict]:
    """Parse a solc ``"offset:length:fileId"`` src string.

    Returns:
        ``{"offset": int, "length": int, "file_id": int}``, or ``None`` when
        ``src`` is empty, has fewer than three ``:``-separated fields, or a
        field is not an integer.
    """
    if not src:
        return None
    parts = src.split(":")
    if len(parts) < 3:
        return None
    try:
        return {
            "offset": int(parts[0]),
            "length": int(parts[1]),
            "file_id": int(parts[2]),
        }
    except (ValueError, IndexError):
        return None


def _collect_var_node(
    node: dict,
    kind: str,
    contract: Optional[str],
    function: Optional[str],
    param_index: Optional[int],
) -> Optional[dict]:
    """Summarise one ``VariableDeclaration`` AST node as a plain dict.

    Args:
        node: The AST node; anything that is not a dict yields ``None``.
        kind: Caller-chosen classification (e.g. ``"parameter"``).
        contract: Name of the enclosing contract, if any.
        function: Name of the enclosing function, if any.
        param_index: Position in the parameter list, for parameters.

    Returns:
        A dict with the keys ``name``, ``kind``, ``contract``, ``function``,
        ``param_index``, ``src``, ``type``, ``type_str`` and ``node_id``.
    """
    # Imported lazily so this section stays importable on its own.
    from .types import ast_type_to_ethdebug

    if not isinstance(node, dict):
        return None
    name = node.get("name") or ""
    src = _src_to_range(node.get("src"))
    type_node = node.get("typeName")
    type_desc = node.get("typeDescriptions", {})
    type_str = type_desc.get("typeString")
    ethdebug_type = ast_type_to_ethdebug(type_node, type_str)
    return {
        "name": name,
        "kind": kind,
        "contract": contract,
        "function": function,
        "param_index": param_index,
        "src": src,
        "type": ethdebug_type,
        "type_str": type_str,  # preserved for struct/enum resolution
        "node_id": node.get("id"),
    }


class _ASTWalker:
    """Walks Solidity ASTs and records the declarations it recognises.

    Collected are state variables, function parameters, return parameters and
    locals declared through ``VariableDeclarationStatement`` nodes inside
    function bodies, plus struct members and enum values.  Results accumulate
    across calls to :meth:`walk`, so one walker can process several source
    units.
    """

    def __init__(self) -> None:
        # Every collected variable, in discovery order.
        self.variables: list[dict] = []
        # (file_id, offset, length) of a function → variables of that function
        self.function_vars: dict[tuple, list[dict]] = {}
        # State vars per contract: contract_name → list
        self.contract_state_vars: dict[str, list[dict]] = {}
        # Enum values per qualified name ("Contract.Enum" or "Enum")
        self.enum_values: dict[str, list[str]] = {}
        # Struct members per qualified name ("Contract.Struct" or "Struct")
        self.struct_members: dict[str, list[dict]] = {}

    def walk(self, node: dict, contract: Optional[str] = None, function: Optional[str] = None) -> None:
        """Visit ``node`` and, recursively, everything below it.

        Args:
            node: AST node; non-dict values are ignored.
            contract: Name of the enclosing contract, if already known.
            function: Name of the enclosing function; only forwarded during
                generic recursion.
        """
        if not isinstance(node, dict):
            return

        nt = node.get("nodeType")

        if nt == "SourceUnit":
            for child in node.get("nodes", []):
                self.walk(child)
            return

        if nt == "ContractDefinition":
            cname = node.get("name")
            # Register the contract even if it declares no state variables.
            self.contract_state_vars.setdefault(cname, [])
            for child in node.get("nodes", []):
                self.walk(child, contract=cname)
            return

        if nt == "StructDefinition":
            from .types import ast_type_to_ethdebug

            qname = f"{contract}.{node.get('name')}" if contract else node.get("name", "")
            self.struct_members[qname] = [
                {
                    "name": m.get("name"),
                    "type": ast_type_to_ethdebug(
                        m.get("typeName"),
                        m.get("typeDescriptions", {}).get("typeString"),
                    ),
                }
                for m in node.get("members", [])
            ]
            return

        if nt == "EnumDefinition":
            qname = f"{contract}.{node.get('name')}" if contract else node.get("name", "")
            self.enum_values[qname] = [m.get("name") for m in node.get("members", [])]
            return

        if nt == "VariableDeclaration" and node.get("stateVariable"):
            mutability = node.get("mutability", "mutable")
            kind = mutability if mutability in ("constant", "immutable") else "state_variable"
            v = _collect_var_node(node, kind, contract, None, None)
            if v:
                self.variables.append(v)
                if contract:
                    self.contract_state_vars.setdefault(contract, []).append(v)
            return

        if nt == "FunctionDefinition":
            # Unnamed functions (constructor, fallback, receive) use "kind".
            fname = node.get("name") or node.get("kind", "")
            func_src = _src_to_range(node.get("src"))
            func_vars: list[dict] = []

            for list_key, var_kind in (
                ("parameters", "parameter"),
                ("returnParameters", "return_parameter"),
            ):
                for idx, p in enumerate(node.get(list_key, {}).get("parameters", [])):
                    v = _collect_var_node(p, var_kind, contract, fname, idx)
                    if v:
                        self.variables.append(v)
                        func_vars.append(v)

            # Body (local variables)
            body = node.get("body")
            if body:
                self._collect_locals(body, contract, fname, func_vars)

            if func_src:
                key = (func_src["file_id"], func_src["offset"], func_src["length"])
                self.function_vars[key] = func_vars

            return

        # Generic recursion into every dict / list-of-dict child.
        for value in node.values():
            if isinstance(value, dict):
                self.walk(value, contract, function)
            elif isinstance(value, list):
                for item in value:
                    if isinstance(item, dict):
                        self.walk(item, contract, function)

    def _collect_locals(
        self, node: dict, contract: Optional[str], function: Optional[str], out: list[dict]
    ) -> None:
        """Collect locals declared anywhere below ``node`` into ``out``.

        Each variable found is appended to both ``self.variables`` and
        ``out``.
        """
        if not isinstance(node, dict):
            return
        nt = node.get("nodeType")
        if nt == "VariableDeclarationStatement":
            # Tuple destructuring leaves None holes in "declarations".
            for decl in node.get("declarations", []):
                if decl:
                    v = _collect_var_node(decl, "local_variable", contract, function, None)
                    if v:
                        self.variables.append(v)
                        out.append(v)
            # Also recurse into the value expression
            init = node.get("initialValue")
            if init:
                self._collect_locals(init, contract, function, out)
            return
        for value in node.values():
            if isinstance(value, dict):
                self._collect_locals(value, contract, function, out)
            elif isinstance(value, list):
                for item in value:
                    if isinstance(item, dict):
                        self._collect_locals(item, contract, function, out)


# ---------------------------------------------------------------------------
# src/annotator/bytecode.py
# EVM bytecode and solc source-map decoders.
# ---------------------------------------------------------------------------


def decode_bytecode(hex_bytes: str) -> list[dict]:
    """Decode hex bytecode into a list of {offset, mnemonic, arguments} dicts.

    Surrounding whitespace and an optional ``0x`` prefix are removed.  Linker
    placeholders and any other non-hex characters are replaced by zero
    nibbles so that instruction offsets stay intact, and a dangling odd
    nibble at the end is dropped.

    Args:
        hex_bytes: Hex-encoded EVM bytecode.

    Returns:
        One dict per instruction.  ``arguments`` holds a single ``0x…``
        string for PUSH1..PUSH32 and is empty otherwise.  Bytes without a
        known mnemonic are reported as ``0xNN``.
    """
    # Strip first: zero-filling a leading blank would shift every nibble.
    hex_bytes = hex_bytes.strip()
    if hex_bytes.startswith("0x"):
        hex_bytes = hex_bytes[2:]
    # A __$...$__ linker placeholder is 40 characters wide, i.e. 20 bytes.
    hex_bytes = re.sub(r"__\$[0-9a-fA-F]{34}\$__", "00" * 20, hex_bytes)
    # Any other non-hex character becomes a zero nibble.
    hex_bytes = re.sub(r"[^0-9a-fA-F]", "0", hex_bytes)
    if len(hex_bytes) % 2:
        hex_bytes = hex_bytes[:-1]  # truncate odd nibble
    if not hex_bytes:
        return []

    data = bytes.fromhex(hex_bytes)
    instructions: list[dict] = []
    i = 0
    while i < len(data):
        opcode = data[i]
        # push_size counts the opcode byte plus its immediate bytes.
        push_size = _PUSH_SIZES.get(opcode)
        if push_size is None:
            instructions.append(
                {
                    "offset": i,
                    "mnemonic": _OPCODE_NAMES.get(opcode, f"0x{opcode:02x}"),
                    "arguments": [],
                }
            )
            i += 1
            continue
        # A PUSH cut off by the end of the data keeps the bytes that exist.
        arg_bytes = data[i + 1 : i + push_size]
        instructions.append(
            {
                "offset": i,
                "mnemonic": f"PUSH{push_size - 1}",
                "arguments": ["0x" + arg_bytes.hex() if arg_bytes else "0x00"],
            }
        )
        i += push_size
    return instructions


# Source-map fields in positional order, each with its converter.
_SOURCE_MAP_FIELDS = (("s", int), ("l", int), ("f", int), ("j", str), ("m", int))


def decode_source_map(source_map: str) -> list[dict]:
    """
    Decode a compressed solc source map into one dict per instruction.

    Each entry: {"s": int, "l": int, "f": int, "j": str, "m": int}
    Missing or empty fields inherit from the previous entry; fields that fail
    to convert are treated as missing.  Before the first entry the values are
    ``s = l = f = -1``, ``j = "-"`` and ``m = 0``.
    """
    entries: list[dict] = []
    current: dict = {"s": -1, "l": -1, "f": -1, "j": "-", "m": 0}

    for part in source_map.split(";"):
        current = dict(current)  # start from the previous entry's values
        for (key, cast), raw in zip(_SOURCE_MAP_FIELDS, part.split(":")):
            if not raw:
                continue
            try:
                current[key] = cast(raw)
            except (ValueError, TypeError):
                pass
        entries.append(current)

    return entries


# ---------------------------------------------------------------------------
# src/annotator/opcodes.py
# EVM opcode table and push-size lookup.
# ---------------------------------------------------------------------------

_OPCODE_NAMES: dict[int, str] = {
    0x00: "STOP",
    0x01: "ADD",
    0x02: "MUL",
    0x03: "SUB",
    0x04: "DIV",
    0x05: "SDIV",
    0x06: "MOD",
    0x07: "SMOD",
    0x08: "ADDMOD",
    0x09: "MULMOD",
    0x0A: "EXP",
    0x0B: "SIGNEXTEND",
    0x10: "LT",
    0x11: "GT",
    0x12: "SLT",
    0x13: "SGT",
    0x14: "EQ",
    0x15: "ISZERO",
    0x16: "AND",
    0x17: "OR",
    0x18: "XOR",
    0x19: "NOT",
    0x1A: "BYTE",
    0x1B: "SHL",
    0x1C: "SHR",
    0x1D: "SAR",
    0x20: "KECCAK256",
    0x30: "ADDRESS",
    0x31: "BALANCE",
    0x32: "ORIGIN",
    0x33: "CALLER",
    0x34: "CALLVALUE",
    0x35: "CALLDATALOAD",
    0x36: "CALLDATASIZE",
    0x37: "CALLDATACOPY",
    0x38: "CODESIZE",
    0x39: "CODECOPY",
    0x3A: "GASPRICE",
    0x3B: "EXTCODESIZE",
    0x3C: "EXTCODECOPY",
    0x3D: "RETURNDATASIZE",
    0x3E: "RETURNDATACOPY",
    0x3F: "EXTCODEHASH",
    0x40: "BLOCKHASH",
    0x41: "COINBASE",
    0x42: "TIMESTAMP",
    0x43: "NUMBER",
    0x44: "PREVRANDAO",
    0x45: "GASLIMIT",
    0x46: "CHAINID",
    0x47: "SELFBALANCE",
    0x48: "BASEFEE",
    0x49: "BLOBHASH",
    0x4A: "BLOBBASEFEE",
    0x50: "POP",
    0x51: "MLOAD",
    0x52: "MSTORE",
    0x53: "MSTORE8",
    0x54: "SLOAD",
    0x55: "SSTORE",
    0x56: "JUMP",
    0x57: "JUMPI",
    0x58: "PC",
    0x59: "MSIZE",
    0x5A: "GAS",
    0x5B: "JUMPDEST",
    0x5C: "TLOAD",
    0x5D: "TSTORE",
    0x5E: "MCOPY",
    0x5F: "PUSH0",
    **{0x60 + i: f"PUSH{i + 1}" for i in range(32)},
    **{0x80 + i: f"DUP{i + 1}" for i in range(16)},
    **{0x90 + i: f"SWAP{i + 1}" for i in range(16)},
    **{0xA0 + i: f"LOG{i}" for i in range(5)},
    0xF0: "CREATE",
    0xF1: "CALL",
    0xF2: "CALLCODE",
    0xF3: "RETURN",
    0xF4: "DELEGATECALL",
    0xF5: "CREATE2",
    0xFA: "STATICCALL",
    0xFD: "REVERT",
    0xFE: "INVALID",
    0xFF: "SELFDESTRUCT",
}

# PUSH1..PUSH32: opcode byte + N immediate bytes
_PUSH_SIZES: dict[int, int] = {0x60 + i: 2 + i for i in range(32)}
# ---------------------------------------------------------------------------
# src/annotator/program.py
# Builds ethdebug Program objects from bytecode and source maps.
# ---------------------------------------------------------------------------

import json
from typing import Optional


def _range_contains(outer: dict, inner: dict) -> bool:
    """True if outer source range fully contains inner.

    Both arguments are dicts with ``file_id``, ``offset`` and ``length``.
    Ranges in different files never contain each other; identical ranges do.
    """
    if outer.get("file_id") != inner.get("file_id"):
        return False
    o_start = outer["offset"]
    o_end = o_start + outer["length"]
    i_start = inner["offset"]
    i_end = i_start + inner["length"]
    return o_start <= i_start and i_end <= o_end


def _find_function_vars_for_instr(
    instr_src: dict,
    walker: "_ASTWalker",
) -> list[dict]:
    """Return the variables of the first function whose range contains instr_src.

    Functions are tried in the iteration order of ``walker.function_vars``
    and the search stops at the first match.  An empty list is returned when
    no function range contains ``instr_src``.
    """
    for (fid, foffset, flength), fvars in walker.function_vars.items():
        func_range = {"file_id": fid, "offset": foffset, "length": flength}
        if _range_contains(func_range, instr_src):
            return list(fvars)
    return []


def _get_compiler_version(solc_output: dict) -> str:
    """Return the compiler version recorded in the solc output.

    The first contract metadata with a non-empty ``compiler.version`` wins.
    Otherwise the top-level ``"version"`` key is used, and ``"unknown"`` if
    that is absent too.  Metadata that is not valid JSON is ignored.
    """
    for contracts in solc_output.get("contracts", {}).values():
        for contract_data in contracts.values():
            metadata_str = contract_data.get("metadata")
            if not metadata_str:
                continue
            try:
                meta = json.loads(metadata_str)
            except json.JSONDecodeError:
                continue
            version = meta.get("compiler", {}).get("version")
            if version:
                return version
    return solc_output.get("version", "unknown")


def build_program(
    contract_name: str,
    contract_definition: Optional[dict],
    environment: str,
    bytecode_hex: str,
    source_map_str: str,
    state_vars: list[dict],
    storage_map: Optional[dict[str, dict]],
    source_id_map: dict[int, int],
    walker: "_ASTWalker",
) -> dict:
    """Build one ethdebug Program object.

    Args:
        contract_name: Name stored under ``program["contract"]["name"]``.
        contract_definition: Source range of the contract definition; an
            empty dict is emitted when this is falsy.
        environment: Stored verbatim under ``program["environment"]``.
        bytecode_hex: Hex-encoded bytecode to decode into instructions.
        source_map_str: Compressed solc source map; may be empty.
        state_vars: Collected state variables, forming the program context.
        storage_map: Storage layout lookup forwarded for state variables.
        source_id_map: Maps AST file ids to ethdebug source ids.
        walker: AST walker providing ``function_vars``.

    Returns:
        A dict with ``environment``, ``instructions``, ``contract`` and,
        when any state variable produced an entry, ``context``.
    """
    # Imported lazily so this section stays importable on its own.
    from .bytecode import decode_bytecode, decode_source_map
    from .variables import build_variable_entry

    raw_instrs = decode_bytecode(bytecode_hex)
    smap = decode_source_map(source_map_str) if source_map_str else []

    # --- Initial context: state variables always in scope ---
    initial_vars = []
    for v in state_vars:
        entry = build_variable_entry(v, storage_map, source_id_map, walker)
        if entry:
            initial_vars.append(entry)

    # Consecutive instructions usually share a source range, so the function
    # scan is done once per distinct range instead of once per instruction.
    # NOTE(review): the entries themselves are still rebuilt per instruction;
    # cache them too once build_variable_entry is confirmed to be pure.
    vars_by_range: dict[tuple, list[dict]] = {}

    # --- Build instructions ---
    instructions = []
    for i, raw in enumerate(raw_instrs):
        instr: dict = {"offset": raw["offset"]}

        op: dict = {"mnemonic": raw["mnemonic"]}
        if raw["arguments"]:
            op["arguments"] = raw["arguments"]
        instr["operation"] = op

        ctx: dict = {}

        # Source range from source map; instruction i uses entry i.
        if i < len(smap):
            sm = smap[i]
            start, length, file_id = sm.get("s", -1), sm.get("l", -1), sm.get("f", -1)
            # Negative values mean there is no usable source location.
            if start >= 0 and length >= 0 and file_id >= 0:
                sid = source_id_map.get(file_id, file_id)
                ctx["code"] = {
                    "source": {"id": sid},
                    "range": {"offset": start, "length": length},
                }

                # Add function-level variables in scope at this instruction
                range_key = (file_id, start, length)
                func_vars_here = vars_by_range.get(range_key)
                if func_vars_here is None:
                    func_vars_here = _find_function_vars_for_instr(
                        {"file_id": file_id, "offset": start, "length": length},
                        walker,
                    )
                    vars_by_range[range_key] = func_vars_here

                var_entries = []
                for fv in func_vars_here:
                    entry = build_variable_entry(fv, None, source_id_map, walker)
                    if entry:
                        var_entries.append(entry)
                if var_entries:
                    ctx["variables"] = var_entries

        if ctx:
            instr["context"] = ctx

        instructions.append(instr)

    # --- Assemble program ---
    program: dict = {
        "environment": environment,
        "instructions": instructions,
    }

    if contract_definition:
        program["contract"] = {
            "name": contract_name,
            "definition": contract_definition,
        }
    else:
        program["contract"] = {"name": contract_name, "definition": {}}

    if initial_vars:
        program["context"] = {"variables": initial_vars}

    return program


# ---------------------------------------------------------------------------
# src/annotator/types.py
# Solidity AST type nodes → ethdebug type dicts.
# ---------------------------------------------------------------------------
+""" + +from __future__ import annotations + +import re +from typing import Optional + + +def ast_type_to_ethdebug(type_node: Optional[dict], type_str: Optional[str] = None) -> Optional[dict]: + """Convert a Solidity AST TypeName node to an ethdebug type dict.""" + if type_node is None: + return _type_str_to_ethdebug(type_str) if type_str else None + + node_kind = type_node.get("nodeType") + + if node_kind == "ElementaryTypeName": + return _elementary(type_node.get("name", "")) + + if node_kind == "ArrayTypeName": + base = ast_type_to_ethdebug(type_node.get("baseType")) + result: dict = { + "class": "complex", + "kind": "array", + "contains": {"type": base} if base else {"type": {"kind": "unknown"}}, + } + length_node = type_node.get("length") + if length_node and length_node.get("nodeType") == "Literal": + result["count"] = int(length_node.get("value", 0)) + return result + + if node_kind == "Mapping": + key = ast_type_to_ethdebug(type_node.get("keyType")) + val = ast_type_to_ethdebug(type_node.get("valueType")) + return { + "class": "complex", + "kind": "mapping", + "contains": { + "key": {"type": key} if key else {"type": {"kind": "unknown"}}, + "value": {"type": val} if val else {"type": {"kind": "unknown"}}, + }, + } + + if node_kind == "UserDefinedTypeName": + fallback = type_str or type_node.get("typeDescriptions", {}).get("typeString", "") + return _type_str_to_ethdebug(fallback, type_node) + + if node_kind == "FunctionTypeName": + vis = type_node.get("visibility", "internal") + return {"class": "complex", "kind": "function", "internal": vis != "external"} + + # Fallback: try typeDescriptions.typeString + ts = type_str or type_node.get("typeDescriptions", {}).get("typeString") + return _type_str_to_ethdebug(ts) + + +def _elementary(name: str) -> dict: + name = name.strip() + + if name == "bool": + return {"kind": "bool"} + if name in ("address", "address payable"): + return {"kind": "address"} + if name == "string": + return {"kind": "string"} + if name == 
"bytes": + return {"kind": "bytes"} + + m = re.fullmatch(r"(u?int)(\d*)", name) + if m: + bits = int(m.group(2)) if m.group(2) else 256 + return {"kind": m.group(1), "bits": bits} + + m = re.fullmatch(r"bytes(\d+)", name) + if m: + return {"kind": "bytes", "bytes": int(m.group(1))} + + m = re.fullmatch(r"(u?fixed)(\d+x\d+)?", name) + if m: + kind = m.group(1) + dims = m.group(2) + if dims: + m2 = re.fullmatch(r"(\d+)x(\d+)", dims) + if m2: + return {"kind": kind, "bits": int(m2.group(1)), "places": int(m2.group(2))} + return {"kind": kind, "bits": 128, "places": 18} + + return {"kind": name} + + +def _type_str_to_ethdebug(ts: Optional[str], node: Optional[dict] = None) -> Optional[dict]: + if not ts: + return None + + ts = ts.strip() + + # Strip storage/memory/calldata location suffixes + for loc in ( + " storage ref", + " memory ref", + " calldata ref", + " storage pointer", + " memory", + " calldata", + " storage", + ): + if ts.endswith(loc): + ts = ts[: -len(loc)] + + # Strip "type(...)" wrapper + if ts.startswith("type(") and ts.endswith(")"): + ts = ts[5:-1] + + if ts in ("bool", "address", "address payable", "string", "bytes"): + return _elementary(ts) + + if re.fullmatch(r"u?int\d*", ts): + return _elementary(ts) + + if re.fullmatch(r"bytes\d+", ts): + return _elementary(ts) + + if re.fullmatch(r"u?fixed(\d+x\d+)?", ts): + return _elementary(ts) + + # Array: T[] or T[N] + m = re.fullmatch(r"(.+)\[(\d*)\]", ts) + if m: + base = _type_str_to_ethdebug(m.group(1).strip()) + result: dict = { + "class": "complex", + "kind": "array", + "contains": {"type": base} if base else {"type": {"kind": "unknown"}}, + } + if m.group(2): + result["count"] = int(m.group(2)) + return result + + # Mapping + # We need to handle nested mappings: split at '=>' but only at the top level. + # A simple approach: strip outer "mapping(" ... 
")" + if ts.startswith("mapping(") and ts.endswith(")"): + inner = ts[8:-1] + depth = 0 + split_at = -1 + for idx, ch in enumerate(inner): + if ch == "(": + depth += 1 + elif ch == ")": + depth -= 1 + elif ch == "=" and depth == 0 and idx + 1 < len(inner) and inner[idx + 1] == ">": + split_at = idx + break + if split_at >= 0: + key_str = inner[:split_at].strip() + val_str = inner[split_at + 2 :].strip() + key = _type_str_to_ethdebug(key_str) + val = _type_str_to_ethdebug(val_str) + return { + "class": "complex", + "kind": "mapping", + "contains": { + "key": {"type": key} if key else {"type": {"kind": "unknown"}}, + "value": {"type": val} if val else {"type": {"kind": "unknown"}}, + }, + } + + # Struct + if ts.startswith("struct "): + return {"class": "complex", "kind": "struct", "contains": []} + + # Enum + if ts.startswith("enum "): + return {"kind": "enum", "values": []} + + # Contract / interface + if ts.startswith("contract ") or ts.startswith("interface "): + return {"kind": "contract"} + + # Function + if ts.startswith("function"): + return {"class": "complex", "kind": "function", "internal": True} + + # Tuple + if ts.startswith("(") and ts.endswith(")"): + return {"class": "complex", "kind": "tuple", "contains": []} + + return None diff --git a/src/annotator/variables.py b/src/annotator/variables.py new file mode 100644 index 0000000..da813f1 --- /dev/null +++ b/src/annotator/variables.py @@ -0,0 +1,115 @@ +""" +variables.py + +Storage layout parsing and ethdebug variable entry construction. 
+""" + +from __future__ import annotations + +from typing import Optional + +from .ast_walker import _ASTWalker + + +def build_storage_map(storage_layout: dict) -> dict[str, dict]: + """Return {label: {slot, offset}} from solc storageLayout output.""" + result: dict[str, dict] = {} + for entry in storage_layout.get("storage", []): + label = entry.get("label") + if label: + result[label] = { + "slot": int(entry.get("slot", "0")), + "offset": int(entry.get("offset", 0)), + } + return result + + +def _make_source_range(src: Optional[dict], source_id_map: dict[int, int]) -> Optional[dict]: + if not src: + return None + file_id = src.get("file_id", -1) + source_id = source_id_map.get(file_id, file_id) + return { + "source": {"id": source_id}, + "range": {"offset": src["offset"], "length": src["length"]}, + } + + +def build_variable_entry( + var: dict, + storage_map: Optional[dict[str, dict]], + source_id_map: dict[int, int], + walker: Optional[_ASTWalker] = None, +) -> Optional[dict]: + """Build a single ethdebug variable entry dict.""" + entry: dict = {} + + name = var.get("name") or "" + if name: + entry["identifier"] = name + + decl = _make_source_range(var.get("src"), source_id_map) + if decl: + entry["declaration"] = decl + + typ = var.get("type") + + # Resolve struct members and enum values from walker when available + if walker and typ: + raw_type_str = var.get("type_str", "") + if typ.get("kind") == "struct" and not typ.get("contains"): + # raw_type_str is like "struct ContractName.StructName" or "struct StructName" + struct_name = raw_type_str.replace("struct ", "").strip() if raw_type_str else "" + # Try qualified name first, then simple name + members = walker.struct_members.get(struct_name, []) + if not members and "." in struct_name: + members = walker.struct_members.get(struct_name.split(".")[-1], []) + if not members: + # Last resort: search all keys + for key in walker.struct_members: + if key == struct_name or key.endswith("." 
+ struct_name): + members = walker.struct_members[key] + break + if members: + typ = dict(typ) + typ["contains"] = [ + {"name": m["name"], "type": m["type"]} for m in members if m.get("type") + ] + elif typ.get("kind") == "enum" and not typ.get("values"): + enum_name = raw_type_str.replace("enum ", "").strip() if raw_type_str else "" + vals = walker.enum_values.get(enum_name, []) + if not vals and "." in enum_name: + vals = walker.enum_values.get(enum_name.split(".")[-1], []) + if not vals: + for key in walker.enum_values: + if key == enum_name or key.endswith("." + enum_name): + vals = walker.enum_values[key] + break + if vals: + typ = dict(typ) + typ["values"] = vals + + if typ: + entry["type"] = typ + + kind = var.get("kind") + + # Storage pointer for state variables + if kind in ("state_variable",) and storage_map and name: + pos = storage_map.get(name) + if pos: + entry["pointer"] = { + "location": "storage", + "slot": pos["slot"], + "offset": pos["offset"], + } + + # Constants are inlined in bytecode — no storage pointer, but we note their kind + # via the type (no change needed; the ethdebug format doesn't have a "constant" flag). + + # Immutables are written to deployed bytecode at fixed offsets. + # Without immutableReferences analysis we can't provide a precise pointer. + # The entry is still useful for its type and declaration info. + + # Must have at least one property + return entry if entry else None diff --git a/src/tests/conftest.py b/src/tests/conftest.py new file mode 100644 index 0000000..58d5a02 --- /dev/null +++ b/src/tests/conftest.py @@ -0,0 +1,256 @@ +""" +conftest.py + +Pytest fixtures shared across the test suite. + +The ``compile_solidity`` fixture compiles a Solidity snippet and returns an +:class:`ethdebug_dsl.AnnotatedResult` ready for DSL-based assertions. + +Pass ``--generate-docs`` on the pytest command line to emit one Markdown file +per test under ``docs/examples/``. 
A file is only generated when the total +instruction count across all programs is at most ``--docs-max-instructions`` +(default 200), keeping generated examples small and readable. +""" + +from __future__ import annotations + +import copy +import json +import os +import shutil +import subprocess +import textwrap +from pathlib import Path +from typing import Optional, cast + +import pytest + +# --------------------------------------------------------------------------- +# pytest CLI options +# --------------------------------------------------------------------------- + +def pytest_addoption(parser: pytest.Parser) -> None: + parser.addoption( + "--generate-docs", + action="store_true", + default=False, + help="Generate Markdown examples under docs/examples/ for qualifying tests.", + ) + parser.addoption( + "--docs-max-instructions", + type=int, + default=200, + metavar="N", + help="Maximum total instruction count across all programs for a test to " + "have docs generated (default: 200).", + ) + + +# --------------------------------------------------------------------------- +# Locate the solc binary +# --------------------------------------------------------------------------- + +def _find_solc() -> Optional[str]: + """Return the path to the best available solc binary, or None.""" + # 1. solc-select managed installs (newest version first) + artifacts_dir = Path.home() / ".solc-select" / "artifacts" + if artifacts_dir.exists(): + candidates = sorted( + (p for p in artifacts_dir.glob("solc-*/solc-*") if p.is_file()), + key=lambda p: p.name, + reverse=True, + ) + for c in candidates: + if os.access(c, os.X_OK): + return str(c) + + # 2. 
PATH + return shutil.which("solc") + + +_SOLC_PATH: Optional[str] = _find_solc() + + +# --------------------------------------------------------------------------- +# Helper: add standard Solidity boilerplate if missing +# --------------------------------------------------------------------------- + +_DEFAULT_PRAGMA = "pragma solidity ^0.8.28;" + +def _prepare_source(source: str, pragma: Optional[str] = None) -> str: + source = textwrap.dedent(source).strip() + if "pragma solidity" not in source: + header = f"// SPDX-License-Identifier: MIT\n{pragma or _DEFAULT_PRAGMA}" + source = f"{header}\n\n{source}" + return source + + +# --------------------------------------------------------------------------- +# Helper: build solc standard-JSON input +# --------------------------------------------------------------------------- + +def _build_input_json(source: str, filename: str = "test.sol") -> dict: + return { + "language": "Solidity", + "sources": {filename: {"content": source}}, + "settings": { + "optimizer": {"enabled": False}, + "outputSelection": { + "*": { + "": ["ast"], + "*": [ + "metadata", + "storageLayout", + "evm.bytecode.object", + "evm.bytecode.sourceMap", + "evm.deployedBytecode.object", + "evm.deployedBytecode.sourceMap", + ], + } + }, + }, + } + + +# --------------------------------------------------------------------------- +# Helper: invoke solc and annotate +# --------------------------------------------------------------------------- + +def _compile_and_annotate( + source: str, + pragma: Optional[str] = None, + filename: str = "test.sol", +) -> "ethdebug_dsl.AnnotatedResult": + """Compile *source* with solc and annotate with ethdebug data.""" + from tests.ethdebug_dsl import AnnotatedResult + from annotator import annotate, check_optimizer_disabled + + prepared = _prepare_source(source, pragma) + input_json = _build_input_json(prepared, filename) + + result = subprocess.run( + [_SOLC_PATH, "--standard-json"], + input=json.dumps(input_json).encode(), 
+ capture_output=True, + timeout=30, + ) + + output = json.loads(result.stdout) + + errors = [e for e in output.get("errors", []) if e.get("severity") == "error"] + if errors: + messages = "\n".join(e.get("formattedMessage", e.get("message", "")) for e in errors) + raise RuntimeError(f"solc compilation failed:\n{messages}") + + check_optimizer_disabled(output) + annotated = annotate(output, input_json=input_json) + + return AnnotatedResult(annotated) + + +# --------------------------------------------------------------------------- +# Doc generation helpers +# --------------------------------------------------------------------------- + +_REPO_ROOT = Path(__file__).parent.parent.parent + + +def _total_instruction_count(annotated: dict) -> int: + return sum( + len(prog.get("instructions", [])) + for prog in annotated.get("ethdebug", {}).get("programs", []) + ) + + +def _ethdebug_for_docs(annotated: dict) -> dict: + """Return a copy of the ethdebug section with instructions stripped out. + + The full instruction list can be hundreds of entries; for documentation + purposes we keep the program metadata and initial context but omit the + per-instruction detail. 
+ """ + ethdebug = copy.deepcopy(annotated.get("ethdebug", {})) + for prog in ethdebug.get("programs", []): + count = len(prog.pop("instructions", [])) + prog["instructions_count"] = count # leave a breadcrumb + return ethdebug + + +def _write_doc( + test_name: str, + test_doc: Optional[str], + source: str, + annotated: dict, + docs_dir: Path, +) -> None: + """Write a Markdown file for *test_name* into *docs_dir*.""" + docs_dir.mkdir(parents=True, exist_ok=True) + + ethdebug = _ethdebug_for_docs(annotated) + ethdebug_json = json.dumps(ethdebug, indent=2) + + lines: list[str] = [] + lines.append(f"# `{test_name}`\n") + + if test_doc: + lines.append(textwrap.dedent(test_doc).strip()) + lines.append("\n") + + lines.append("## Solidity source\n") + lines.append("```solidity") + lines.append(source) + lines.append("```\n") + + lines.append("## ethdebug output\n") + lines.append("```json") + lines.append(ethdebug_json) + lines.append("```\n") + + out_path = docs_dir / f"{test_name}.md" + out_path.write_text("\n".join(lines), encoding="utf-8") + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def compile_solidity(request: pytest.FixtureRequest): + """Function-scoped factory fixture. + + Returns a callable ``compile(source, *, pragma=None, filename="test.sol")`` + that compiles the given Solidity source and returns an + :class:`ethdebug_dsl.AnnotatedResult`. + + When ``--generate-docs`` is passed on the command line, the first call to + the returned factory also writes a Markdown example to ``docs/examples/`` + (provided the total instruction count stays within the configured limit). + + Skips the test if no solc binary can be found. 
# ---------------------------------------------------------------------------
# Type specification objects
# ---------------------------------------------------------------------------

class TypeSpec:
    """An expected ethdebug type, used as the right-hand side of assertions.

    The wrapped dict is compared as a subset by the matching helpers: each of
    its keys must be present and equal in the actual type, which may carry
    additional keys.
    """

    def __init__(self, spec: dict) -> None:
        self._spec = spec

    def _as_dict(self) -> dict:
        """Return the wrapped spec dict itself (not a copy)."""
        return self._spec

    def __repr__(self) -> str:
        return "TypeSpec({})".format(self._spec)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, TypeSpec):
            return NotImplemented
        return other._spec == self._spec


# --- Elementary type singletons and factories ---

def uint(bits: int) -> TypeSpec:
    """Unsigned integer of the given bit width."""
    return TypeSpec({"kind": "uint", "bits": bits})

def int_(bits: int) -> TypeSpec:
    """Signed integer of the given bit width."""
    return TypeSpec({"kind": "int", "bits": bits})

def fixed_bytes(n: int) -> TypeSpec:
    """bytesN (fixed-size byte arrays, e.g. bytes32)."""
    return TypeSpec({"kind": "bytes", "bytes": n})

def ufixed(bits: int, places: int) -> TypeSpec:
    """Unsigned fixed-point number."""
    return TypeSpec({"kind": "ufixed", "bits": bits, "places": places})

def fixed(bits: int, places: int) -> TypeSpec:
    """Signed fixed-point number."""
    return TypeSpec({"kind": "fixed", "bits": bits, "places": places})

# Singletons for the unparameterised elementary types
bool_t = TypeSpec({"kind": "bool"})
address_t = TypeSpec({"kind": "address"})
string_t = TypeSpec({"kind": "string"})
bytes_t = TypeSpec({"kind": "bytes"})
contract_t = TypeSpec({"kind": "contract"})

# Convenient aliases for the most common uint widths
uint256, uint128, uint64 = uint(256), uint(128), uint(64)
uint32, uint16, uint8 = uint(32), uint(16), uint(8)

int256, int128, int64 = int_(256), int_(128), int_(64)
int32, int16, int8 = int_(32), int_(16), int_(8)

bytes32, bytes16 = fixed_bytes(32), fixed_bytes(16)
bytes4, bytes1 = fixed_bytes(4), fixed_bytes(1)


# --- Complex type factories ---

def mapping(key: TypeSpec, value: TypeSpec) -> TypeSpec:
    """Mapping from *key* to *value*."""
    contains = {
        "key": {"type": key._as_dict()},
        "value": {"type": value._as_dict()},
    }
    return TypeSpec({"class": "complex", "kind": "mapping", "contains": contains})

def array(element: TypeSpec, count: Optional[int] = None) -> TypeSpec:
    """Array of *element*; ``count`` is only part of the spec when given."""
    spec: dict = {
        "class": "complex",
        "kind": "array",
        "contains": {"type": element._as_dict()},
    }
    if count is None:
        return TypeSpec(spec)
    spec["count"] = count
    return TypeSpec(spec)

def struct(name: Optional[str] = None, members: Optional[dict[str, TypeSpec]] = None) -> TypeSpec:
    """Match a struct type. ``name`` is ignored (ethdebug doesn't store it in kind).
    ``members`` is a {field_name: TypeSpec} dict for deeper checking."""
    spec: dict = {"class": "complex", "kind": "struct"}
    if members is not None:
        fields = []
        for field_name, field_type in members.items():
            fields.append({"name": field_name, "type": field_type._as_dict()})
        spec["contains"] = fields
    return TypeSpec(spec)

def enum_t(values: Optional[list[str]] = None) -> TypeSpec:
    """Enum type; ``values`` is only part of the spec when given."""
    spec: dict = {"kind": "enum"}
    if values is not None:
        spec["values"] = values
    return TypeSpec(spec)

def function_t(internal: bool = True) -> TypeSpec:
    """Function type with the given ``internal`` flag."""
    return TypeSpec({"class": "complex", "kind": "function", "internal": internal})

def tuple_t() -> TypeSpec:
    """Tuple type (members are not checked)."""
    return TypeSpec({"class": "complex", "kind": "tuple"})
Nested dicts are matched recursively.""" + return _dict_subset(actual, expected._as_dict()) + + +def _dict_subset(actual: Any, expected: Any) -> bool: + if isinstance(expected, dict): + if not isinstance(actual, dict): + return False + return all( + k in actual and _dict_subset(actual[k], expected[k]) + for k in expected + ) + if isinstance(expected, list): + if not isinstance(actual, list): + return False + if len(expected) != len(actual): + return False + return all(_dict_subset(a, e) for a, e in zip(actual, expected)) + return actual == expected + + +# --------------------------------------------------------------------------- +# AST helpers (extract function source ranges from annotated output) +# --------------------------------------------------------------------------- + +def _extract_function_ranges(annotated: dict) -> dict[str, dict[str, tuple]]: + """Return {contract_name: {func_name: (file_id, offset, length)}}.""" + result: dict[str, dict[str, tuple]] = {} + + def _src_to_tuple(src: Optional[str]) -> Optional[tuple]: + if not src: + return None + parts = src.split(":") + if len(parts) < 3: + return None + try: + return (int(parts[2]), int(parts[0]), int(parts[1])) + except ValueError: + return None + + for _path, src_data in annotated.get("sources", {}).items(): + ast = src_data.get("ast") + if not ast: + continue + for node in ast.get("nodes", []): + if node.get("nodeType") == "ContractDefinition": + cname = node.get("name", "") + result.setdefault(cname, {}) + for child in node.get("nodes", []): + if child.get("nodeType") == "FunctionDefinition": + fname = child.get("name") or child.get("kind", "") + t = _src_to_tuple(child.get("src")) + if t: + result[cname][fname] = t + + return result + + +def _range_contains(outer: tuple, inner: tuple) -> bool: + """True if outer (file_id, offset, length) fully contains inner.""" + fid_o, off_o, len_o = outer + fid_i, off_i, len_i = inner + if fid_o != fid_i: + return False + return off_o <= off_i and (off_i + 
len_i) <= (off_o + len_o) + + +# --------------------------------------------------------------------------- +# VariableView – leaf of the DSL chain +# --------------------------------------------------------------------------- + +class VariableView: + """Wraps a single variable entry in the ethdebug output and provides + assertion helpers that return ``self`` for chaining.""" + + def __init__(self, identifier: str, data: dict, context_desc: str) -> None: + self._id = identifier + self._data = data + self._desc = context_desc + + # --- type assertions --- + + def has_type(self, expected: TypeSpec) -> "VariableView": + actual = self._data.get("type") + if not _type_matches(actual, expected): + raise AssertionError( + f"{self._desc}: expected type {expected}, got {actual}" + ) + return self + + # --- location / pointer assertions --- + + def at_storage_slot(self, slot: int, offset: int = 0) -> "VariableView": + ptr = self._data.get("pointer") + if ptr is None: + raise AssertionError( + f"{self._desc}: expected storage pointer at slot {slot} " + "but no pointer is present" + ) + if ptr.get("location") != "storage": + raise AssertionError( + f"{self._desc}: expected location 'storage', " + f"got {ptr.get('location')!r}" + ) + actual_slot = ptr.get("slot") + if actual_slot != slot: + raise AssertionError( + f"{self._desc}: expected storage slot {slot}, " + f"got {actual_slot}" + ) + actual_offset = ptr.get("offset", 0) + if actual_offset != offset: + raise AssertionError( + f"{self._desc}: expected storage offset {offset}, " + f"got {actual_offset}" + ) + return self + + def is_constant(self) -> "VariableView": + """Constants are inlined; they must have no storage pointer.""" + ptr = self._data.get("pointer") + if ptr is not None: + raise AssertionError( + f"{self._desc}: constant variable should have no storage pointer, " + f"but found {ptr}" + ) + return self + + def is_immutable(self) -> "VariableView": + """Immutables are written to the deployed bytecode; no storage 
pointer.""" + ptr = self._data.get("pointer") + if ptr is not None: + raise AssertionError( + f"{self._desc}: immutable variable should have no storage pointer, " + f"but found {ptr}" + ) + return self + + def has_declaration(self) -> "VariableView": + decl = self._data.get("declaration") + if decl is None: + raise AssertionError( + f"{self._desc}: expected a declaration source range, but none found" + ) + source = decl.get("source") + rng = decl.get("range") + if source is None or rng is None: + raise AssertionError( + f"{self._desc}: declaration is malformed: {decl}" + ) + return self + + +# --------------------------------------------------------------------------- +# FunctionScope – narrows to a particular function's instruction contexts +# --------------------------------------------------------------------------- + +class FunctionScope: + """Provides variable look-up scoped to a single function's body.""" + + def __init__( + self, + func_name: str, + contract_name: str, + program: dict, + func_ranges: dict[str, tuple], + ) -> None: + self._func_name = func_name + self._contract = contract_name + self._program = program + self._func_ranges = func_ranges + self._vars: dict[str, dict] = self._collect() + + def _collect(self) -> dict[str, dict]: + """Collect unique variables from instruction contexts that belong to + this function's source range. 
Falls back to all instruction vars if + the function range cannot be determined.""" + func_range = self._func_ranges.get(self._func_name) + seen: dict[str, dict] = {} + + for instr in self._program.get("instructions", []): + ctx = instr.get("context", {}) + fvars = ctx.get("variables", []) + if not fvars: + continue + + # If we have a function range, only include instructions inside it + if func_range is not None: + code = ctx.get("code", {}) + rng = code.get("range", {}) + src = code.get("source", {}) + fid = src.get("id", -1) + off = rng.get("offset", -1) + lng = rng.get("length", 0) + instr_range = (fid, off, lng) + if off < 0 or not _range_contains(func_range, instr_range): + continue + + for v in fvars: + ident = v.get("identifier") + if ident and ident not in seen: + seen[ident] = v + + return seen + + def _get(self, identifier: str) -> VariableView: + v = self._vars.get(identifier) + if v is None: + available = sorted(self._vars.keys()) + raise AssertionError( + f"Variable '{identifier}' not found in function " + f"'{self._func_name}' of {self._contract}. 
" + f"Available identifiers: {available}" + ) + return VariableView( + identifier, v, + f"'{identifier}' in {self._contract}.{self._func_name}" + ) + + def param(self, identifier: str) -> VariableView: + """Assert on a function parameter by name.""" + return self._get(identifier) + + def returns(self, identifier: str) -> VariableView: + """Assert on a named return parameter.""" + return self._get(identifier) + + def local(self, identifier: str) -> VariableView: + """Assert on a local variable.""" + return self._get(identifier) + + def has_variable(self, identifier: str) -> VariableView: + """Assert that any function-level variable with this name exists.""" + return self._get(identifier) + + def variable_identifiers(self) -> list[str]: + """Return all found variable identifiers (useful in debugging).""" + return sorted(self._vars.keys()) + + +# --------------------------------------------------------------------------- +# ProgramView – one bytecode program (create or call environment) +# --------------------------------------------------------------------------- + +class ProgramView: + def __init__( + self, + contract_name: str, + environment: str, + program: dict, + func_ranges: dict[str, tuple], + ) -> None: + self._contract = contract_name + self._environment = environment + self._program = program + self._func_ranges = func_ranges + + # --- state variable assertions --- + + def state_var(self, identifier: str) -> VariableView: + """Assert on a state variable present in the program's initial context.""" + vars_ = self._program.get("context", {}).get("variables", []) + for v in vars_: + if v.get("identifier") == identifier: + return VariableView( + identifier, v, + f"state var '{identifier}' in {self._contract}/{self._environment}" + ) + available = [v.get("identifier") for v in vars_] + raise AssertionError( + f"State variable '{identifier}' not found in initial context of " + f"{self._contract}/{self._environment}. 
" + f"Available: {available}" + ) + + def state_var_identifiers(self) -> list[str]: + vars_ = self._program.get("context", {}).get("variables", []) + return [v.get("identifier") for v in vars_] + + # --- function scope --- + + def in_function(self, name: str) -> FunctionScope: + """Return a FunctionScope that narrows assertions to ``name``'s body.""" + return FunctionScope( + func_name=name, + contract_name=self._contract, + program=self._program, + func_ranges=self._func_ranges, + ) + + # --- instruction-level helpers --- + + def instruction_count(self) -> int: + return len(self._program.get("instructions", [])) + + def instructions_with_source(self) -> list[dict]: + return [ + i for i in self._program.get("instructions", []) + if i.get("context", {}).get("code") + ] + + +# --------------------------------------------------------------------------- +# ContractView +# --------------------------------------------------------------------------- + +class ContractView: + def __init__( + self, + name: str, + annotated: dict, + func_ranges: dict[str, dict[str, tuple]], + ) -> None: + self._name = name + self._annotated = annotated + self._func_ranges = func_ranges + + def deployed(self) -> ProgramView: + """The runtime (call) bytecode program.""" + return self._get_program("call") + + def creation(self) -> ProgramView: + """The creation bytecode program.""" + return self._get_program("create") + + def _get_program(self, environment: str) -> ProgramView: + for prog in self._annotated.get("ethdebug", {}).get("programs", []): + if ( + prog.get("contract", {}).get("name") == self._name + and prog.get("environment") == environment + ): + return ProgramView( + self._name, + environment, + prog, + self._func_ranges.get(self._name, {}), + ) + available = [ + f"{p.get('contract',{}).get('name')}/{p.get('environment')}" + for p in self._annotated.get("ethdebug", {}).get("programs", []) + ] + raise AssertionError( + f"No '{environment}' program found for contract '{self._name}'. 
" + f"Available programs: {available}" + ) + + +# --------------------------------------------------------------------------- +# AnnotatedResult – root of the DSL +# --------------------------------------------------------------------------- + +class AnnotatedResult: + """Root object returned by the ``compile_solidity`` fixture. + + Use ``.contract(name)`` to navigate to a specific contract, then + ``.deployed()`` / ``.creation()`` for the runtime / creation bytecode, + and then use assertion helpers on the resulting :class:`ProgramView`. + """ + + def __init__(self, annotated: dict) -> None: + self._annotated = annotated + self._func_ranges = _extract_function_ranges(annotated) + + def contract(self, name: str) -> ContractView: + return ContractView(name, self._annotated, self._func_ranges) + + def contract_names(self) -> list[str]: + names: list[str] = [] + for prog in self._annotated.get("ethdebug", {}).get("programs", []): + n = prog.get("contract", {}).get("name") + if n and n not in names: + names.append(n) + return names + + @property + def raw(self) -> dict: + """Access the full annotated output dict.""" + return self._annotated diff --git a/src/tests/test_annotate_solc.py b/src/tests/test_annotate_solc.py new file mode 100644 index 0000000..3310a87 --- /dev/null +++ b/src/tests/test_annotate_solc.py @@ -0,0 +1,966 @@ +""" +test_annotate_solc.py + +End-to-end tests for the annotator. + +Each test defines a self-contained Solidity snippet, compiles it through solc +(optimizer off), runs the annotator, and then uses the DSL from ethdebug_dsl.py +to assert on the resulting ethdebug annotations. + +Structure of a typical test: + + def test_something(compile_solidity): + result = compile_solidity(\"\"\" + contract C { + ... 
+ } + \"\"\") + + deployed = result.contract("C").deployed() + deployed.state_var("x").has_type(uint(256)).at_storage_slot(0) + + fn = deployed.in_function("foo") + fn.param("a").has_type(uint(256)) +""" + +import pytest + +from tests.ethdebug_dsl import ( + AnnotatedResult, + address_t, + array, + bool_t, + bytes_t, + bytes1, + bytes4, + bytes32, + contract_t, + enum_t, + fixed_bytes, + function_t, + int_, + int256, + mapping, + string_t, + struct, + tuple_t, + uint, + uint8, + uint16, + uint32, + uint256, +) + + +# =========================================================================== +# 1. Optimizer guard +# =========================================================================== + +def test_optimizer_enabled_raises(compile_solidity): + """The tool must reject output compiled with the optimizer enabled.""" + import json, subprocess + + # Find solc + from tests.conftest import _SOLC_PATH + if _SOLC_PATH is None: + pytest.skip("solc not found") + + from annotator import check_optimizer_disabled + + input_json = { + "language": "Solidity", + "sources": {"a.sol": {"content": "pragma solidity ^0.8.28; contract A {}"}}, + "settings": { + "optimizer": {"enabled": True, "runs": 200}, + "outputSelection": {"*": {"*": ["metadata"]}}, + }, + } + result = subprocess.run( + [_SOLC_PATH, "--standard-json"], + input=json.dumps(input_json).encode(), + capture_output=True, timeout=30, + ) + output = json.loads(result.stdout) + + with pytest.raises(RuntimeError, match="Optimizer is enabled"): + check_optimizer_disabled(output) + + +# =========================================================================== +# 2. 
Compilation metadata in the Info object +# =========================================================================== + +def test_info_contains_compiler_name(compile_solidity): + result = compile_solidity("contract C {}") + info = result.raw.get("ethdebug", {}) + assert info["compilation"]["compiler"]["name"] == "solc" + + +def test_info_contains_compiler_version(compile_solidity): + result = compile_solidity("contract C {}") + info = result.raw.get("ethdebug", {}) + version = info["compilation"]["compiler"]["version"] + assert version.startswith("0.8.") + + +def test_info_contains_source_path(compile_solidity): + result = compile_solidity("contract C {}", filename="my_contract.sol") + sources = result.raw["ethdebug"]["compilation"]["sources"] + assert any(s["path"] == "my_contract.sol" for s in sources) + + +def test_info_source_has_contents(compile_solidity): + result = compile_solidity("contract C {}", filename="my_contract.sol") + sources = result.raw["ethdebug"]["compilation"]["sources"] + src = next(s for s in sources if s["path"] == "my_contract.sol") + assert "contents" in src + assert "contract C" in src["contents"] + + +def test_info_programs_populated(compile_solidity): + result = compile_solidity("contract C {}") + programs = result.raw["ethdebug"]["programs"] + assert len(programs) >= 2 # create + call for C + + +# =========================================================================== +# 3. 
State variables – elementary types +# =========================================================================== + +def test_state_var_uint256(compile_solidity): + result = compile_solidity(""" + contract C { + uint256 public x; + } + """) + result.contract("C").deployed().state_var("x") \ + .has_type(uint256) \ + .at_storage_slot(0) + + +def test_state_var_address(compile_solidity): + result = compile_solidity(""" + contract C { + address public owner; + } + """) + result.contract("C").deployed().state_var("owner") \ + .has_type(address_t) \ + .at_storage_slot(0) + + +def test_state_var_bool(compile_solidity): + result = compile_solidity(""" + contract C { + bool public paused; + } + """) + result.contract("C").deployed().state_var("paused") \ + .has_type(bool_t) \ + .at_storage_slot(0) + + +def test_state_var_string(compile_solidity): + result = compile_solidity(""" + contract C { + string public name; + } + """) + result.contract("C").deployed().state_var("name") \ + .has_type(string_t) \ + .at_storage_slot(0) + + +def test_state_var_bytes_dynamic(compile_solidity): + result = compile_solidity(""" + contract C { + bytes public data; + } + """) + result.contract("C").deployed().state_var("data") \ + .has_type(bytes_t) \ + .at_storage_slot(0) + + +def test_state_var_bytes32(compile_solidity): + result = compile_solidity(""" + contract C { + bytes32 public root; + } + """) + result.contract("C").deployed().state_var("root") \ + .has_type(bytes32) \ + .at_storage_slot(0) + + +def test_state_var_bytes4(compile_solidity): + result = compile_solidity(""" + contract C { + bytes4 public selector; + } + """) + result.contract("C").deployed().state_var("selector") \ + .has_type(bytes4) \ + .at_storage_slot(0) + + +def test_state_var_int256(compile_solidity): + result = compile_solidity(""" + contract C { + int256 public delta; + } + """) + result.contract("C").deployed().state_var("delta") \ + .has_type(int256) \ + .at_storage_slot(0) + + +def 
test_state_var_uint8(compile_solidity): + result = compile_solidity(""" + contract C { + uint8 public decimals; + } + """) + result.contract("C").deployed().state_var("decimals") \ + .has_type(uint8) \ + .at_storage_slot(0) + + +# =========================================================================== +# 4. Constants +# =========================================================================== + +def test_constant_uint256(compile_solidity): + result = compile_solidity(""" + contract C { + uint256 public constant MAX = 1000; + } + """) + result.contract("C").deployed().state_var("MAX") \ + .has_type(uint256) \ + .is_constant() + + +def test_constant_address(compile_solidity): + result = compile_solidity(""" + contract C { + address public constant ZERO = address(0); + } + """) + result.contract("C").deployed().state_var("ZERO") \ + .has_type(address_t) \ + .is_constant() + + +def test_constant_bool(compile_solidity): + result = compile_solidity(""" + contract C { + bool public constant FLAG = true; + } + """) + result.contract("C").deployed().state_var("FLAG") \ + .has_type(bool_t) \ + .is_constant() + + +def test_constant_bytes32(compile_solidity): + result = compile_solidity(""" + contract C { + bytes32 public constant DOMAIN = keccak256("domain"); + } + """) + result.contract("C").deployed().state_var("DOMAIN") \ + .has_type(bytes32) \ + .is_constant() + + +# =========================================================================== +# 5. 
Immutables +# =========================================================================== + +def test_immutable_uint256(compile_solidity): + result = compile_solidity(""" + contract C { + uint256 public immutable creationTime; + constructor() { creationTime = block.timestamp; } + } + """) + result.contract("C").deployed().state_var("creationTime") \ + .has_type(uint256) \ + .is_immutable() + + +def test_immutable_address(compile_solidity): + result = compile_solidity(""" + contract C { + address public immutable owner; + constructor(address _owner) { owner = _owner; } + } + """) + result.contract("C").deployed().state_var("owner") \ + .has_type(address_t) \ + .is_immutable() + + +def test_immutable_bool(compile_solidity): + result = compile_solidity(""" + contract C { + bool public immutable locked; + constructor(bool _locked) { locked = _locked; } + } + """) + result.contract("C").deployed().state_var("locked") \ + .has_type(bool_t) \ + .is_immutable() + + +# =========================================================================== +# 6. 
Complex state variable types +# =========================================================================== + +def test_state_var_mapping(compile_solidity): + result = compile_solidity(""" + contract C { + mapping(address => uint256) public balances; + } + """) + result.contract("C").deployed().state_var("balances") \ + .has_type(mapping(address_t, uint256)) \ + .at_storage_slot(0) + + +def test_state_var_nested_mapping(compile_solidity): + result = compile_solidity(""" + contract C { + mapping(address => mapping(address => uint256)) public allowances; + } + """) + result.contract("C").deployed().state_var("allowances") \ + .has_type(mapping(address_t, mapping(address_t, uint256))) \ + .at_storage_slot(0) + + +def test_state_var_dynamic_array(compile_solidity): + result = compile_solidity(""" + contract C { + address[] public users; + } + """) + result.contract("C").deployed().state_var("users") \ + .has_type(array(address_t)) \ + .at_storage_slot(0) + + +def test_state_var_fixed_array(compile_solidity): + result = compile_solidity(""" + contract C { + uint256[4] public slots; + } + """) + result.contract("C").deployed().state_var("slots") \ + .has_type(array(uint256, count=4)) \ + .at_storage_slot(0) + + +def test_state_var_struct(compile_solidity): + result = compile_solidity(""" + struct Point { int256 x; int256 y; } + contract C { + Point public origin; + } + """) + result.contract("C").deployed().state_var("origin") \ + .has_type(struct(members={"x": int256, "y": int256})) \ + .at_storage_slot(0) + + +def test_state_var_enum(compile_solidity): + result = compile_solidity(""" + enum Status { Pending, Active, Inactive } + contract C { + Status public status; + } + """) + result.contract("C").deployed().state_var("status") \ + .has_type(enum_t(values=["Pending", "Active", "Inactive"])) \ + .at_storage_slot(0) + + +def test_state_var_mapping_to_struct(compile_solidity): + result = compile_solidity(""" + struct Profile { string name; uint256 age; } + contract C { + 
mapping(address => Profile) public profiles; + } + """) + result.contract("C").deployed().state_var("profiles") \ + .has_type(mapping(address_t, struct())) \ + .at_storage_slot(0) + + +# =========================================================================== +# 7. Multiple state variables – storage slot ordering +# =========================================================================== + +def test_storage_slot_ordering(compile_solidity): + result = compile_solidity(""" + contract C { + uint256 public a; + uint256 public b; + uint256 public c; + } + """) + deployed = result.contract("C").deployed() + deployed.state_var("a").at_storage_slot(0) + deployed.state_var("b").at_storage_slot(1) + deployed.state_var("c").at_storage_slot(2) + + +def test_storage_slot_packing(compile_solidity): + """uint128 variables pack two-to-a-slot.""" + result = compile_solidity(""" + contract C { + uint128 public lo; + uint128 public hi; + } + """) + deployed = result.contract("C").deployed() + deployed.state_var("lo").at_storage_slot(0, offset=0) + deployed.state_var("hi").at_storage_slot(0, offset=16) + + +def test_mixed_constant_and_storage(compile_solidity): + """Constants do not occupy storage; regular vars are slotted from 0.""" + result = compile_solidity(""" + contract C { + uint256 public constant VERSION = 1; + address public immutable owner; + uint256 public counter; + constructor(address _o) { owner = _o; } + } + """) + deployed = result.contract("C").deployed() + deployed.state_var("VERSION").is_constant() + deployed.state_var("owner").is_immutable() + deployed.state_var("counter").at_storage_slot(0) + + +# =========================================================================== +# 8. 
Function parameters +# =========================================================================== + +def test_function_single_param(compile_solidity): + result = compile_solidity(""" + contract C { + function foo(uint256 x) external pure returns (uint256) { + return x; + } + } + """) + result.contract("C").deployed() \ + .in_function("foo") \ + .param("x").has_type(uint256) + + +def test_function_multiple_params(compile_solidity): + result = compile_solidity(""" + contract C { + function transfer(address to, uint256 amount) external { + } + } + """) + fn = result.contract("C").deployed().in_function("transfer") + fn.param("to").has_type(address_t) + fn.param("amount").has_type(uint256) + + +def test_function_bool_param(compile_solidity): + result = compile_solidity(""" + contract C { + function setFlag(bool enabled) external { + } + } + """) + result.contract("C").deployed() \ + .in_function("setFlag") \ + .param("enabled").has_type(bool_t) + + +def test_function_bytes_param(compile_solidity): + result = compile_solidity(""" + contract C { + function process(bytes calldata data) external { + } + } + """) + result.contract("C").deployed() \ + .in_function("process") \ + .param("data").has_type(bytes_t) + + +def test_function_string_param(compile_solidity): + result = compile_solidity(""" + contract C { + function greet(string calldata name) external pure returns (string memory) { + return name; + } + } + """) + result.contract("C").deployed() \ + .in_function("greet") \ + .param("name").has_type(string_t) + + +def test_function_struct_param(compile_solidity): + result = compile_solidity(""" + struct Point { int256 x; int256 y; } + contract C { + function move(Point memory p) external pure returns (int256) { + return p.x; + } + } + """) + result.contract("C").deployed() \ + .in_function("move") \ + .param("p").has_type(struct()) + + +def test_function_array_param(compile_solidity): + result = compile_solidity(""" + contract C { + function sum(uint256[] calldata 
vals) external pure returns (uint256 total) { + for (uint256 i = 0; i < vals.length; i++) { total += vals[i]; } + } + } + """) + result.contract("C").deployed() \ + .in_function("sum") \ + .param("vals").has_type(array(uint256)) + + +def test_function_address_payable_param(compile_solidity): + result = compile_solidity(""" + contract C { + function send(address payable recipient, uint256 amount) external { + } + } + """) + fn = result.contract("C").deployed().in_function("send") + fn.param("recipient").has_type(address_t) + fn.param("amount").has_type(uint256) + + +def test_function_bytes32_param(compile_solidity): + result = compile_solidity(""" + contract C { + function verify(bytes32 hash, bytes32 sig) external pure returns (bool) { + return hash == sig; + } + } + """) + fn = result.contract("C").deployed().in_function("verify") + fn.param("hash").has_type(bytes32) + fn.param("sig").has_type(bytes32) + + +def test_function_enum_param(compile_solidity): + result = compile_solidity(""" + enum Direction { North, South, East, West } + contract C { + function move(Direction d) external pure returns (uint8) { + return uint8(d); + } + } + """) + result.contract("C").deployed() \ + .in_function("move") \ + .param("d").has_type(enum_t()) + + +def test_function_uint_variants(compile_solidity): + result = compile_solidity(""" + contract C { + function f(uint8 a, uint16 b, uint32 c, uint256 d) external pure + returns (uint256) { return a + b + c + d; } + } + """) + fn = result.contract("C").deployed().in_function("f") + fn.param("a").has_type(uint8) + fn.param("b").has_type(uint16) + fn.param("c").has_type(uint32) + fn.param("d").has_type(uint256) + + +def test_function_int_variants(compile_solidity): + result = compile_solidity(""" + contract C { + function f(int8 a, int256 b) external pure returns (int256) { return a + b; } + } + """) + fn = result.contract("C").deployed().in_function("f") + fn.param("a").has_type(int_(8)) + fn.param("b").has_type(int256) + + +# 
=========================================================================== +# 9. Return parameters +# =========================================================================== + +def test_named_return_parameter(compile_solidity): + result = compile_solidity(""" + contract C { + function compute(uint256 x) external pure returns (uint256 result) { + result = x * 2; + } + } + """) + result.contract("C").deployed() \ + .in_function("compute") \ + .returns("result").has_type(uint256) + + +def test_multiple_named_returns(compile_solidity): + result = compile_solidity(""" + contract C { + function divide(uint256 a, uint256 b) + external pure + returns (uint256 quotient, uint256 remainder) + { + quotient = a / b; + remainder = a % b; + } + } + """) + fn = result.contract("C").deployed().in_function("divide") + fn.returns("quotient").has_type(uint256) + fn.returns("remainder").has_type(uint256) + + +def test_named_return_bool(compile_solidity): + result = compile_solidity(""" + contract C { + function check(uint256 x) external pure returns (bool ok) { + ok = x > 0; + } + } + """) + result.contract("C").deployed() \ + .in_function("check") \ + .returns("ok").has_type(bool_t) + + +def test_named_return_address(compile_solidity): + result = compile_solidity(""" + contract C { + address public owner; + function getOwner() external view returns (address addr) { + addr = owner; + } + } + """) + result.contract("C").deployed() \ + .in_function("getOwner") \ + .returns("addr").has_type(address_t) + + +# =========================================================================== +# 10. 
Local variables +# =========================================================================== + +def test_local_variable_uint256(compile_solidity): + result = compile_solidity(""" + contract C { + function compute(uint256 x) external pure returns (uint256) { + uint256 doubled = x * 2; + return doubled; + } + } + """) + result.contract("C").deployed() \ + .in_function("compute") \ + .local("doubled").has_type(uint256) + + +def test_local_variable_bool(compile_solidity): + result = compile_solidity(""" + contract C { + function check() external pure returns (bool) { + bool flag = true; + return flag; + } + } + """) + result.contract("C").deployed() \ + .in_function("check") \ + .local("flag").has_type(bool_t) + + +def test_local_variable_address(compile_solidity): + result = compile_solidity(""" + contract C { + function caller() external view returns (address) { + address who = msg.sender; + return who; + } + } + """) + result.contract("C").deployed() \ + .in_function("caller") \ + .local("who").has_type(address_t) + + +def test_local_variable_string(compile_solidity): + result = compile_solidity(""" + contract C { + function greet() external pure returns (string memory) { + string memory msg = "hello"; + return msg; + } + } + """) + result.contract("C").deployed() \ + .in_function("greet") \ + .local("msg").has_type(string_t) + + +def test_local_variable_struct(compile_solidity): + result = compile_solidity(""" + struct Point { int256 x; int256 y; } + contract C { + function origin() external pure returns (int256) { + Point memory p = Point(0, 0); + return p.x; + } + } + """) + result.contract("C").deployed() \ + .in_function("origin") \ + .local("p").has_type(struct()) + + +def test_local_variable_bytes32(compile_solidity): + result = compile_solidity(""" + contract C { + function hashIt(string calldata s) external pure returns (bytes32) { + bytes32 h = keccak256(bytes(s)); + return h; + } + } + """) + result.contract("C").deployed() \ + .in_function("hashIt") \ 
+ .local("h").has_type(bytes32) + + +def test_local_variable_low_level_call(compile_solidity): + result = compile_solidity(""" + contract C { + function callTarget(address target) external payable returns (bool) { + (bool ok,) = target.call{value: msg.value}(""); + return ok; + } + } + """) + result.contract("C").deployed() \ + .in_function("callTarget") \ + .local("ok").has_type(bool_t) + + +# =========================================================================== +# 11. Declaration source ranges +# =========================================================================== + +def test_state_var_has_declaration(compile_solidity): + result = compile_solidity(""" + contract C { + uint256 public x; + } + """) + result.contract("C").deployed().state_var("x").has_declaration() + + +def test_function_param_has_declaration(compile_solidity): + result = compile_solidity(""" + contract C { + function foo(uint256 val) external pure returns (uint256) { return val; } + } + """) + result.contract("C").deployed() \ + .in_function("foo") \ + .param("val").has_declaration() + + +# =========================================================================== +# 12. 
Multiple contracts in one compilation unit +# =========================================================================== + +def test_two_contracts_independent(compile_solidity): + result = compile_solidity(""" + contract Token { + uint256 public totalSupply; + mapping(address => uint256) public balances; + } + + contract Vault { + address public asset; + uint256 public reserve; + } + """) + token = result.contract("Token").deployed() + token.state_var("totalSupply").has_type(uint256).at_storage_slot(0) + token.state_var("balances").has_type(mapping(address_t, uint256)).at_storage_slot(1) + + vault = result.contract("Vault").deployed() + vault.state_var("asset").has_type(address_t).at_storage_slot(0) + vault.state_var("reserve").has_type(uint256).at_storage_slot(1) + + +def test_contract_names_discovered(compile_solidity): + result = compile_solidity(""" + contract A {} + contract B {} + contract C {} + """) + names = result.contract_names() + assert "A" in names + assert "B" in names + assert "C" in names + + +# =========================================================================== +# 13. 
Creation vs deployed bytecode programs +# =========================================================================== + +def test_creation_program_exists(compile_solidity): + result = compile_solidity(""" + contract C { + uint256 public x; + constructor() { x = 42; } + } + """) + creation = result.contract("C").creation() + assert creation.instruction_count() > 0 + + +def test_deployed_program_exists(compile_solidity): + result = compile_solidity(""" + contract C { + uint256 public x; + } + """) + deployed = result.contract("C").deployed() + assert deployed.instruction_count() > 0 + + +def test_state_vars_in_creation_context(compile_solidity): + """State variables should also appear in the creation bytecode context.""" + result = compile_solidity(""" + contract C { + uint256 public x; + constructor() { x = 1; } + } + """) + result.contract("C").creation().state_var("x") \ + .has_type(uint256) \ + .at_storage_slot(0) + + +# =========================================================================== +# 14. Instructions carry source ranges +# =========================================================================== + +def test_instructions_have_source_ranges(compile_solidity): + result = compile_solidity(""" + contract C { + uint256 public x; + function set(uint256 v) external { x = v; } + } + """) + deployed = result.contract("C").deployed() + # At least some instructions should have a code context + mapped = deployed.instructions_with_source() + assert len(mapped) > 0 + + +# =========================================================================== +# 15. 
Complex real-world contract +# =========================================================================== + +def test_erc20_like_contract(compile_solidity): + result = compile_solidity(""" + contract ERC20 { + string public name; + string public symbol; + uint8 public decimals; + uint256 public totalSupply; + + mapping(address => uint256) public balanceOf; + mapping(address => mapping(address => uint256)) public allowance; + + address public immutable deployer; + + constructor( + string memory _name, + string memory _symbol, + uint8 _decimals, + uint256 _totalSupply + ) { + deployer = msg.sender; + name = _name; + symbol = _symbol; + decimals = _decimals; + totalSupply = _totalSupply; + balanceOf[msg.sender] = _totalSupply; + } + + function transfer(address to, uint256 amount) + external returns (bool success) + { + require(balanceOf[msg.sender] >= amount, "insufficient"); + balanceOf[msg.sender] -= amount; + balanceOf[to] += amount; + bool ok = true; + success = ok; + return success; + } + + function approve(address spender, uint256 amount) + external returns (bool) + { + allowance[msg.sender][spender] = amount; + return true; + } + } + """) + + deployed = result.contract("ERC20").deployed() + + # State variables + deployed.state_var("name").has_type(string_t).at_storage_slot(0) + deployed.state_var("symbol").has_type(string_t).at_storage_slot(1) + deployed.state_var("decimals").has_type(uint8).at_storage_slot(2) + deployed.state_var("totalSupply").has_type(uint256).at_storage_slot(3) + deployed.state_var("balanceOf").has_type(mapping(address_t, uint256)).at_storage_slot(4) + deployed.state_var("allowance") \ + .has_type(mapping(address_t, mapping(address_t, uint256))) \ + .at_storage_slot(5) + deployed.state_var("deployer").has_type(address_t).is_immutable() + + # Constructor parameters (in creation bytecode) + ctor = result.contract("ERC20").creation().in_function("constructor") + ctor.param("_name").has_type(string_t) + ctor.param("_symbol").has_type(string_t) 
+ ctor.param("_decimals").has_type(uint8) + ctor.param("_totalSupply").has_type(uint256) + + # transfer function + transfer = deployed.in_function("transfer") + transfer.param("to").has_type(address_t) + transfer.param("amount").has_type(uint256) + transfer.returns("success").has_type(bool_t) + transfer.local("ok").has_type(bool_t) + + # approve function + approve = deployed.in_function("approve") + approve.param("spender").has_type(address_t) + approve.param("amount").has_type(uint256)