diff --git a/Dockerfile-local b/Dockerfile-local index e2093cc9..e3dbbbf6 100644 --- a/Dockerfile-local +++ b/Dockerfile-local @@ -23,11 +23,12 @@ RUN apt-get update && apt-get -y upgrade && apt-get install -y --no-install-reco && update-ca-certificates ARG userid=10001 ARG groupid=10001 -RUN groupadd -g ${groupid} assertoor && useradd -m -u ${userid} -g assertoor assertoor +RUN (getent group ${groupid} || groupadd -g ${groupid} assertoor) && \ + useradd -m -u ${userid} -g ${groupid} assertoor RUN echo "assertoor ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers.d/assertoor WORKDIR /app COPY --from=builder /src/bin/* /app/ -RUN chown -R assertoor:assertoor /app +RUN chown -R assertoor:${groupid} /app RUN mkdir /workspace USER assertoor WORKDIR /workspace diff --git a/Makefile b/Makefile index 1db24f76..bc8d1cf0 100644 --- a/Makefile +++ b/Makefile @@ -26,7 +26,7 @@ devnet: .hack/devnet/run.sh devnet-run: devnet - go run main.go --config .hack/devnet/generated-assertoor-config.yaml + go run main.go --config .hack/devnet/generated-assertoor-config.yaml --verbose devnet-run-docker: devnet docker build --file ./Dockerfile-local -t assertoor:devnet-run --build-arg userid=$(CURRENT_UID) --build-arg groupid=$(CURRENT_GID) . diff --git a/pkg/coordinator/clients/consensus/rpc/beaconapi.go b/pkg/coordinator/clients/consensus/rpc/beaconapi.go index b152a602..dd249127 100644 --- a/pkg/coordinator/clients/consensus/rpc/beaconapi.go +++ b/pkg/coordinator/clients/consensus/rpc/beaconapi.go @@ -496,3 +496,30 @@ func (bc *BeaconClient) SubmitProposerSlashing(ctx context.Context, slashing *ph return nil } + +type NodeIdentity struct { + PeerID string `json:"peer_id"` + ENR string `json:"enr"` + P2PAddresses []string `json:"p2p_addresses"` + DiscoveryAddresses []string `json:"discovery_addresses"` + Metadata struct { + SeqNumber uint64 `json:"seq_number,string"` + Attnets string `json:"attnets"` + Syncnets string `json:"syncnets"` + } `json:"metadata"` +} + +type apiNodeIdentity struct { + Data *NodeIdentity `json:"data"` +} + +func (bc *BeaconClient) GetNodeIdentity(ctx context.Context) (*NodeIdentity, error) { + var nodeIdentity apiNodeIdentity + + err := bc.getJSON(ctx, fmt.Sprintf("%s/eth/v1/node/identity", bc.endpoint), &nodeIdentity) + if err != nil { + return nil, fmt.Errorf("error retrieving node identity: %v", err) + } + + return nodeIdentity.Data, nil +} diff --git a/pkg/coordinator/tasks/check_consensus_identity/README.md b/pkg/coordinator/tasks/check_consensus_identity/README.md new file mode 100644 index 00000000..158d8e88 --- /dev/null +++ b/pkg/coordinator/tasks/check_consensus_identity/README.md @@ -0,0 +1,117 @@ +# `check_consensus_identity` Task + +This task checks consensus client node identity information by querying the `/eth/v1/node/identity` API endpoint. It can verify various aspects of the node identity including CGC (Custody Group Count) extracted from ENR (Ethereum Node Record). + +## Configuration + +### Required Parameters +- **`clientPattern`** *(string)*: Pattern to match client names (e.g., `"lodestar-*"`, `"*"` for all) + +### Optional Parameters +- **`pollInterval`** *(duration)*: Interval between checks (default: `10s`) +- **`minClientCount`** *(int)*: Minimum number of clients that must pass checks (default: `1`) +- **`maxFailCount`** *(int)*: Maximum number of clients that can fail (-1 for no limit, default: `-1`) +- **`failOnCheckMiss`** *(bool)*: Whether to fail the task when checks don't pass (default: `false`) + +### CGC (Custody Group Count) Checks +- **`expectCgc`** *(int)*: Expect exact CGC value +- **`minCgc`** *(int)*: Minimum CGC value required +- **`maxCgc`** *(int)*: Maximum CGC value allowed + +### ENR Checks +- **`expectEnrField`** *(map[string]interface{})*: Expected ENR field values + +### PeerID Checks +- **`expectPeerIdPattern`** *(string)*: Regex pattern that PeerID must match + +### P2P Address Checks +- **`expectP2pAddressCount`** *(int)*: Expected number of P2P addresses +- **`expectP2pAddressMatch`** *(string)*: Regex pattern that at least one P2P address must match + +### Metadata Checks +- **`expectSeqNumber`** *(uint64)*: Expected sequence number +- **`minSeqNumber`** *(uint64)*: Minimum sequence number required + +## Outputs + +The task exports the following data via `ctx.Outputs`: + +- **`matchingClients`**: Array of clients that passed all checks +- **`failedClients`**: Array of clients that failed checks +- **`totalCount`**: Total number of clients checked +- **`matchingCount`**: Number of clients that passed checks +- **`failedCount`**: Number of clients that failed checks + +Each client result includes: +- `clientName`: Name of the consensus client +- `peerId`: Peer ID from node identity +- `enr`: ENR string +- `p2pAddresses`: Array of P2P addresses +- `discoveryAddresses`: Array of discovery addresses +- `seqNumber`: Metadata sequence number +- `attnets`: Attestation subnets +- `syncnets`: Sync subnets +- `cgc`: Extracted Custody Group Count +- `enrFields`: Parsed ENR fields +- `checksPassed`: Whether all configured checks passed +- `failureReasons`: Array of reasons why checks failed (if any) + +## Example Configurations + +### Basic Identity Check +```yaml +- name: check_node_identity + task: check_consensus_identity + config: + clientPattern: "lodestar-*" + minClientCount: 1 +``` + +### CGC Validation +```yaml +- name: validate_cgc + task: check_consensus_identity + config: + clientPattern: "*" + expectCgc: 8 + failOnCheckMiss: true +``` + +### Comprehensive Identity Check +```yaml +- name: full_identity_check + task: check_consensus_identity + config: + clientPattern: "prysm-*" + minCgc: 4 + maxCgc: 16 + expectP2pAddressCount: 2 + expectPeerIdPattern: "^16Uiu2HA.*" + minSeqNumber: 1 + pollInterval: 30s + failOnCheckMiss: true +``` + +### Using Outputs in Subsequent Tasks +```yaml +- name: check_identity + task: check_consensus_identity + config: + clientPattern: "*" + expectCgc: 8 + +- name: verify_results + task: run_shell + config: + command: | + echo "Found ${check_identity.matchingCount} matching clients" + echo "Total CGC sum: $(echo '${check_identity.matchingClients}' | jq '[.[] | .cgc] | add')" +``` + +## Use Cases + +1. **PeerDAS Validation**: Verify nodes have correct custody assignments +2. **Network Health**: Check node identity consistency across clients +3. **Configuration Validation**: Ensure nodes are properly configured for specific network requirements +4. **Testing**: Validate node behavior changes after deposits or configuration updates +5. **Monitoring**: Track node identity changes over time \ No newline at end of file diff --git a/pkg/coordinator/tasks/check_consensus_identity/config.go b/pkg/coordinator/tasks/check_consensus_identity/config.go new file mode 100644 index 00000000..769b1dac --- /dev/null +++ b/pkg/coordinator/tasks/check_consensus_identity/config.go @@ -0,0 +1,59 @@ +package checkconsensusidentity + +import ( + "fmt" + "time" + + "github.com/ethpandaops/assertoor/pkg/coordinator/helper" +) + +type Config struct { + ClientPattern string `yaml:"clientPattern" json:"clientPattern"` + PollInterval helper.Duration `yaml:"pollInterval" json:"pollInterval"` + MinClientCount int `yaml:"minClientCount" json:"minClientCount"` + MaxFailCount int `yaml:"maxFailCount" json:"maxFailCount"` + FailOnCheckMiss bool `yaml:"failOnCheckMiss" json:"failOnCheckMiss"` + + // CGC (Custody Group Count) checks + ExpectCGC *uint64 `yaml:"expectCgc" json:"expectCgc"` + MinCGC *uint64 `yaml:"minCgc" json:"minCgc"` + MaxCGC *uint64 `yaml:"maxCgc" json:"maxCgc"` + + // ENR checks + ExpectENRField map[string]interface{} `yaml:"expectEnrField" json:"expectEnrField"` + + // PeerID checks + ExpectPeerIDPattern string `yaml:"expectPeerIdPattern" json:"expectPeerIdPattern"` + + // P2P address checks + ExpectP2PAddressCount *int `yaml:"expectP2pAddressCount" json:"expectP2pAddressCount"` + ExpectP2PAddressMatch string `yaml:"expectP2pAddressMatch" json:"expectP2pAddressMatch"` + + // Metadata checks + ExpectSeqNumber *uint64 `yaml:"expectSeqNumber" json:"expectSeqNumber"` + MinSeqNumber *uint64 `yaml:"minSeqNumber" json:"minSeqNumber"` +} + +func DefaultConfig() Config { + return Config{ + PollInterval: helper.Duration{Duration: 10 * time.Second}, + MaxFailCount: -1, + MinClientCount: 1, + } +} + +func (c *Config) Validate() error { + if c.ClientPattern == "" { + return fmt.Errorf("clientPattern is required") + } + + if c.MinCGC != nil && c.MaxCGC != nil && *c.MinCGC > *c.MaxCGC { + return fmt.Errorf("minCgc must be <= maxCgc") + } + + if c.ExpectP2PAddressCount != nil && *c.ExpectP2PAddressCount < 0 { + return fmt.Errorf("expectP2pAddressCount must be >= 0") + } + + return nil +} diff --git a/pkg/coordinator/tasks/check_consensus_identity/task.go b/pkg/coordinator/tasks/check_consensus_identity/task.go new file mode 100644 index 00000000..c25f13b9 --- /dev/null +++ b/pkg/coordinator/tasks/check_consensus_identity/task.go @@ -0,0 +1,484 @@ +package checkconsensusidentity + +import ( + "context" + "encoding/base64" + "encoding/hex" + "fmt" + "regexp" + "strconv" + "strings" + "time" + + "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/ethereum/go-ethereum/rlp" + "github.com/ethpandaops/assertoor/pkg/coordinator/clients" + "github.com/ethpandaops/assertoor/pkg/coordinator/types" + "github.com/ethpandaops/assertoor/pkg/coordinator/vars" + "github.com/sirupsen/logrus" +) + +var ( + TaskName = "check_consensus_identity" + TaskDescriptor = &types.TaskDescriptor{ + Name: TaskName, + Description: "Checks consensus client node identity information including CGC extraction from ENR.", + Config: DefaultConfig(), + NewTask: NewTask, + } +) + +type Task struct { + ctx *types.TaskContext + options *types.TaskOptions + config Config + logger logrus.FieldLogger +} + +type IdentityCheckResult struct { + ClientName string `json:"clientName"` + PeerID string `json:"peerId"` + ENR string `json:"enr"` + P2PAddresses []string `json:"p2pAddresses"` + DiscoveryAddresses []string `json:"discoveryAddresses"` + SeqNumber uint64 `json:"seqNumber"` + Attnets string `json:"attnets"` + Syncnets string `json:"syncnets"` + CGC uint64 `json:"cgc"` + ENRFields map[string]interface{} `json:"enrFields"` + ChecksPassed bool `json:"checksPassed"` + FailureReasons []string `json:"failureReasons"` +} + +func NewTask(ctx *types.TaskContext, options *types.TaskOptions) (types.Task, error) { + return &Task{ + ctx: ctx, + options: options, + logger: ctx.Logger.GetLogger(), + }, nil +} + +func (t *Task) Config() interface{} { + return t.config +} + +func (t *Task) Timeout() time.Duration { + return t.options.Timeout.Duration +} + +func (t *Task) LoadConfig() error { + config := DefaultConfig() + + if t.options.Config != nil { + if err := t.options.Config.Unmarshal(&config); err != nil { + return fmt.Errorf("error parsing task config for %v: %w", TaskName, err) + } + } + + err := t.ctx.Vars.ConsumeVars(&config, t.options.ConfigVars) + if err != nil { + return err + } + + if err := config.Validate(); err != nil { + return err + } + + t.config = config + + return nil +} + +func (t *Task) Execute(ctx context.Context) error { + t.processCheck() + + for { + select { + case <-time.After(t.config.PollInterval.Duration): + t.processCheck() + case <-ctx.Done(): + return nil + } + } +} + +func (t *Task) processCheck() { + passResultCount := 0 + totalClientCount := 0 + matchingClients := []*IdentityCheckResult{} + failedClients := []*IdentityCheckResult{} + failedClientNames := []string{} + + t.logger.Infof("Starting identity check for pattern: %s", t.config.ClientPattern) + + for _, client := range t.ctx.Scheduler.GetServices().ClientPool().GetClientsByNamePatterns(t.config.ClientPattern, "") { + if client.ConsensusClient == nil { + t.logger.Warnf("Client %s has no consensus client, skipping", client.Config.Name) + continue + } + + totalClientCount++ + + t.logger.Infof("Checking identity for client: %s", client.Config.Name) + + result := t.checkClientIdentity(client) + + // Debug output for each client + t.logger.Infof("Client %s identity check result:", result.ClientName) + t.logger.Infof(" PeerID: %s", result.PeerID) + t.logger.Infof(" ENR: %s", result.ENR) + t.logger.Infof(" CGC: %d", result.CGC) + t.logger.Infof(" P2P Addresses: %v", result.P2PAddresses) + t.logger.Infof(" Discovery Addresses: %v", result.DiscoveryAddresses) + t.logger.Infof(" Sequence Number: %d", result.SeqNumber) + t.logger.Infof(" Checks Passed: %v", result.ChecksPassed) + + if len(result.FailureReasons) > 0 { + t.logger.Infof(" Failure Reasons: %v", result.FailureReasons) + } + + if result.ChecksPassed { + passResultCount++ + + matchingClients = append(matchingClients, result) + t.logger.Infof("✅ Client %s passed all checks", result.ClientName) + } else { + failedClients = append(failedClients, result) + failedClientNames = append(failedClientNames, result.ClientName) + t.logger.Warnf("❌ Client %s failed checks: %v", result.ClientName, result.FailureReasons) + } + } + + requiredPassCount := t.config.MinClientCount + if requiredPassCount == 0 { + requiredPassCount = totalClientCount + } + + resultPass := passResultCount >= requiredPassCount + + // Set output variables using context.Outputs + if matchingClientsData, err := vars.GeneralizeData(matchingClients); err == nil { + t.ctx.Outputs.SetVar("matchingClients", matchingClientsData) + } else { + t.logger.Warnf("Failed setting `matchingClients` output: %v", err) + } + + if failedClientsData, err := vars.GeneralizeData(failedClients); err == nil { + t.ctx.Outputs.SetVar("failedClients", failedClientsData) + } else { + t.logger.Warnf("Failed setting `failedClients` output: %v", err) + } + + t.ctx.Outputs.SetVar("totalCount", totalClientCount) + t.ctx.Outputs.SetVar("matchingCount", passResultCount) + t.ctx.Outputs.SetVar("failedCount", totalClientCount-passResultCount) + + // Enhanced summary logging + t.logger.Infof("📊 Identity check summary:") + t.logger.Infof(" Total clients: %d", totalClientCount) + t.logger.Infof(" Passed: %d", passResultCount) + t.logger.Infof(" Failed: %d", totalClientCount-passResultCount) + t.logger.Infof(" Required pass count: %d", requiredPassCount) + t.logger.Infof(" Overall result: %v", resultPass) + + if len(failedClientNames) > 0 { + t.logger.Infof(" Failed clients: %v", failedClientNames) + } + + // Set task result - default to pending instead of failure unless explicitly configured + switch { + case t.config.MaxFailCount > -1 && len(failedClients) > t.config.MaxFailCount: + if t.config.FailOnCheckMiss { + t.logger.Infof("Setting result to FAILURE (too many failures: %d > %d)", len(failedClients), t.config.MaxFailCount) + t.ctx.SetResult(types.TaskResultFailure) + } else { + t.logger.Infof("Setting result to PENDING (too many failures but failOnCheckMiss=false)") + t.ctx.SetResult(types.TaskResultNone) + } + case resultPass: + t.logger.Infof("Setting result to SUCCESS (requirements met)") + t.ctx.SetResult(types.TaskResultSuccess) + default: + if t.config.FailOnCheckMiss { + t.logger.Infof("Setting result to FAILURE (requirements not met and failOnCheckMiss=true)") + t.ctx.SetResult(types.TaskResultFailure) + } else { + t.logger.Infof("Setting result to PENDING (requirements not met but failOnCheckMiss=false)") + t.ctx.SetResult(types.TaskResultNone) + } + } +} + +func (t *Task) checkClientIdentity(client *clients.PoolClient) *IdentityCheckResult { + result := &IdentityCheckResult{ + ClientName: client.Config.Name, + ChecksPassed: true, + FailureReasons: []string{}, + } + + t.logger.Debugf("Getting node identity for client %s", client.Config.Name) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + identity, err := client.ConsensusClient.GetRPCClient().GetNodeIdentity(ctx) + if err != nil { + t.logger.Errorf("Failed to get node identity for client %s: %v", client.Config.Name, err) + + result.ChecksPassed = false + result.FailureReasons = append(result.FailureReasons, fmt.Sprintf("Failed to get node identity: %v", err)) + + return result + } + + t.logger.Debugf("Retrieved node identity for client %s: PeerID=%s, ENR=%s", + client.Config.Name, identity.PeerID, identity.ENR) + + result.PeerID = identity.PeerID + result.ENR = identity.ENR + result.P2PAddresses = identity.P2PAddresses + result.DiscoveryAddresses = identity.DiscoveryAddresses + result.SeqNumber = identity.Metadata.SeqNumber + result.Attnets = identity.Metadata.Attnets + result.Syncnets = identity.Metadata.Syncnets + + // Extract CGC from ENR + t.logger.Debugf("Extracting CGC from ENR for client %s", client.Config.Name) + + cgc, enrFields, err := t.extractCGCFromENR(identity.ENR) + if err != nil { + t.logger.Errorf("Failed to parse ENR for client %s: %v", client.Config.Name, err) + + result.ChecksPassed = false + result.FailureReasons = append(result.FailureReasons, fmt.Sprintf("Failed to parse ENR: %v", err)) + + return result + } + + t.logger.Debugf("Extracted CGC=%d for client %s", cgc, client.Config.Name) + + result.CGC = cgc + result.ENRFields = enrFields + + // Perform configured checks + t.logger.Debugf("Performing checks for client %s", client.Config.Name) + t.performChecks(result) + + return result +} + +func (t *Task) performChecks(result *IdentityCheckResult) { + // Check CGC expectations + if t.config.ExpectCGC != nil && result.CGC != *t.config.ExpectCGC { + result.ChecksPassed = false + result.FailureReasons = append(result.FailureReasons, + fmt.Sprintf("Expected CGC %d, got %d", *t.config.ExpectCGC, result.CGC)) + } + + if t.config.MinCGC != nil && result.CGC < *t.config.MinCGC { + result.ChecksPassed = false + result.FailureReasons = append(result.FailureReasons, + fmt.Sprintf("CGC %d is below minimum %d", result.CGC, *t.config.MinCGC)) + } + + if t.config.MaxCGC != nil && result.CGC > *t.config.MaxCGC { + result.ChecksPassed = false + result.FailureReasons = append(result.FailureReasons, + fmt.Sprintf("CGC %d is above maximum %d", result.CGC, *t.config.MaxCGC)) + } + + // Check PeerID pattern + if t.config.ExpectPeerIDPattern != "" { + matched, err := regexp.MatchString(t.config.ExpectPeerIDPattern, result.PeerID) + if err != nil { + result.ChecksPassed = false + result.FailureReasons = append(result.FailureReasons, + fmt.Sprintf("Invalid PeerID pattern: %v", err)) + } else if !matched { + result.ChecksPassed = false + result.FailureReasons = append(result.FailureReasons, + fmt.Sprintf("PeerID %s does not match pattern %s", result.PeerID, t.config.ExpectPeerIDPattern)) + } + } + + // Check P2P address count + if t.config.ExpectP2PAddressCount != nil && len(result.P2PAddresses) != *t.config.ExpectP2PAddressCount { + result.ChecksPassed = false + result.FailureReasons = append(result.FailureReasons, + fmt.Sprintf("Expected %d P2P addresses, got %d", *t.config.ExpectP2PAddressCount, len(result.P2PAddresses))) + } + + // Check P2P address match + if t.config.ExpectP2PAddressMatch != "" { + found := false + + for _, addr := range result.P2PAddresses { + if matched, _ := regexp.MatchString(t.config.ExpectP2PAddressMatch, addr); matched { + found = true + break + } + } + + if !found { + result.ChecksPassed = false + result.FailureReasons = append(result.FailureReasons, + fmt.Sprintf("No P2P address matches pattern %s", t.config.ExpectP2PAddressMatch)) + } + } + + // Check sequence number + if t.config.ExpectSeqNumber != nil && result.SeqNumber != *t.config.ExpectSeqNumber { + result.ChecksPassed = false + result.FailureReasons = append(result.FailureReasons, + fmt.Sprintf("Expected sequence number %d, got %d", *t.config.ExpectSeqNumber, result.SeqNumber)) + } + + if t.config.MinSeqNumber != nil && result.SeqNumber < *t.config.MinSeqNumber { + result.ChecksPassed = false + result.FailureReasons = append(result.FailureReasons, + fmt.Sprintf("Sequence number %d is below minimum %d", result.SeqNumber, *t.config.MinSeqNumber)) + } + + // Check ENR fields + for expectedField, expectedValue := range t.config.ExpectENRField { + if actualValue, exists := result.ENRFields[expectedField]; !exists { + result.ChecksPassed = false + result.FailureReasons = append(result.FailureReasons, + fmt.Sprintf("Expected ENR field %s not found", expectedField)) + } else if actualValue != expectedValue { + result.ChecksPassed = false + result.FailureReasons = append(result.FailureReasons, + fmt.Sprintf("ENR field %s expected %v, got %v", expectedField, expectedValue, actualValue)) + } + } +} + +// extractCGCFromENR extracts the Custody Group Count from ENR using proper ENR parsing +func (t *Task) extractCGCFromENR(enrStr string) (cgc uint64, enrFields map[string]interface{}, err error) { + if enrStr == "" { + t.logger.Debugf("Empty ENR provided") + return 0, nil, fmt.Errorf("empty ENR") + } + + t.logger.Debugf("Parsing ENR: %s", enrStr) + + // Decode ENR using go-ethereum's ENR package + record, err := t.decodeENR(enrStr) + if err != nil { + t.logger.Errorf("Failed to decode ENR: %v", err) + return 0, nil, err + } + + // Get all key-value pairs from ENR + enrFields = t.getKeyValuesFromENR(record) + + if cgcHex, ok := enrFields["cgc"]; ok { + // CGC is stored as hex string, parse it + cgcStr, ok := cgcHex.(string) + if !ok { + t.logger.Warnf("CGC field is not a string: %v", cgcHex) + } else { + // Remove "0x" prefix if present + cgcStr = strings.TrimPrefix(cgcStr, "0x") + + val, err := strconv.ParseUint(cgcStr, 16, 64) + if err != nil { + t.logger.Errorf("Failed to parse CGC value %s: %v", cgcStr, err) + } else { + cgc = val + t.logger.Debugf("Found CGC in ENR: %d", cgc) + } + } + } else { + t.logger.Debugf("No CGC field found in ENR") + } + + enrFields["enr_original"] = enrStr + + return cgc, enrFields, nil +} + +// decodeENR decodes an ENR string into a Record (from Dora's implementation) +func (t *Task) decodeENR(raw string) (*enr.Record, error) { + b := []byte(raw) + if strings.HasPrefix(raw, "enr:") { + b = b[4:] + } + + dec := make([]byte, base64.RawURLEncoding.DecodedLen(len(b))) + + n, err := base64.RawURLEncoding.Decode(dec, b) + if err != nil { + return nil, err + } + + var r enr.Record + err = rlp.DecodeBytes(dec[:n], &r) + + return &r, err +} + +// getKeyValuesFromENR extracts all key-value pairs from an ENR record (from Dora's implementation) +func (t *Task) getKeyValuesFromENR(r *enr.Record) map[string]interface{} { + fields := make(map[string]interface{}) + + fields["seq"] = r.Seq() + fields["signature"] = "0x" + hex.EncodeToString(r.Signature()) + + // Get all key-value pairs from the record + kv := r.AppendElements(nil)[1:] // Skip the sequence number + for i := 0; i < len(kv); i += 2 { + key, ok := kv[i].(string) + if !ok { + t.logger.Warnf("Invalid ENR key type: %T", kv[i]) + continue + } + + val, ok := kv[i+1].(rlp.RawValue) + if !ok { + t.logger.Warnf("Invalid ENR value type for key %s: %T", key, kv[i+1]) + continue + } + + // Format the value based on the key + fmtval := t.formatENRValue(key, val) + fields[key] = fmtval + } + + return fields +} + +// formatENRValue formats an ENR value based on its key type +func (t *Task) formatENRValue(key string, val rlp.RawValue) string { + switch key { + case "id": + content, _, err := rlp.SplitString(val) + if err == nil { + return string(content) + } + case "ip", "ip6": + content, _, err := rlp.SplitString(val) + if err == nil && (len(content) == 4 || len(content) == 16) { + return fmt.Sprintf("%v", content) // Return IP as string + } + case "tcp", "tcp6", "udp", "udp6": + var x uint64 + if err := rlp.DecodeBytes(val, &x); err == nil { + return strconv.FormatUint(x, 10) + } + case "cgc": + // CGC is stored as a single byte + content, _, err := rlp.SplitString(val) + if err == nil && len(content) > 0 { + return "0x" + hex.EncodeToString(content) + } + } + + // Default: return as hex + content, _, err := rlp.SplitString(val) + if err == nil { + return "0x" + hex.EncodeToString(content) + } + + return "0x" + hex.EncodeToString(val) +} diff --git a/pkg/coordinator/tasks/get_consensus_validators/README.md b/pkg/coordinator/tasks/get_consensus_validators/README.md new file mode 100644 index 00000000..a5bf8af7 --- /dev/null +++ b/pkg/coordinator/tasks/get_consensus_validators/README.md @@ -0,0 +1,112 @@ +# `get_consensus_validators` Task + +This task retrieves validators from the consensus layer that match specified filtering criteria. It's useful for finding validators associated with specific clients, status conditions, or other attributes. + +## Configuration + +### Client/Name Filtering +- **`clientPattern`** *(string)*: Regex pattern to match client names in validator names +- **`validatorNamePattern`** *(string)*: Regex pattern to match validator names directly + +### Status Filtering +- **`validatorStatus`** *([]string)*: Array of allowed validator statuses (default: all statuses) + - Possible values: `pending_initialized`, `pending_queued`, `active_ongoing`, `active_exiting`, `active_slashed`, `exited_unslashed`, `exited_slashed`, `withdrawal_possible`, `withdrawal_done` + +### Balance Filtering +- **`minValidatorBalance`** *(uint64)*: Minimum validator balance in Gwei +- **`maxValidatorBalance`** *(uint64)*: Maximum validator balance in Gwei + +### Index Filtering +- **`minValidatorIndex`** *(uint64)*: Minimum validator index +- **`maxValidatorIndex`** *(uint64)*: Maximum validator index + +### Other Filtering +- **`withdrawalCredsPrefix`** *(string)*: Required prefix for withdrawal credentials (e.g., "0x01", "0x02") + +### Output Options +- **`maxResults`** *(int)*: Maximum number of validators to return (default: 100) +- **`outputFormat`** *(string)*: Output format - "full", "pubkeys", or "indices" (default: "full") + +## Outputs + +Depending on `outputFormat`, the task exports: + +### Format: "full" (default) +- **`validators`**: Array of full validator information objects +- **`count`**: Number of matching validators + +Each validator object includes: +- `index`: Validator index +- `pubkey`: Validator public key (0x prefixed hex) +- `balance`: Current balance in Gwei +- `status`: Validator status string +- `effectiveBalance`: Effective balance in Gwei +- `withdrawalCredentials`: Withdrawal credentials (0x prefixed hex) +- `activationEpoch`: Activation epoch +- `exitEpoch`: Exit epoch +- `withdrawableEpoch`: Withdrawable epoch +- `slashed`: Boolean indicating if validator is slashed + +### Format: "pubkeys" +- **`pubkeys`**: Array of validator public keys as hex strings +- **`count`**: Number of matching validators + +### Format: "indices" +- **`indices`**: Array of validator indices as integers +- **`count`**: Number of matching validators + +## Example Configurations + +### Find Validators for Specific Client +```yaml +- name: find_lighthouse_validators + task: get_consensus_validators + config: + clientPattern: "lighthouse.*" + validatorStatus: ["active_ongoing"] + maxResults: 10 + outputFormat: "full" +``` + +### Get Validator Pubkeys for BLS Changes +```yaml +- name: get_validator_pubkeys + task: get_consensus_validators + config: + validatorNamePattern: "validator_[0-9]+" + validatorStatus: ["active_ongoing"] + withdrawalCredsPrefix: "0x01" + outputFormat: "pubkeys" + maxResults: 5 +``` + +### Find Validators by Balance Range +```yaml +- name: find_rich_validators + task: get_consensus_validators + config: + clientPattern: "prysm.*" + minValidatorBalance: 40000000000 # > 40 ETH + validatorStatus: ["active_ongoing"] + maxResults: 20 +``` + +### Get Validator Indices for Operations +```yaml +- name: get_validator_indices + task: get_consensus_validators + config: + validatorNamePattern: "test_validator_.*" + validatorStatus: ["pending_initialized", "pending_queued"] + outputFormat: "indices" + maxResults: 2 +``` + +## Use Cases + +1. **Client-Specific Operations**: Find validators belonging to specific consensus clients +2. **BLS Changes**: Identify validators that need withdrawal credential updates +3. **Deposit Operations**: Find validators for top-up deposits +4. **Status Monitoring**: Track validators in specific states +5. **Balance Analysis**: Identify validators by balance criteria +6. **Bulk Operations**: Get validator sets for matrix operations \ No newline at end of file diff --git a/pkg/coordinator/tasks/get_consensus_validators/config.go b/pkg/coordinator/tasks/get_consensus_validators/config.go new file mode 100644 index 00000000..3527a856 --- /dev/null +++ b/pkg/coordinator/tasks/get_consensus_validators/config.go @@ -0,0 +1,63 @@ +package getconsensusvalidators + +import ( + "fmt" +) + +type Config struct { + ClientPattern string `yaml:"clientPattern" json:"clientPattern"` + ValidatorNamePattern string `yaml:"validatorNamePattern" json:"validatorNamePattern"` + ValidatorStatus []string `yaml:"validatorStatus" json:"validatorStatus"` + MinValidatorBalance *uint64 `yaml:"minValidatorBalance" json:"minValidatorBalance"` + MaxValidatorBalance *uint64 `yaml:"maxValidatorBalance" json:"maxValidatorBalance"` + WithdrawalCredsPrefix string `yaml:"withdrawalCredsPrefix" json:"withdrawalCredsPrefix"` + MinValidatorIndex *uint64 `yaml:"minValidatorIndex" json:"minValidatorIndex"` + MaxValidatorIndex *uint64 `yaml:"maxValidatorIndex" json:"maxValidatorIndex"` + MaxResults int `yaml:"maxResults" json:"maxResults"` + + // Output format options + OutputFormat string `yaml:"outputFormat" json:"outputFormat"` // "full", "pubkeys", "indices" +} + +func DefaultConfig() Config { + return Config{ + MaxResults: 100, + OutputFormat: "full", + ValidatorStatus: []string{ + "pending_initialized", + "pending_queued", + "active_ongoing", + "active_exiting", + "active_slashed", + "exited_unslashed", + "exited_slashed", + "withdrawal_possible", + "withdrawal_done", + }, + } +} + +func (c *Config) Validate() error { + if c.ClientPattern == "" && c.ValidatorNamePattern == "" { + return fmt.Errorf("either clientPattern or validatorNamePattern is required") + } + + if c.MaxResults <= 0 { + return fmt.Errorf("maxResults must be > 0") + } + + if c.MinValidatorIndex != nil && c.MaxValidatorIndex != nil && *c.MinValidatorIndex > *c.MaxValidatorIndex { + return fmt.Errorf("minValidatorIndex must be <= maxValidatorIndex") + } + + if c.MinValidatorBalance != nil && c.MaxValidatorBalance != nil && *c.MinValidatorBalance > *c.MaxValidatorBalance { + return fmt.Errorf("minValidatorBalance must be <= maxValidatorBalance") + } + + validFormats := map[string]bool{"full": true, "pubkeys": true, "indices": true} + if !validFormats[c.OutputFormat] { + return fmt.Errorf("outputFormat must be one of: full, pubkeys, indices") + } + + return nil +} diff --git a/pkg/coordinator/tasks/get_consensus_validators/task.go b/pkg/coordinator/tasks/get_consensus_validators/task.go new file mode 100644 index 00000000..2a294ff2 --- /dev/null +++ b/pkg/coordinator/tasks/get_consensus_validators/task.go @@ -0,0 +1,253 @@ +package getconsensusvalidators + +import ( + "context" + "fmt" + "regexp" + "strings" + "time" + + "github.com/ethpandaops/assertoor/pkg/coordinator/types" + "github.com/ethpandaops/assertoor/pkg/coordinator/vars" + "github.com/sirupsen/logrus" +) + +var ( + TaskName = "get_consensus_validators" + TaskDescriptor = &types.TaskDescriptor{ + Name: TaskName, + Description: "Retrieves validators from the consensus layer matching specified criteria.", + Config: DefaultConfig(), + NewTask: NewTask, + } +) + +type Task struct { + ctx *types.TaskContext + options *types.TaskOptions + config Config + logger logrus.FieldLogger +} + +type ValidatorInfo struct { + Index uint64 `json:"index"` + Pubkey string `json:"pubkey"` + Balance uint64 `json:"balance"` + Status string `json:"status"` + EffectiveBalance uint64 `json:"effectiveBalance"` + WithdrawalCredentials string `json:"withdrawalCredentials"` + ActivationEpoch uint64 `json:"activationEpoch"` + ExitEpoch uint64 `json:"exitEpoch"` + WithdrawableEpoch uint64 `json:"withdrawableEpoch"` + Slashed bool `json:"slashed"` +} + +func NewTask(ctx *types.TaskContext, options *types.TaskOptions) (types.Task, error) { + return &Task{ + ctx: ctx, + options: options, + logger: ctx.Logger.GetLogger(), + }, nil +} + +func (t *Task) Config() interface{} { + return t.config +} + +func (t *Task) Timeout() time.Duration { + return t.options.Timeout.Duration +} + +func (t *Task) LoadConfig() error { + config := DefaultConfig() + + if t.options.Config != nil { + if err := t.options.Config.Unmarshal(&config); err != nil { + return fmt.Errorf("error parsing task config for %v: %w", TaskName, err) + } + } + + err := t.ctx.Vars.ConsumeVars(&config, t.options.ConfigVars) + if err != nil { + return err + } + + if err := config.Validate(); err != nil { + return err + } + + t.config = config + + return nil +} + +//nolint:gocyclo // ignore +func (t *Task) Execute(_ context.Context) error { + // Get client pool and validator names + clientPool := t.ctx.Scheduler.GetServices().ClientPool() + consensusPool := clientPool.GetConsensusPool() + validatorNames := t.ctx.Scheduler.GetServices().ValidatorNames() + + // Get validator set from consensus client + validatorSet := consensusPool.GetValidatorSet() + if validatorSet == nil { + return fmt.Errorf("validator set not available") + } + + matchingValidators := []ValidatorInfo{} + + // Compile validator name pattern regex if provided + var validatorNameRegex *regexp.Regexp + + if t.config.ValidatorNamePattern != "" { + var err error + validatorNameRegex, err = regexp.Compile(t.config.ValidatorNamePattern) + + if err != nil { + return fmt.Errorf("invalid validator name pattern: %v", err) + } + } + + // Compile client pattern regex if provided + var clientNameRegex *regexp.Regexp + + if t.config.ClientPattern != "" { + var err error + + clientNameRegex, err = regexp.Compile(t.config.ClientPattern) + if err != nil { + return fmt.Errorf("invalid client pattern: %v", err) + } + } + + // Iterate through validators and apply filters + for validatorIndex, validator := range validatorSet { + if len(matchingValidators) >= t.config.MaxResults { + break + } + + // Check index range + if t.config.MinValidatorIndex != nil && uint64(validatorIndex) < *t.config.MinValidatorIndex { + continue + } + + if t.config.MaxValidatorIndex != nil && uint64(validatorIndex) > *t.config.MaxValidatorIndex { + continue + } + + // Check balance range + if t.config.MinValidatorBalance != nil && uint64(validator.Balance) < *t.config.MinValidatorBalance { + continue + } + + if t.config.MaxValidatorBalance != nil && uint64(validator.Balance) > *t.config.MaxValidatorBalance { + continue + } + + // Check withdrawal credentials prefix + if t.config.WithdrawalCredsPrefix != "" { + credsHex := fmt.Sprintf("0x%x", validator.Validator.WithdrawalCredentials) + if !strings.HasPrefix(credsHex, t.config.WithdrawalCredsPrefix) { + continue + } + } + + // Check validator status + if len(t.config.ValidatorStatus) > 0 { + statusMatch := false + validatorStatus := validator.Status.String() + + for _, allowedStatus := range t.config.ValidatorStatus { + if validatorStatus == allowedStatus { + statusMatch = true + break + } + } + + if !statusMatch { + continue + } + } + + // Get validator name for pattern matching + validatorName := "" + if validatorNames != nil { + validatorName = validatorNames.GetValidatorName(uint64(validatorIndex)) + } + + // Check validator name pattern + if validatorNameRegex != nil && validatorName != "" { + if !validatorNameRegex.MatchString(validatorName) { + continue + } + } + + // Check client pattern (match validator name against client pattern) + if clientNameRegex != nil && validatorName != "" { + if !clientNameRegex.MatchString(validatorName) { + continue + } + } + + // Create validator info + validatorInfo := ValidatorInfo{ + Index: uint64(validatorIndex), + Pubkey: fmt.Sprintf("0x%x", validator.Validator.PublicKey), + Balance: uint64(validator.Balance), + Status: validator.Status.String(), + EffectiveBalance: uint64(validator.Validator.EffectiveBalance), + WithdrawalCredentials: fmt.Sprintf("0x%x", validator.Validator.WithdrawalCredentials), + ActivationEpoch: uint64(validator.Validator.ActivationEpoch), + ExitEpoch: uint64(validator.Validator.ExitEpoch), + WithdrawableEpoch: uint64(validator.Validator.WithdrawableEpoch), + Slashed: validator.Validator.Slashed, + } + + matchingValidators = append(matchingValidators, validatorInfo) + } + + // Set outputs based on format + switch t.config.OutputFormat { + case "full": + if validatorsData, err := vars.GeneralizeData(matchingValidators); err == nil { + t.ctx.Outputs.SetVar("validators", validatorsData) + } else { + return fmt.Errorf("failed to generalize validators data: %v", err) + } + case "pubkeys": + pubkeys := make([]string, len(matchingValidators)) + for i, validator := range matchingValidators { + pubkeys[i] = validator.Pubkey + } + + if pubkeysData, err := vars.GeneralizeData(pubkeys); err == nil { + t.ctx.Outputs.SetVar("pubkeys", pubkeysData) + } else { + return fmt.Errorf("failed to generalize pubkeys data: %v", err) + } + case "indices": + indices := make([]uint64, len(matchingValidators)) + for i, validator := range matchingValidators { + indices[i] = validator.Index + } + + if indicesData, err := vars.GeneralizeData(indices); err == nil { + t.ctx.Outputs.SetVar("indices", indicesData) + } else { + return fmt.Errorf("failed to generalize indices data: %v", err) + } + } + + // Always set count + t.ctx.Outputs.SetVar("count", len(matchingValidators)) + + t.logger.Infof("Found %d validators matching criteria", len(matchingValidators)) + + if len(matchingValidators) > 0 { + t.ctx.SetResult(types.TaskResultSuccess) + } else { + t.ctx.SetResult(types.TaskResultNone) + } + + return nil +} diff --git a/pkg/coordinator/tasks/tasks.go b/pkg/coordinator/tasks/tasks.go index bd482f49..52c7085b 100644 --- a/pkg/coordinator/tasks/tasks.go +++ b/pkg/coordinator/tasks/tasks.go @@ -8,6 +8,7 @@ import ( checkconsensusblockproposals "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_consensus_block_proposals" checkconsensusfinality "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_consensus_finality" checkconsensusforks "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_consensus_forks" + checkconsensusidentity "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_consensus_identity" checkconsensusproposerduty "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_consensus_proposer_duty" checkconsensusreorgs "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_consensus_reorgs" checkconsensusslotrange "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/check_consensus_slot_range" @@ -26,6 +27,7 @@ import ( generatetransaction "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/generate_transaction" generatewithdrawalrequests "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/generate_withdrawal_requests" getconsensusspecs "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/get_consensus_specs" + getconsensusvalidators "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/get_consensus_validators" checkexecutionblock "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/get_execution_block" getpubkeysfrommnemonic "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/get_pubkeys_from_mnemonic" getrandommnemonic "github.com/ethpandaops/assertoor/pkg/coordinator/tasks/get_random_mnemonic" @@ -47,6 +49,7 @@ var AvailableTaskDescriptors = []*types.TaskDescriptor{ checkconsensusblockproposals.TaskDescriptor, checkconsensusfinality.TaskDescriptor, checkconsensusforks.TaskDescriptor, + checkconsensusidentity.TaskDescriptor, checkconsensusproposerduty.TaskDescriptor, checkconsensusreorgs.TaskDescriptor, checkconsensusslotrange.TaskDescriptor, @@ -67,6 +70,7 @@ var AvailableTaskDescriptors = []*types.TaskDescriptor{ generatewithdrawalrequests.TaskDescriptor, getpubkeysfrommnemonic.TaskDescriptor, getconsensusspecs.TaskDescriptor, + getconsensusvalidators.TaskDescriptor, getrandommnemonic.TaskDescriptor, getwalletdetails.TaskDescriptor, runcommand.TaskDescriptor, diff --git a/pkg/coordinator/web/templates/test_run/test_run.html b/pkg/coordinator/web/templates/test_run/test_run.html index 95269429..c7593de9 100644 --- a/pkg/coordinator/web/templates/test_run/test_run.html +++ b/pkg/coordinator/web/templates/test_run/test_run.html @@ -441,27 +441,22 @@