diff --git a/cmd/ascii.go b/cmd/ascii.go index ca81f1fcbe..54603a69c7 100644 --- a/cmd/ascii.go +++ b/cmd/ascii.go @@ -39,7 +39,7 @@ func validatorASCII() []string { func mevASCII() []string { return []string{ - "__ __ ________ __ ", + " __ __ ________ __ ", "| \\/ | ____\\ \\ / / ", "| \\ / | |__ \\ \\ / / ", "| |\\/| | __| \\ \\/ / ", diff --git a/cmd/cmd.go b/cmd/cmd.go index 1eda3c9cdb..51dc3a7f83 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -48,6 +48,7 @@ func New() *cobra.Command { newCombineCmd(newCombineFunc), newAlphaCmd( newTestCmd( + newTestAllCmd(runTestAll), newTestPeersCmd(runTestPeers), newTestBeaconCmd(runTestBeacon), newTestValidatorCmd(runTestValidator), diff --git a/cmd/test.go b/cmd/test.go index 5e62f1efc3..79d2bc9883 100644 --- a/cmd/test.go +++ b/cmd/test.go @@ -6,8 +6,11 @@ import ( "context" "fmt" "io" + "net/http" + "net/http/httptrace" "os" "os/signal" + "slices" "sort" "strings" "syscall" @@ -16,6 +19,7 @@ import ( "github.com/pelletier/go-toml/v2" "github.com/spf13/cobra" + "github.com/spf13/pflag" "golang.org/x/exp/maps" "github.com/obolnetwork/charon/app/errors" @@ -34,6 +38,7 @@ const ( validatorTestCategory = "validator" mevTestCategory = "mev" performanceTestCategory = "performance" + allTestCategory = "all" ) type testConfig struct { @@ -62,6 +67,13 @@ func bindTestFlags(cmd *cobra.Command, config *testConfig) { cmd.Flags().BoolVar(&config.Quiet, "quiet", false, "Do not print test results to stdout.") } +func bindTestLogFlags(flags *pflag.FlagSet, config *log.Config) { + flags.StringVar(&config.Format, "log-format", "console", "Log format; console, logfmt or json") + flags.StringVar(&config.Level, "log-level", "info", "Log level; debug, info, warn or error") + flags.StringVar(&config.Color, "log-color", "auto", "Log color; auto, force, disable.") + flags.StringVar(&config.LogOutputPath, "log-output-path", "", "Path in which to write on-disk logs.") +} + func listTestCases(cmd *cobra.Command) []string { var testCaseNames []testCaseName switch cmd.Name() { @@ -76,6 +88,16 @@ func listTestCases(cmd *cobra.Command) []string { testCaseNames = maps.Keys(supportedMEVTestCases()) case performanceTestCategory: testCaseNames = maps.Keys(supportedPerformanceTestCases()) + case allTestCategory: + testCaseNames = slices.Concat( + maps.Keys(supportedPeerTestCases()), + maps.Keys(supportedSelfTestCases()), + maps.Keys(supportedRelayTestCases()), + maps.Keys(supportedBeaconTestCases()), + maps.Keys(supportedValidatorTestCases()), + maps.Keys(supportedMEVTestCases()), + maps.Keys(supportedPerformanceTestCases()), + ) default: log.Warn(cmd.Context(), "Unknown command for listing test cases", nil, z.Str("name", cmd.Name())) } @@ -229,12 +251,14 @@ func writeResultToWriter(res testCategoryResult, w io.Writer) error { lines = append(lines, "") lines = append(lines, fmt.Sprintf("%-64s%s", "TEST NAME", "RESULT")) suggestions := []string{} - for target, testResults := range res.Targets { - if target != "" && len(testResults) > 0 { + targets := maps.Keys(res.Targets) + slices.Sort(targets) + for _, target := range targets { + if target != "" && len(res.Targets[target]) > 0 { lines = append(lines, "") lines = append(lines, target) } - for _, singleTestRes := range testResults { + for _, singleTestRes := range res.Targets[target] { testOutput := "" testOutput += fmt.Sprintf("%-64s", singleTestRes.Name) if singleTestRes.Measurement != "" { @@ -273,6 +297,30 @@ func writeResultToWriter(res testCategoryResult, w io.Writer) error { return nil } +func evaluateHighestRTTScores(testResCh chan time.Duration, testRes testResult, avg time.Duration, poor time.Duration) testResult { + highestRTT := time.Duration(0) + for rtt := range testResCh { + if rtt > highestRTT { + highestRTT = rtt + } + } + + return evaluateRTT(highestRTT, testRes, avg, poor) +} + +func evaluateRTT(rtt time.Duration, testRes testResult, avg time.Duration, poor time.Duration) testResult { + if rtt == 0 || rtt > poor { + testRes.Verdict = testVerdictPoor + } else if rtt > avg { + testRes.Verdict = testVerdictAvg + } else { + testRes.Verdict = testVerdictGood + } + testRes.Measurement = Duration{rtt}.String() + + return testRes +} + func calculateScore(results []testResult) categoryScore { // TODO(kalo): calculate score more elaborately (potentially use weights) avg := 0 @@ -348,3 +396,37 @@ func sleepWithContext(ctx context.Context, d time.Duration) { case <-timer.C: } } + +func requestRTT(ctx context.Context, url string, method string, body io.Reader, expectedStatus int) (time.Duration, error) { + var start time.Time + var firstByte time.Duration + + trace := &httptrace.ClientTrace{ + GotFirstResponseByte: func() { + firstByte = time.Since(start) + }, + } + + start = time.Now() + req, err := http.NewRequestWithContext(httptrace.WithClientTrace(ctx, trace), method, url, body) + if err != nil { + return 0, errors.Wrap(err, "create new request with trace and context") + } + + resp, err := http.DefaultTransport.RoundTrip(req) + if err != nil { + return 0, err + } + defer resp.Body.Close() + + if resp.StatusCode != expectedStatus { + data, err := io.ReadAll(resp.Body) + if err != nil { + log.Warn(ctx, "Unexpected status code", nil, z.Int("status_code", resp.StatusCode), z.Int("expected_status_code", expectedStatus), z.Str("endpoint", url)) + } else { + log.Warn(ctx, "Unexpected status code", nil, z.Int("status_code", resp.StatusCode), z.Int("expected_status_code", expectedStatus), z.Str("endpoint", url), z.Str("body", string(data))) + } + } + + return firstByte, nil +} diff --git a/cmd/testall.go b/cmd/testall.go new file mode 100644 index 0000000000..7aab22fe93 --- /dev/null +++ b/cmd/testall.go @@ -0,0 +1,97 @@ +// Copyright © 2022-2024 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1 + +package cmd + +import ( + "context" + "io" + + "github.com/spf13/cobra" + + "github.com/obolnetwork/charon/app/errors" +) + +type testAllConfig struct { + testConfig + Peers testPeersConfig + Beacon testBeaconConfig + Validator testValidatorConfig + MEV testMEVConfig + Performance testPerformanceConfig +} + +func newTestAllCmd(runFunc func(context.Context, io.Writer, testAllConfig) error) *cobra.Command { + var config testAllConfig + + cmd := &cobra.Command{ + Use: "all", + Short: "Run tests towards peer nodes, beacon nodes, validator client, MEV relays, own hardware and internet connectivity.", + Long: `Run tests towards peer nodes, beacon nodes, validator client, MEV relays, own hardware and internet connectivity. Verify that Charon can efficiently do its duties on the tested setup.`, + Args: cobra.NoArgs, + PreRunE: func(cmd *cobra.Command, _ []string) error { + return mustOutputToFileOnQuiet(cmd) + }, + RunE: func(cmd *cobra.Command, _ []string) error { + return runFunc(cmd.Context(), cmd.OutOrStdout(), config) + }, + } + + bindTestFlags(cmd, &config.testConfig) + + bindTestPeersFlags(cmd, &config.Peers, "peers-") + bindTestBeaconFlags(cmd, &config.Beacon, "beacon-") + bindTestValidatorFlags(cmd, &config.Validator, "validator-") + bindTestMEVFlags(cmd, &config.MEV, "mev-") + bindTestPerformanceFlags(cmd, &config.Performance, "performance-") + + bindP2PFlags(cmd, &config.Peers.P2P) + bindDataDirFlag(cmd.Flags(), &config.Peers.DataDir) + bindTestLogFlags(cmd.Flags(), &config.Peers.Log) + + wrapPreRunE(cmd, func(cmd *cobra.Command, _ []string) error { + testCasesPresent := cmd.Flags().Lookup("test-cases").Changed + + if testCasesPresent { + //nolint:revive // we use our own version of the errors package + return errors.New("test-cases cannot be specified when explicitly running all test cases.") + } + + return nil + }) + + return cmd +} + +func runTestAll(ctx context.Context, w io.Writer, cfg testAllConfig) (err error) { + cfg.Beacon.testConfig = cfg.testConfig + err = runTestBeacon(ctx, w, cfg.Beacon) + if err != nil { + return err + } + + cfg.Validator.testConfig = cfg.testConfig + err = runTestValidator(ctx, w, cfg.Validator) + if err != nil { + return err + } + + cfg.MEV.testConfig = cfg.testConfig + err = runTestMEV(ctx, w, cfg.MEV) + if err != nil { + return err + } + + cfg.Performance.testConfig = cfg.testConfig + err = runTestPerformance(ctx, w, cfg.Performance) + if err != nil { + return err + } + + cfg.Peers.testConfig = cfg.testConfig + err = runTestPeers(ctx, w, cfg.Peers) + if err != nil { + return err + } + + return nil +} diff --git a/cmd/testbeacon.go b/cmd/testbeacon.go index e865ff01f5..ce75cfd274 100644 --- a/cmd/testbeacon.go +++ b/cmd/testbeacon.go @@ -10,7 +10,6 @@ import ( "math" "math/rand" "net/http" - "net/http/httptrace" "os" "path/filepath" "sort" @@ -177,20 +176,18 @@ func newTestBeaconCmd(runFunc func(context.Context, io.Writer, testBeaconConfig) } bindTestFlags(cmd, &config.testConfig) - bindTestBeaconFlags(cmd, &config) + bindTestBeaconFlags(cmd, &config, "") return cmd } -func bindTestBeaconFlags(cmd *cobra.Command, config *testBeaconConfig) { - const endpoints = "endpoints" - cmd.Flags().StringSliceVar(&config.Endpoints, endpoints, nil, "[REQUIRED] Comma separated list of one or more beacon node endpoint URLs.") - mustMarkFlagRequired(cmd, endpoints) - cmd.Flags().BoolVar(&config.LoadTest, "load-test", false, "Enable load test, not advisable when testing towards external beacon nodes.") - cmd.Flags().DurationVar(&config.LoadTestDuration, "load-test-duration", 5*time.Second, "Time to keep running the load tests in seconds. For each second a new continuous ping instance is spawned.") - cmd.Flags().StringVar(&config.SimulationFileDir, "simulation-file-dir", "./", "JSON directory to which simulation file results will be written.") - cmd.Flags().IntVar(&config.SimulationDuration, "simulation-duration-in-slots", slotsInEpoch, "Time to keep running the simulation in slots.") - cmd.Flags().BoolVar(&config.SimulationVerbose, "simulation-verbose", false, "Show results for each request and each validator.") +func bindTestBeaconFlags(cmd *cobra.Command, config *testBeaconConfig, flagsPrefix string) { + cmd.Flags().StringSliceVar(&config.Endpoints, flagsPrefix+"endpoints", nil, "[REQUIRED] Comma separated list of one or more beacon node endpoint URLs.") + cmd.Flags().BoolVar(&config.LoadTest, flagsPrefix+"load-test", false, "Enable load test, not advisable when testing towards external beacon nodes.") + cmd.Flags().DurationVar(&config.LoadTestDuration, flagsPrefix+"load-test-duration", 5*time.Second, "Time to keep running the load tests in seconds. For each second a new continuous ping instance is spawned.") + cmd.Flags().IntVar(&config.SimulationDuration, flagsPrefix+"simulation-duration-in-slots", slotsInEpoch, "Time to keep running the simulation in slots.") + cmd.Flags().BoolVar(&config.SimulationVerbose, flagsPrefix+"simulation-verbose", false, "Show results for each request and each validator.") + mustMarkFlagRequired(cmd, flagsPrefix+"endpoints") } func supportedBeaconTestCases() map[testCaseName]testCaseBeacon { @@ -210,6 +207,8 @@ func supportedBeaconTestCases() map[testCaseName]testCaseBeacon { } func runTestBeacon(ctx context.Context, w io.Writer, cfg testBeaconConfig) (err error) { + log.Info(ctx, "Starting beacon node test") + testCases := supportedBeaconTestCases() queuedTests := filterTests(maps.Keys(testCases), cfg.testConfig) if len(queuedTests) == 0 { @@ -266,6 +265,8 @@ func runTestBeacon(ctx context.Context, w io.Writer, cfg testBeaconConfig) (err return nil } +// beacon node tests + func testAllBeacons(ctx context.Context, queuedTestCases []testCaseName, allTestCases map[testCaseName]testCaseBeacon, conf testBeaconConfig, allBeaconsResCh chan map[string][]testResult) { defer close(allBeaconsResCh) // run tests for all beacon nodes @@ -366,36 +367,6 @@ func beaconPingTest(ctx context.Context, _ *testBeaconConfig, target string) tes return testRes } -func beaconPingOnce(ctx context.Context, target string) (time.Duration, error) { - var start time.Time - var firstByte time.Duration - - trace := &httptrace.ClientTrace{ - GotFirstResponseByte: func() { - firstByte = time.Since(start) - }, - } - - start = time.Now() - targetEndpoint := fmt.Sprintf("%v/eth/v1/node/health", target) - req, err := http.NewRequestWithContext(httptrace.WithClientTrace(ctx, trace), http.MethodGet, targetEndpoint, nil) - if err != nil { - return 0, errors.Wrap(err, "create new request with trace and context") - } - - resp, err := http.DefaultTransport.RoundTrip(req) - if err != nil { - return 0, err - } - defer resp.Body.Close() - - if resp.StatusCode > 399 { - return 0, errors.New(httpStatusError(resp.StatusCode)) - } - - return firstByte, nil -} - func beaconPingMeasureTest(ctx context.Context, _ *testBeaconConfig, target string) testResult { testRes := testResult{Name: "PingMeasure"} @@ -404,34 +375,11 @@ func beaconPingMeasureTest(ctx context.Context, _ *testBeaconConfig, target stri return failedTestResult(testRes, err) } - if rtt > thresholdBeaconMeasurePoor { - testRes.Verdict = testVerdictPoor - } else if rtt > thresholdBeaconMeasureAvg { - testRes.Verdict = testVerdictAvg - } else { - testRes.Verdict = testVerdictGood - } - testRes.Measurement = Duration{rtt}.String() + testRes = evaluateRTT(rtt, testRes, thresholdBeaconMeasureAvg, thresholdBeaconMeasurePoor) return testRes } -func pingBeaconContinuously(ctx context.Context, target string, resCh chan<- time.Duration) { - for { - rtt, err := beaconPingOnce(ctx, target) - if err != nil { - return - } - select { - case <-ctx.Done(): - return - case resCh <- rtt: - awaitTime := rand.Intn(100) //nolint:gosec // weak generator is not an issue here - sleepWithContext(ctx, time.Duration(awaitTime)*time.Millisecond) - } - } -} - func beaconPingLoadTest(ctx context.Context, conf *testBeaconConfig, target string) testResult { testRes := testResult{Name: "BeaconLoad"} if !conf.LoadTest { @@ -465,20 +413,7 @@ func beaconPingLoadTest(ctx context.Context, conf *testBeaconConfig, target stri close(testResCh) log.Info(ctx, "Ping load tests finished", z.Any("target", target)) - highestRTT := time.Duration(0) - for rtt := range testResCh { - if rtt > highestRTT { - highestRTT = rtt - } - } - if highestRTT > thresholdBeaconLoadPoor { - testRes.Verdict = testVerdictPoor - } else if highestRTT > thresholdBeaconLoadAvg { - testRes.Verdict = testVerdictAvg - } else { - testRes.Verdict = testVerdictGood - } - testRes.Measurement = Duration{highestRTT}.String() + testRes = evaluateHighestRTTScores(testResCh, testRes, thresholdBeaconLoadAvg, thresholdBeaconLoadPoor) return testRes } @@ -578,6 +513,30 @@ func beaconPeerCountTest(ctx context.Context, _ *testBeaconConfig, target string return testRes } +// helper functions + +func beaconPingOnce(ctx context.Context, target string) (time.Duration, error) { + return requestRTT(ctx, fmt.Sprintf("%v/eth/v1/node/health", target), http.MethodGet, nil, 200) +} + +func pingBeaconContinuously(ctx context.Context, target string, resCh chan<- time.Duration) { + for { + rtt, err := beaconPingOnce(ctx, target) + if err != nil { + return + } + select { + case <-ctx.Done(): + return + case resCh <- rtt: + awaitTime := rand.Intn(100) //nolint:gosec // weak generator is not an issue here + sleepWithContext(ctx, time.Duration(awaitTime)*time.Millisecond) + } + } +} + +// beacon simulation tests + func beaconSimulation1Test(ctx context.Context, conf *testBeaconConfig, target string) testResult { testRes := testResult{Name: "BeaconSimulation1Validator"} if !conf.LoadTest { @@ -808,6 +767,8 @@ func beaconSimulationTest(ctx context.Context, conf *testBeaconConfig, target st return testRes } +// requests per 1 cluster + func singleClusterSimulation(ctx context.Context, simulationDuration time.Duration, target string, resultCh chan SimulationCluster, wgDone func()) { defer wgDone() // per slot requests @@ -1120,6 +1081,8 @@ func clusterGeneralRequests( } } +// requests per 1 validator + func singleValidatorSimulation(ctx context.Context, simulationDuration time.Duration, target string, resultCh chan SimulationSingleValidator, intensity RequestsIntensity, dutiesPerformed DutiesPerformed, wg *sync.WaitGroup) { defer wg.Done() // attestations @@ -1350,6 +1313,41 @@ func singleValidatorSimulation(ctx context.Context, simulationDuration time.Dura } } +func attestationDuty(ctx context.Context, target string, simulationDuration time.Duration, tickTime time.Duration, getAttestationDataCh chan time.Duration, submitAttestationObjectCh chan time.Duration) { + defer close(getAttestationDataCh) + defer close(submitAttestationObjectCh) + pingCtx, cancel := context.WithTimeout(ctx, simulationDuration) + defer cancel() + + time.Sleep(randomizeStart(tickTime)) + ticker := time.NewTicker(tickTime) + defer ticker.Stop() + slot, err := getCurrentSlot(ctx, target) + if err != nil { + log.Error(ctx, "Failed to get current slot", err) + slot = 1 + } + for pingCtx.Err() == nil { + getResult, err := getAttestationData(ctx, target, slot, rand.Intn(committeeSizePerSlot)) //nolint:gosec // weak generator is not an issue here + if err != nil && !errors.Is(err, context.Canceled) { + log.Error(ctx, "Unexpected getAttestationData failure", err) + } + getAttestationDataCh <- getResult + + submitResult, err := submitAttestationObject(ctx, target) + if err != nil && !errors.Is(err, context.Canceled) { + log.Error(ctx, "Unexpected submitAttestationObject failure", err) + } + submitAttestationObjectCh <- submitResult + + select { + case <-pingCtx.Done(): + case <-ticker.C: + slot += int(tickTime.Seconds()) / int(slotTime.Seconds()) + } + } +} + func aggregationDuty(ctx context.Context, target string, simulationDuration time.Duration, tickTime time.Duration, getAggregateAttestationsCh chan time.Duration, submitAggregateAndProofsCh chan time.Duration) { defer close(getAggregateAttestationsCh) defer close(submitAggregateAndProofsCh) @@ -1417,41 +1415,6 @@ func proposalDuty(ctx context.Context, target string, simulationDuration time.Du } } -func attestationDuty(ctx context.Context, target string, simulationDuration time.Duration, tickTime time.Duration, getAttestationDataCh chan time.Duration, submitAttestationObjectCh chan time.Duration) { - defer close(getAttestationDataCh) - defer close(submitAttestationObjectCh) - pingCtx, cancel := context.WithTimeout(ctx, simulationDuration) - defer cancel() - - time.Sleep(randomizeStart(tickTime)) - ticker := time.NewTicker(tickTime) - defer ticker.Stop() - slot, err := getCurrentSlot(ctx, target) - if err != nil { - log.Error(ctx, "Failed to get current slot", err) - slot = 1 - } - for pingCtx.Err() == nil { - getResult, err := getAttestationData(ctx, target, slot, rand.Intn(committeeSizePerSlot)) //nolint:gosec // weak generator is not an issue here - if err != nil && !errors.Is(err, context.Canceled) { - log.Error(ctx, "Unexpected getAttestationData failure", err) - } - getAttestationDataCh <- getResult - - submitResult, err := submitAttestationObject(ctx, target) - if err != nil && !errors.Is(err, context.Canceled) { - log.Error(ctx, "Unexpected submitAttestationObject failure", err) - } - submitAttestationObjectCh <- submitResult - - select { - case <-pingCtx.Done(): - case <-ticker.C: - slot += int(tickTime.Seconds()) / int(slotTime.Seconds()) - } - } -} - func syncCommitteeDuties( ctx context.Context, target string, simulationDuration time.Duration, tickTimeSubmit time.Duration, tickTimeSubscribe time.Duration, tickTimeContribution time.Duration, @@ -1538,6 +1501,8 @@ func syncCommitteeMessageDuty(ctx context.Context, target string, simulationDura } } +// simulation helper functions + func getCurrentSlot(ctx context.Context, target string) (int, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, target+"/eth/v1/node/syncing", nil) if err != nil { @@ -1723,41 +1688,8 @@ func randomizeStart(tickTime time.Duration) time.Duration { return slotTime * time.Duration(rand.Intn(int((tickTime / slotTime)))) //nolint:gosec // weak generator is not an issue here } -func requestRTT(ctx context.Context, url string, method string, body io.Reader, expectedStatus int) (time.Duration, error) { - var start time.Time - var firstByte time.Duration +// simulation http requests - cluster - trace := &httptrace.ClientTrace{ - GotFirstResponseByte: func() { - firstByte = time.Since(start) - }, - } - - start = time.Now() - req, err := http.NewRequestWithContext(httptrace.WithClientTrace(ctx, trace), method, url, body) - if err != nil { - return 0, errors.Wrap(err, "create new request with trace and context") - } - - resp, err := http.DefaultTransport.RoundTrip(req) - if err != nil { - return 0, err - } - defer resp.Body.Close() - - if resp.StatusCode != expectedStatus { - data, err := io.ReadAll(resp.Body) - if err != nil { - log.Warn(ctx, "Unexpected status code", nil, z.Int("status_code", resp.StatusCode), z.Int("expected_status_code", expectedStatus), z.Str("endpoint", url)) - } else { - log.Warn(ctx, "Unexpected status code", nil, z.Int("status_code", resp.StatusCode), z.Int("expected_status_code", expectedStatus), z.Str("endpoint", url), z.Str("body", string(data))) - } - } - - return firstByte, nil -} - -// cluster requests func getAttestationsForBlock(ctx context.Context, target string, block int) (time.Duration, error) { return requestRTT(ctx, fmt.Sprintf("%v/eth/v1/beacon/blocks/%v/attestations", target, block), http.MethodGet, nil, 200) } @@ -1811,7 +1743,8 @@ func nodeVersion(ctx context.Context, target string) (time.Duration, error) { return requestRTT(ctx, fmt.Sprintf("%v/eth/v1/node/version", target), http.MethodGet, nil, 200) } -// attestation duty requests +// simulation http requests - attestation duty + func getAttestationData(ctx context.Context, target string, slot int, committeeIndex int) (time.Duration, error) { return requestRTT(ctx, fmt.Sprintf("%v/eth/v1/validator/attestation_data?slot=%v&committee_index=%v", target, slot, committeeIndex), http.MethodGet, nil, 200) } @@ -1821,7 +1754,8 @@ func submitAttestationObject(ctx context.Context, target string) (time.Duration, return requestRTT(ctx, fmt.Sprintf("%v/eth/v1/beacon/pool/attestations", target), http.MethodPost, body, 400) } -// aggregation duty requests +// simulation http requests - aggregation duty + func getAggregateAttestations(ctx context.Context, target string, slot int, attestationDataRoot string) (time.Duration, error) { return requestRTT(ctx, fmt.Sprintf("%v/eth/v1/validator/aggregate_attestation?slot=%v&attestation_data_root=%v", target, slot, attestationDataRoot), http.MethodGet, nil, 404) } @@ -1831,7 +1765,8 @@ func postAggregateAndProofs(ctx context.Context, target string) (time.Duration, return requestRTT(ctx, fmt.Sprintf("%v/eth/v1/validator/aggregate_and_proofs", target), http.MethodPost, body, 400) } -// proposal duty requests +// simulation http requests - proposal duty + func produceBlock(ctx context.Context, target string, slot int, randaoReveal string) (time.Duration, error) { return requestRTT(ctx, fmt.Sprintf("%v/eth/v3/validator/blocks/%v?randao_reveal=%v", target, slot, randaoReveal), http.MethodGet, nil, 200) } @@ -1841,7 +1776,8 @@ func publishBlindedBlock(ctx context.Context, target string) (time.Duration, err return requestRTT(ctx, fmt.Sprintf("%v/eth/v2/beacon/blinded", target), http.MethodPost, body, 404) } -// sync committee duty requests +// simulation http requests - sync committee duty + func submitSyncCommittee(ctx context.Context, target string) (time.Duration, error) { body := strings.NewReader(`{{"aggregation_bits":"0x01","signature":"0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505","data":{"slot":"1","index":"1","beacon_block_root":"0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2","source":{"epoch":"1","root":"0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"},"target":{"epoch":"1","root":"0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"}}}`) return requestRTT(ctx, fmt.Sprintf("%v/eth/v1/beacon/pool/sync_committees", target), http.MethodPost, body, 400) diff --git a/cmd/testmev.go b/cmd/testmev.go index 709b118572..0bf49e395e 100644 --- a/cmd/testmev.go +++ b/cmd/testmev.go @@ -7,7 +7,7 @@ import ( "fmt" "io" "net/http" - "net/http/httptrace" + "strings" "time" "github.com/spf13/cobra" @@ -15,6 +15,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/obolnetwork/charon/app/errors" + "github.com/obolnetwork/charon/app/log" ) type testMEVConfig struct { @@ -34,8 +35,8 @@ func newTestMEVCmd(runFunc func(context.Context, io.Writer, testMEVConfig) error cmd := &cobra.Command{ Use: "mev", - Short: "Run multiple tests towards mev nodes", - Long: `Run multiple tests towards mev nodes. Verify that Charon can efficiently interact with MEV Node(s).`, + Short: "Run multiple tests towards MEV relays", + Long: `Run multiple tests towards MEV relays. Verify that Charon can efficiently interact with MEV relay(s).`, Args: cobra.NoArgs, PreRunE: func(cmd *cobra.Command, _ []string) error { return mustOutputToFileOnQuiet(cmd) @@ -46,13 +47,13 @@ func newTestMEVCmd(runFunc func(context.Context, io.Writer, testMEVConfig) error } bindTestFlags(cmd, &config.testConfig) - bindTestMEVFlags(cmd, &config) + bindTestMEVFlags(cmd, &config, "") return cmd } -func bindTestMEVFlags(cmd *cobra.Command, config *testMEVConfig) { - const endpoints = "endpoints" +func bindTestMEVFlags(cmd *cobra.Command, config *testMEVConfig, flagsPrefix string) { + endpoints := flagsPrefix + "endpoints" cmd.Flags().StringSliceVar(&config.Endpoints, endpoints, nil, "[REQUIRED] Comma separated list of one or more MEV relay endpoint URLs.") mustMarkFlagRequired(cmd, endpoints) } @@ -65,6 +66,8 @@ func supportedMEVTestCases() map[testCaseName]testCaseMEV { } func runTestMEV(ctx context.Context, w io.Writer, cfg testMEVConfig) (err error) { + log.Info(ctx, "Starting MEV relays test") + testCases := supportedMEVTestCases() queuedTests := filterTests(maps.Keys(testCases), cfg.testConfig) if len(queuedTests) == 0 { @@ -121,6 +124,8 @@ func runTestMEV(ctx context.Context, w io.Writer, cfg testMEVConfig) (err error) return nil } +// mev relays tests + func testAllMEVs(ctx context.Context, queuedTestCases []testCaseName, allTestCases map[testCaseName]testCaseMEV, conf testMEVConfig, allMEVsResCh chan map[string][]testResult) { defer close(allMEVsResCh) // run tests for all mev nodes @@ -179,7 +184,8 @@ func testSingleMEV(ctx context.Context, queuedTestCases []testCaseName, allTestC } } - resCh <- map[string][]testResult{target: allTestRes} + relayName := formatMEVRelayName(target) + resCh <- map[string][]testResult{relayName: allTestRes} return nil } @@ -223,40 +229,35 @@ func mevPingTest(ctx context.Context, _ *testMEVConfig, target string) testResul func mevPingMeasureTest(ctx context.Context, _ *testMEVConfig, target string) testResult { testRes := testResult{Name: "PingMeasure"} - var start time.Time - var firstByte time.Duration - - trace := &httptrace.ClientTrace{ - GotFirstResponseByte: func() { - firstByte = time.Since(start) - }, - } - - start = time.Now() - targetEndpoint := fmt.Sprintf("%v/eth/v1/builder/status", target) - req, err := http.NewRequestWithContext(httptrace.WithClientTrace(ctx, trace), http.MethodGet, targetEndpoint, nil) + rtt, err := requestRTT(ctx, fmt.Sprintf("%v/eth/v1/builder/status", target), http.MethodGet, nil, 200) if err != nil { return failedTestResult(testRes, err) } - resp, err := http.DefaultTransport.RoundTrip(req) - if err != nil { - return failedTestResult(testRes, err) - } - defer resp.Body.Close() + testRes = evaluateRTT(rtt, testRes, thresholdMEVMeasureAvg, thresholdMEVMeasurePoor) - if resp.StatusCode > 399 { - return failedTestResult(testRes, errors.New(httpStatusError(resp.StatusCode))) - } + return testRes +} + +// helper functions - if firstByte > thresholdMEVMeasurePoor { - testRes.Verdict = testVerdictPoor - } else if firstByte > thresholdMEVMeasureAvg { - testRes.Verdict = testVerdictAvg - } else { - testRes.Verdict = testVerdictGood +// Shorten the hash of the MEV relay endpoint +// Example: https://0xac6e77dfe25ecd6110b8e780608cce0dab71fdd5ebea22a16c0205200f2f8e2e3ad3b71d3499c54ad14d6c21b41a37ae@boost-relay.flashbots.net +// to https://0xac6e...37ae@boost-relay.flashbots.net +func formatMEVRelayName(urlString string) string { + splitScheme := strings.Split(urlString, "://") + if len(splitScheme) == 1 { + return urlString + } + hashSplit := strings.Split(splitScheme[1], "@") + if len(hashSplit) == 1 { + return urlString } - testRes.Measurement = Duration{firstByte}.String() + hash := hashSplit[0] + if !strings.HasPrefix(hash, "0x") || len(hash) < 18 { + return urlString + } + hashShort := hash[:6] + "..." + hash[len(hash)-4:] - return testRes + return splitScheme[0] + "://" + hashShort + "@" + hashSplit[1] } diff --git a/cmd/testpeers.go b/cmd/testpeers.go index 77312360ea..987c448c58 100644 --- a/cmd/testpeers.go +++ b/cmd/testpeers.go @@ -13,7 +13,6 @@ import ( "math/rand" "net" "net/http" - "net/http/httptrace" "os" "slices" "strings" @@ -26,7 +25,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/protocol/ping" "github.com/spf13/cobra" - "github.com/spf13/pflag" "golang.org/x/exp/maps" "golang.org/x/sync/errgroup" @@ -83,7 +81,7 @@ func newTestPeersCmd(runFunc func(context.Context, io.Writer, testPeersConfig) e } bindTestFlags(cmd, &config.testConfig) - bindTestPeersFlags(cmd, &config) + bindTestPeersFlags(cmd, &config, "") bindP2PFlags(cmd, &config.P2P) bindDataDirFlag(cmd.Flags(), &config.DataDir) bindTestLogFlags(cmd.Flags(), &config.Log) @@ -116,21 +114,13 @@ func newTestPeersCmd(runFunc func(context.Context, io.Writer, testPeersConfig) e return cmd } -func bindTestPeersFlags(cmd *cobra.Command, config *testPeersConfig) { - const enrs = "enrs" - cmd.Flags().StringSliceVar(&config.ENRs, enrs, nil, "Comma-separated list of each peer ENR address.") - cmd.Flags().DurationVar(&config.KeepAlive, "keep-alive", 30*time.Minute, "Time to keep TCP node alive after test completion, so connection is open for other peers to test on their end.") - cmd.Flags().DurationVar(&config.LoadTestDuration, "load-test-duration", 30*time.Second, "Time to keep running the load tests in seconds. For each second a new continuous ping instance is spawned.") - cmd.Flags().DurationVar(&config.DirectConnectionTimeout, "direct-connection-timeout", 2*time.Minute, "Time to keep trying to establish direct connection to peer.") - cmd.Flags().StringVar(&config.ClusterLockFilePath, "cluster-lock-file-path", "", "Path to cluster lock file, used to fetch peers' ENR addresses.") - cmd.Flags().StringVar(&config.ClusterDefinitionFilePath, "cluster-definition-file-path", "", "Path to cluster definition file, used to fetch peers' ENR addresses.") -} - -func bindTestLogFlags(flags *pflag.FlagSet, config *log.Config) { - flags.StringVar(&config.Format, "log-format", "console", "Log format; console, logfmt or json") - flags.StringVar(&config.Level, "log-level", "info", "Log level; debug, info, warn or error") - flags.StringVar(&config.Color, "log-color", "auto", "Log color; auto, force, disable.") - flags.StringVar(&config.LogOutputPath, "log-output-path", "", "Path in which to write on-disk logs.") +func bindTestPeersFlags(cmd *cobra.Command, config *testPeersConfig, flagsPrefix string) { + cmd.Flags().StringSliceVar(&config.ENRs, flagsPrefix+"enrs", nil, "[REQUIRED] Comma-separated list of each peer ENR address.") + cmd.Flags().DurationVar(&config.KeepAlive, flagsPrefix+"keep-alive", 30*time.Minute, "Time to keep TCP node alive after test completion, so connection is open for other peers to test on their end.") + cmd.Flags().DurationVar(&config.LoadTestDuration, flagsPrefix+"load-test-duration", 30*time.Second, "Time to keep running the load tests in seconds. For each second a new continuous ping instance is spawned.") + cmd.Flags().DurationVar(&config.DirectConnectionTimeout, flagsPrefix+"direct-connection-timeout", 2*time.Minute, "Time to keep trying to establish direct connection to peer.") + cmd.Flags().StringVar(&config.ClusterLockFilePath, flagsPrefix+"cluster-lock-file-path", "", "Path to cluster lock file, used to fetch peers' ENR addresses.") + cmd.Flags().StringVar(&config.ClusterDefinitionFilePath, flagsPrefix+"cluster-definition-file-path", "", "Path to cluster definition file, used to fetch peers' ENR addresses.") } func supportedPeerTestCases() map[testCaseName]testCasePeer { @@ -155,196 +145,9 @@ func supportedSelfTestCases() map[testCaseName]testCasePeerSelf { } } -func fetchPeersFromDefinition(path string) ([]string, error) { - f, err := os.ReadFile(path) - if err != nil { - return nil, errors.Wrap(err, "read definition file", z.Str("path", path)) - } - - var def cluster.Definition - err = json.Unmarshal(f, &def) - if err != nil { - return nil, errors.Wrap(err, "unmarshal definition json", z.Str("path", path)) - } - - var enrs []string - for _, o := range def.Operators { - enrs = append(enrs, o.ENR) - } - - if len(enrs) == 0 { - return nil, errors.New("no peers found in lock", z.Str("path", path)) - } - - return enrs, nil -} - -func fetchPeersFromLock(path string) ([]string, error) { - f, err := os.ReadFile(path) - if err != nil { - return nil, errors.Wrap(err, "read lock file", z.Str("path", path)) - } - - var lock cluster.Lock - err = json.Unmarshal(f, &lock) - if err != nil { - return nil, errors.Wrap(err, "unmarshal lock json", z.Str("path", path)) - } - - var enrs []string - for _, o := range lock.Operators { - enrs = append(enrs, o.ENR) - } - - if len(enrs) == 0 { - return nil, errors.New("no peers found in lock", z.Str("path", path)) - } - - return enrs, nil -} - -func fetchENRs(conf testPeersConfig) ([]string, error) { - var enrs []string - var err error - switch { - case len(conf.ENRs) != 0: - enrs = conf.ENRs - case conf.ClusterDefinitionFilePath != "": - enrs, err = fetchPeersFromDefinition(conf.ClusterDefinitionFilePath) - if err != nil { - return nil, err - } - case conf.ClusterLockFilePath != "": - enrs, err = fetchPeersFromLock(conf.ClusterLockFilePath) - if err != nil { - return nil, err - } - } - - return enrs, nil -} - -func startTCPNode(ctx context.Context, conf testPeersConfig) (host.Host, func(), error) { - enrs, err := fetchENRs(conf) - if err != nil { - return nil, nil, err - } - - var peers []p2p.Peer - for i, enrString := range enrs { - enrRecord, err := enr.Parse(enrString) - if err != nil { - return nil, nil, errors.Wrap(err, "decode enr", z.Str("enr", enrString)) - } - - p2pPeer, err := p2p.NewPeerFromENR(enrRecord, i) - if err != nil { - return nil, nil, err - } - - peers = append(peers, p2pPeer) - } - - p2pPrivKey, err := p2p.LoadPrivKey(conf.DataDir) - if err != nil { - return nil, nil, err - } - - meENR, err := enr.New(p2pPrivKey) - if err != nil { - return nil, nil, err - } - - mePeer, err := p2p.NewPeerFromENR(meENR, len(enrs)) - if err != nil { - return nil, nil, err - } - - log.Info(ctx, "Self p2p name resolved", z.Any("name", mePeer.Name)) - - peers = append(peers, mePeer) - - allENRs := enrs - allENRs = append(allENRs, meENR.String()) - slices.Sort(allENRs) - allENRsString := strings.Join(allENRs, ",") - allENRsHash := sha256.Sum256([]byte(allENRsString)) - - return setupP2P(ctx, p2pPrivKey, conf.P2P, peers, allENRsHash[:]) -} - -func setupP2P(ctx context.Context, privKey *k1.PrivateKey, conf p2p.Config, peers []p2p.Peer, enrsHash []byte) (host.Host, func(), error) { - var peerIDs []peer.ID - for _, peer := range peers { - peerIDs = append(peerIDs, peer.ID) - } - - if err := p2p.VerifyP2PKey(peers, privKey); err != nil { - return nil, nil, err - } - - relays, err := p2p.NewRelays(ctx, conf.Relays, hex.EncodeToString(enrsHash)) - if err != nil { - return nil, nil, err - } - - connGater, err := p2p.NewConnGater(peerIDs, relays) - if err != nil { - return nil, nil, err - } - - tcpNode, err := p2p.NewTCPNode(ctx, conf, privKey, connGater, false) - if err != nil { - return nil, nil, err - } - - p2p.RegisterConnectionLogger(ctx, tcpNode, peerIDs) - - for _, relay := range relays { - go p2p.NewRelayReserver(tcpNode, relay)(ctx) - } - - go p2p.NewRelayRouter(tcpNode, peerIDs, relays)(ctx) - - return tcpNode, func() { - err := tcpNode.Close() - if err != nil && !errors.Is(err, context.Canceled) { - log.Error(ctx, "Close TCP node", err) - } - }, nil -} - -func pingPeerOnce(ctx context.Context, tcpNode host.Host, peer p2p.Peer) (ping.Result, error) { - pingSvc := ping.NewPingService(tcpNode) - pingCtx, cancel := context.WithCancel(ctx) - defer cancel() - pingChan := pingSvc.Ping(pingCtx, peer.ID) - result, ok := <-pingChan - if !ok { - return ping.Result{}, errors.New("ping channel closed") - } - - return result, nil -} - -func pingPeerContinuously(ctx context.Context, tcpNode host.Host, peer p2p.Peer, resCh chan<- ping.Result) { - for { - r, err := pingPeerOnce(ctx, tcpNode, peer) - if err != nil { - return - } - - select { - case <-ctx.Done(): - return - case resCh <- r: - awaitTime := rand.Intn(100) //nolint:gosec // weak generator is not an issue here - sleepWithContext(ctx, time.Duration(awaitTime)*time.Millisecond) - } - } -} - func runTestPeers(ctx context.Context, w io.Writer, conf testPeersConfig) error { + log.Info(ctx, "Starting charon peers and relays test") + relayTestCases := supportedRelayTestCases() queuedTestsRelay := filterTests(maps.Keys(relayTestCases), conf.testConfig) sortTests(queuedTestsRelay) @@ -438,57 +241,77 @@ func runTestPeers(ctx context.Context, w io.Writer, conf testPeersConfig) error return nil } -func testAllRelays(ctx context.Context, queuedTestCases []testCaseName, allTestCases map[testCaseName]testCaseRelay, conf testPeersConfig, allRelaysResCh chan map[string][]testResult) error { - // run tests for all relays - allRelayRes := make(map[string][]testResult) - singleRelayResCh := make(chan map[string][]testResult) +// charon peers tests + +func testAllPeers(ctx context.Context, queuedTestCases []testCaseName, allTestCases map[testCaseName]testCasePeer, conf testPeersConfig, tcpNode host.Host, allPeersResCh chan map[string][]testResult) error { + // run tests for all peer nodes + allPeersRes := make(map[string][]testResult) + singlePeerResCh := make(chan map[string][]testResult) group, _ := errgroup.WithContext(ctx) - for _, relay := range conf.P2P.Relays { + enrs, err := fetchENRs(conf) + if err != nil { + return err + } + for _, enr := range enrs { + currENR := enr // TODO: can be removed after go1.22 version bump group.Go(func() error { - return testSingleRelay(ctx, queuedTestCases, allTestCases, conf, relay, singleRelayResCh) + return testSinglePeer(ctx, queuedTestCases, allTestCases, conf, tcpNode, currENR, singlePeerResCh) }) } doneReading := make(chan bool) go func() { - for singleRelayRes := range singleRelayResCh { - maps.Copy(allRelayRes, singleRelayRes) + for singlePeerRes := range singlePeerResCh { + maps.Copy(allPeersRes, singlePeerRes) } doneReading <- true }() - err := group.Wait() + err = group.Wait() if err != nil { - return errors.Wrap(err, "relays test errgroup") + return errors.Wrap(err, "peers test errgroup") } - close(singleRelayResCh) + close(singlePeerResCh) <-doneReading - allRelaysResCh <- allRelayRes + allPeersResCh <- allPeersRes return nil } -func testSingleRelay(ctx context.Context, queuedTestCases []testCaseName, allTestCases map[testCaseName]testCaseRelay, conf testPeersConfig, target string, allTestResCh chan map[string][]testResult) error { +func testSinglePeer(ctx context.Context, queuedTestCases []testCaseName, allTestCases map[testCaseName]testCasePeer, conf testPeersConfig, tcpNode host.Host, target string, allTestResCh chan map[string][]testResult) error { singleTestResCh := make(chan testResult) allTestRes := []testResult{} - relayName := fmt.Sprintf("relay %v", target) + enrTarget, err := enr.Parse(target) + if err != nil { + return err + } + peerTarget, err := p2p.NewPeerFromENR(enrTarget, 0) + if err != nil { + return err + } + + formatENR := target[:13] + "..." + target[len(target)-4:] // enr:- + first 8 chars + ... + last 4 chars + nameENR := fmt.Sprintf("peer %v %v", peerTarget.Name, formatENR) + if len(queuedTestCases) == 0 { - allTestResCh <- map[string][]testResult{relayName: allTestRes} + allTestResCh <- map[string][]testResult{nameENR: allTestRes} return nil } - // run all relay tests for a relay, pushing each completed test to the channel until all are complete or timeout occurs - go runRelayTest(ctx, queuedTestCases, allTestCases, conf, target, singleTestResCh) + // run all peers tests for a peer, pushing each completed test to the channel until all are complete or timeout occurs + go runPeerTest(ctx, queuedTestCases, allTestCases, conf, tcpNode, peerTarget, singleTestResCh) testCounter := 0 finished := false for !finished { var testName string select { case <-ctx.Done(): - testName = queuedTestCases[testCounter].name - allTestRes = append(allTestRes, testResult{Name: testName, Verdict: testVerdictFail, Error: errTimeoutInterrupted}) + if testCounter < len(queuedTestCases) { + testName = queuedTestCases[testCounter].name + allTestRes = append(allTestRes, testResult{Name: testName, Verdict: testVerdictFail, Error: errTimeoutInterrupted}) + } finished = true case result, ok := <-singleTestResCh: if !ok { @@ -502,165 +325,20 @@ func testSingleRelay(ctx context.Context, queuedTestCases []testCaseName, allTes } } - allTestResCh <- map[string][]testResult{relayName: allTestRes} + allTestResCh <- map[string][]testResult{nameENR: allTestRes} return nil } -func runRelayTest(ctx context.Context, queuedTestCases []testCaseName, allTestCases map[testCaseName]testCaseRelay, conf testPeersConfig, target string, testResCh chan testResult) { +func runPeerTest(ctx context.Context, queuedTestCases []testCaseName, allTestCases map[testCaseName]testCasePeer, conf testPeersConfig, tcpNode host.Host, target p2p.Peer, testResCh chan testResult) { defer close(testResCh) for _, t := range queuedTestCases { select { case <-ctx.Done(): + testResCh <- failedTestResult(testResult{Name: t.name}, errTimeoutInterrupted) return default: - testResCh <- allTestCases[t](ctx, &conf, target) - } - } -} - -func testAllPeers(ctx context.Context, queuedTestCases []testCaseName, allTestCases map[testCaseName]testCasePeer, conf testPeersConfig, tcpNode host.Host, allPeersResCh chan map[string][]testResult) error { - // run tests for all peer nodes - allPeersRes := make(map[string][]testResult) - singlePeerResCh := make(chan map[string][]testResult) - group, _ := errgroup.WithContext(ctx) - - enrs, err := fetchENRs(conf) - if err != nil { - return err - } - for _, enr := range enrs { - currENR := enr // TODO: can be removed after go1.22 version bump - group.Go(func() error { - return testSinglePeer(ctx, queuedTestCases, allTestCases, conf, tcpNode, currENR, singlePeerResCh) - }) - } - - doneReading := make(chan bool) - go func() { - for singlePeerRes := range singlePeerResCh { - maps.Copy(allPeersRes, singlePeerRes) - } - doneReading <- true - }() - - err = group.Wait() - if err != nil { - return errors.Wrap(err, "peers test errgroup") - } - close(singlePeerResCh) - <-doneReading - - allPeersResCh <- allPeersRes - - return nil -} - -func testSinglePeer(ctx context.Context, queuedTestCases []testCaseName, allTestCases map[testCaseName]testCasePeer, conf testPeersConfig, tcpNode host.Host, target string, allTestResCh chan map[string][]testResult) error { - singleTestResCh := make(chan testResult) - allTestRes := []testResult{} - enrTarget, err := enr.Parse(target) - if err != nil { - return err - } - peerTarget, err := p2p.NewPeerFromENR(enrTarget, 0) - if err != nil { - return err - } - - nameENR := fmt.Sprintf("peer %v %v", peerTarget.Name, target) - - if len(queuedTestCases) == 0 { - allTestResCh <- map[string][]testResult{nameENR: allTestRes} - return nil - } - - // run all peers tests for a peer, pushing each completed test to the channel until all are complete or timeout occurs - go runPeerTest(ctx, queuedTestCases, allTestCases, conf, tcpNode, peerTarget, singleTestResCh) - testCounter := 0 - finished := false - for !finished { - var testName string - select { - case <-ctx.Done(): - if testCounter < len(queuedTestCases) { - testName = queuedTestCases[testCounter].name - allTestRes = append(allTestRes, testResult{Name: testName, Verdict: testVerdictFail, Error: errTimeoutInterrupted}) - } - finished = true - case result, ok := <-singleTestResCh: - if !ok { - finished = true - continue - } - testName = queuedTestCases[testCounter].name - testCounter++ - result.Name = testName - allTestRes = append(allTestRes, result) - } - } - - allTestResCh <- map[string][]testResult{nameENR: allTestRes} - - return nil -} - -func runPeerTest(ctx context.Context, queuedTestCases []testCaseName, allTestCases map[testCaseName]testCasePeer, conf testPeersConfig, tcpNode host.Host, target p2p.Peer, testResCh chan testResult) { - defer close(testResCh) - for _, t := range queuedTestCases { - select { - case <-ctx.Done(): - testResCh <- failedTestResult(testResult{Name: t.name}, errTimeoutInterrupted) - return - default: - testResCh <- allTestCases[t](ctx, &conf, tcpNode, target) - } - } -} - -func testSelf(ctx context.Context, queuedTestCases []testCaseName, allTestCases map[testCaseName]testCasePeerSelf, conf testPeersConfig, allTestResCh chan map[string][]testResult) error { - singleTestResCh := make(chan testResult) - allTestRes := []testResult{} - if len(queuedTestCases) == 0 { - allTestResCh <- map[string][]testResult{"self": allTestRes} - return nil - } - go runSelfTest(ctx, queuedTestCases, allTestCases, conf, singleTestResCh) - - testCounter := 0 - finished := false - for !finished { - var testName string - select { - case <-ctx.Done(): - testName = queuedTestCases[testCounter].name - allTestRes = append(allTestRes, testResult{Name: testName, Verdict: testVerdictFail, Error: errTimeoutInterrupted}) - finished = true - case result, ok := <-singleTestResCh: - if !ok { - finished = true - continue - } - testName = queuedTestCases[testCounter].name - testCounter++ - result.Name = testName - allTestRes = append(allTestRes, result) - } - } - - allTestResCh <- map[string][]testResult{"self": allTestRes} - - return nil -} - -func runSelfTest(ctx context.Context, queuedTestCases []testCaseName, allTestCases map[testCaseName]testCasePeerSelf, conf testPeersConfig, ch chan testResult) { - defer close(ch) - for _, t := range queuedTestCases { - select { - case <-ctx.Done(): - return - default: - ch <- allTestCases[t](ctx, &conf) + testResCh <- allTestCases[t](ctx, &conf, tcpNode, target) } } } @@ -736,7 +414,7 @@ func peerPingLoadTest(ctx context.Context, conf *testPeersConfig, tcpNode host.H ) testRes := testResult{Name: "PingLoad"} - testResCh := make(chan ping.Result, math.MaxInt16) + testResCh := make(chan time.Duration, math.MaxInt16) pingCtx, cancel := context.WithTimeout(ctx, conf.LoadTestDuration) defer cancel() ticker := time.NewTicker(time.Second) @@ -758,48 +436,11 @@ func peerPingLoadTest(ctx context.Context, conf *testPeersConfig, tcpNode host.H close(testResCh) log.Info(ctx, "Ping load tests finished", z.Any("target", peer.Name)) - highestRTT := time.Duration(0) - for val := range testResCh { - if val.RTT > highestRTT { - highestRTT = val.RTT - } - } - if highestRTT > thresholdPeersLoadPoor { - testRes.Verdict = testVerdictPoor - } else if highestRTT > thresholdPeersLoadAvg { - testRes.Verdict = testVerdictAvg - } else { - testRes.Verdict = testVerdictGood - } - testRes.Measurement = Duration{highestRTT}.String() + testRes = evaluateHighestRTTScores(testResCh, testRes, thresholdPeersLoadAvg, thresholdPeersLoadPoor) return testRes } -func dialLibp2pTCPIP(ctx context.Context, address string) error { - d := net.Dialer{Timeout: time.Second} - conn, err := d.DialContext(ctx, "tcp", address) - if err != nil { - return errors.Wrap(err, "net dial") - } - defer conn.Close() - buf := new(strings.Builder) - _, err = io.CopyN(buf, conn, 19) - if err != nil { - return errors.Wrap(err, "io copy") - } - if !strings.Contains(buf.String(), "/multistream/1.0.0") { - return errors.New("multistream not found", z.Any("found", buf.String()), z.Any("address", address)) - } - - err = conn.Close() - if err != nil { - return errors.Wrap(err, "close conn") - } - - return nil -} - func peerDirectConnTest(ctx context.Context, conf *testPeersConfig, tcpNode host.Host, p2pPeer p2p.Peer) testResult { testRes := testResult{Name: "DirectConn"} @@ -830,6 +471,55 @@ func peerDirectConnTest(ctx context.Context, conf *testPeersConfig, tcpNode host return testRes } +// self tests + +func testSelf(ctx context.Context, queuedTestCases []testCaseName, allTestCases map[testCaseName]testCasePeerSelf, conf testPeersConfig, allTestResCh chan map[string][]testResult) error { + singleTestResCh := make(chan testResult) + allTestRes := []testResult{} + if len(queuedTestCases) == 0 { + allTestResCh <- map[string][]testResult{"self": allTestRes} + return nil + } + go runSelfTest(ctx, queuedTestCases, allTestCases, conf, singleTestResCh) + + testCounter := 0 + finished := false + for !finished { + var testName string + select { + case <-ctx.Done(): + testName = queuedTestCases[testCounter].name + allTestRes = append(allTestRes, testResult{Name: testName, Verdict: testVerdictFail, Error: errTimeoutInterrupted}) + finished = true + case result, ok := <-singleTestResCh: + if !ok { + finished = true + continue + } + testName = queuedTestCases[testCounter].name + testCounter++ + result.Name = testName + allTestRes = append(allTestRes, result) + } + } + + allTestResCh <- map[string][]testResult{"self": allTestRes} + + return nil +} + +func runSelfTest(ctx context.Context, queuedTestCases []testCaseName, allTestCases map[testCaseName]testCasePeerSelf, conf testPeersConfig, ch chan testResult) { + defer close(ch) + for _, t := range queuedTestCases { + select { + case <-ctx.Done(): + return + default: + ch <- allTestCases[t](ctx, &conf) + } + } +} + func libp2pTCPPortOpenTest(ctx context.Context, cfg *testPeersConfig) testResult { testRes := testResult{Name: "Libp2pTCPPortOpen"} @@ -849,48 +539,98 @@ func libp2pTCPPortOpenTest(ctx context.Context, cfg *testPeersConfig) testResult return testRes } -func relayPingTest(ctx context.Context, _ *testPeersConfig, target string) testResult { - testRes := testResult{Name: "PingRelay"} +// charon relays tests - client := http.Client{} - req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil) - if err != nil { - return failedTestResult(testRes, err) +func testAllRelays(ctx context.Context, queuedTestCases []testCaseName, allTestCases map[testCaseName]testCaseRelay, conf testPeersConfig, allRelaysResCh chan map[string][]testResult) error { + // run tests for all relays + allRelayRes := make(map[string][]testResult) + singleRelayResCh := make(chan map[string][]testResult) + group, _ := errgroup.WithContext(ctx) + + for _, relay := range conf.P2P.Relays { + group.Go(func() error { + return testSingleRelay(ctx, queuedTestCases, allTestCases, conf, relay, singleRelayResCh) + }) } - resp, err := client.Do(req) + + doneReading := make(chan bool) + go func() { + for singleRelayRes := range singleRelayResCh { + maps.Copy(allRelayRes, singleRelayRes) + } + doneReading <- true + }() + + err := group.Wait() if err != nil { - return failedTestResult(testRes, err) + return errors.Wrap(err, "relays test errgroup") } - defer resp.Body.Close() + close(singleRelayResCh) + <-doneReading - if resp.StatusCode > 399 { - return failedTestResult(testRes, errors.New(httpStatusError(resp.StatusCode))) - } + allRelaysResCh <- allRelayRes - testRes.Verdict = testVerdictOk - - return testRes + return nil } -func relayPingMeasureTest(ctx context.Context, _ *testPeersConfig, target string) testResult { - testRes := testResult{Name: "PingMeasureRelay"} +func testSingleRelay(ctx context.Context, queuedTestCases []testCaseName, allTestCases map[testCaseName]testCaseRelay, conf testPeersConfig, target string, allTestResCh chan map[string][]testResult) error { + singleTestResCh := make(chan testResult) + allTestRes := []testResult{} + relayName := fmt.Sprintf("relay %v", target) + if len(queuedTestCases) == 0 { + allTestResCh <- map[string][]testResult{relayName: allTestRes} + return nil + } + + // run all relay tests for a relay, pushing each completed test to the channel until all are complete or timeout occurs + go runRelayTest(ctx, queuedTestCases, allTestCases, conf, target, singleTestResCh) + testCounter := 0 + finished := false + for !finished { + var testName string + select { + case <-ctx.Done(): + testName = queuedTestCases[testCounter].name + allTestRes = append(allTestRes, testResult{Name: testName, Verdict: testVerdictFail, Error: errTimeoutInterrupted}) + finished = true + case result, ok := <-singleTestResCh: + if !ok { + finished = true + continue + } + testName = queuedTestCases[testCounter].name + testCounter++ + result.Name = testName + allTestRes = append(allTestRes, result) + } + } - var start time.Time - var firstByte time.Duration + allTestResCh <- map[string][]testResult{relayName: allTestRes} - trace := &httptrace.ClientTrace{ - GotFirstResponseByte: func() { - firstByte = time.Since(start) - }, + return nil +} + +func runRelayTest(ctx context.Context, queuedTestCases []testCaseName, allTestCases map[testCaseName]testCaseRelay, conf testPeersConfig, target string, testResCh chan testResult) { + defer close(testResCh) + for _, t := range queuedTestCases { + select { + case <-ctx.Done(): + return + default: + testResCh <- allTestCases[t](ctx, &conf, target) + } } +} + +func relayPingTest(ctx context.Context, _ *testPeersConfig, target string) testResult { + testRes := testResult{Name: "PingRelay"} - start = time.Now() - req, err := http.NewRequestWithContext(httptrace.WithClientTrace(ctx, trace), http.MethodGet, target, nil) + client := http.Client{} + req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil) if err != nil { return failedTestResult(testRes, err) } - - resp, err := http.DefaultTransport.RoundTrip(req) + resp, err := client.Do(req) if err != nil { return failedTestResult(testRes, err) } @@ -900,14 +640,235 @@ func relayPingMeasureTest(ctx context.Context, _ *testPeersConfig, target string return failedTestResult(testRes, errors.New(httpStatusError(resp.StatusCode))) } - if firstByte > thresholdRelayMeasurePoor { - testRes.Verdict = testVerdictPoor - } else if firstByte > thresholdRelayMeasureAvg { - testRes.Verdict = testVerdictAvg - } else { - testRes.Verdict = testVerdictGood + testRes.Verdict = testVerdictOk + + return testRes +} + +func relayPingMeasureTest(ctx context.Context, _ *testPeersConfig, target string) testResult { + testRes := testResult{Name: "PingMeasureRelay"} + + rtt, err := requestRTT(ctx, target, http.MethodGet, nil, 200) + if err != nil { + return failedTestResult(testRes, err) } - testRes.Measurement = Duration{firstByte}.String() + + testRes = evaluateRTT(rtt, testRes, thresholdRelayMeasureAvg, thresholdRelayMeasurePoor) return testRes } + +// helper functions + +func fetchPeersFromDefinition(path string) ([]string, error) { + f, err := os.ReadFile(path) + if err != nil { + return nil, errors.Wrap(err, "read definition file", z.Str("path", path)) + } + + var def cluster.Definition + err = json.Unmarshal(f, &def) + if err != nil { + return nil, errors.Wrap(err, "unmarshal definition json", z.Str("path", path)) + } + + var enrs []string + for _, o := range def.Operators { + enrs = append(enrs, o.ENR) + } + + if len(enrs) == 0 { + return nil, errors.New("no peers found in lock", z.Str("path", path)) + } + + return enrs, nil +} + +func fetchPeersFromLock(path string) ([]string, error) { + f, err := os.ReadFile(path) + if err != nil { + return nil, errors.Wrap(err, "read lock file", z.Str("path", path)) + } + + var lock cluster.Lock + err = json.Unmarshal(f, &lock) + if err != nil { + return nil, errors.Wrap(err, "unmarshal lock json", z.Str("path", path)) + } + + var enrs []string + for _, o := range lock.Operators { + enrs = append(enrs, o.ENR) + } + + if len(enrs) == 0 { + return nil, errors.New("no peers found in lock", z.Str("path", path)) + } + + return enrs, nil +} + +func fetchENRs(conf testPeersConfig) ([]string, error) { + var enrs []string + var err error + switch { + case len(conf.ENRs) != 0: + enrs = conf.ENRs + case conf.ClusterDefinitionFilePath != "": + enrs, err = fetchPeersFromDefinition(conf.ClusterDefinitionFilePath) + if err != nil { + return nil, err + } + case conf.ClusterLockFilePath != "": + enrs, err = fetchPeersFromLock(conf.ClusterLockFilePath) + if err != nil { + return nil, err + } + } + + return enrs, nil +} + +func startTCPNode(ctx context.Context, conf testPeersConfig) (host.Host, func(), error) { + enrs, err := fetchENRs(conf) + if err != nil { + return nil, nil, err + } + + var peers []p2p.Peer + for i, enrString := range enrs { + enrRecord, err := enr.Parse(enrString) + if err != nil { + return nil, nil, errors.Wrap(err, "decode enr", z.Str("enr", enrString)) + } + + p2pPeer, err := p2p.NewPeerFromENR(enrRecord, i) + if err != nil { + return nil, nil, err + } + + peers = append(peers, p2pPeer) + } + + p2pPrivKey, err := p2p.LoadPrivKey(conf.DataDir) + if err != nil { + return nil, nil, err + } + + meENR, err := enr.New(p2pPrivKey) + if err != nil { + return nil, nil, err + } + + mePeer, err := p2p.NewPeerFromENR(meENR, len(enrs)) + if err != nil { + return nil, nil, err + } + + log.Info(ctx, "Self p2p name resolved", z.Any("name", mePeer.Name)) + + peers = append(peers, mePeer) + + allENRs := enrs + allENRs = append(allENRs, meENR.String()) + slices.Sort(allENRs) + allENRsString := strings.Join(allENRs, ",") + allENRsHash := sha256.Sum256([]byte(allENRsString)) + + return setupP2P(ctx, p2pPrivKey, conf.P2P, peers, allENRsHash[:]) +} + +func setupP2P(ctx context.Context, privKey *k1.PrivateKey, conf p2p.Config, peers []p2p.Peer, enrsHash []byte) (host.Host, func(), error) { + var peerIDs []peer.ID + for _, peer := range peers { + peerIDs = append(peerIDs, peer.ID) + } + + if err := p2p.VerifyP2PKey(peers, privKey); err != nil { + return nil, nil, err + } + + relays, err := p2p.NewRelays(ctx, conf.Relays, hex.EncodeToString(enrsHash)) + if err != nil { + return nil, nil, err + } + + connGater, err := p2p.NewConnGater(peerIDs, relays) + if err != nil { + return nil, nil, err + } + + tcpNode, err := p2p.NewTCPNode(ctx, conf, privKey, connGater, false) + if err != nil { + return nil, nil, err + } + + p2p.RegisterConnectionLogger(ctx, tcpNode, peerIDs) + + for _, relay := range relays { + go p2p.NewRelayReserver(tcpNode, relay)(ctx) + } + + go p2p.NewRelayRouter(tcpNode, peerIDs, relays)(ctx) + + return tcpNode, func() { + err := tcpNode.Close() + if err != nil && !errors.Is(err, context.Canceled) { + log.Error(ctx, "Close TCP node", err) + } + }, nil +} + +func pingPeerOnce(ctx context.Context, tcpNode host.Host, peer p2p.Peer) (ping.Result, error) { + pingSvc := ping.NewPingService(tcpNode) + pingCtx, cancel := context.WithCancel(ctx) + defer cancel() + pingChan := pingSvc.Ping(pingCtx, peer.ID) + result, ok := <-pingChan + if !ok { + return ping.Result{}, errors.New("ping channel closed") + } + + return result, nil +} + +func pingPeerContinuously(ctx context.Context, tcpNode host.Host, peer p2p.Peer, resCh chan<- time.Duration) { + for { + r, err := pingPeerOnce(ctx, tcpNode, peer) + if err != nil { + return + } + + select { + case <-ctx.Done(): + return + case resCh <- r.RTT: + awaitTime := rand.Intn(100) //nolint:gosec // weak generator is not an issue here + sleepWithContext(ctx, time.Duration(awaitTime)*time.Millisecond) + } + } +} + +func dialLibp2pTCPIP(ctx context.Context, address string) error { + d := net.Dialer{Timeout: time.Second} + conn, err := d.DialContext(ctx, "tcp", address) + if err != nil { + return errors.Wrap(err, "net dial") + } + defer conn.Close() + buf := new(strings.Builder) + _, err = io.CopyN(buf, conn, 19) + if err != nil { + return errors.Wrap(err, "io copy") + } + if !strings.Contains(buf.String(), "/multistream/1.0.0") { + return errors.New("multistream not found", z.Any("found", buf.String()), z.Any("address", address)) + } + + err = conn.Close() + if err != nil { + return errors.Wrap(err, "close conn") + } + + return nil +} diff --git a/cmd/testpeers_internal_test.go b/cmd/testpeers_internal_test.go index a2694a2b69..0d766c0f36 100644 --- a/cmd/testpeers_internal_test.go +++ b/cmd/testpeers_internal_test.go @@ -78,19 +78,19 @@ func TestPeersTest(t *testing.T) { {Name: "pingRelay", Verdict: testVerdictOk, Measurement: "", Suggestion: "", Error: testResultError{}}, {Name: "pingMeasureRelay", Verdict: testVerdictGood, Measurement: "", Suggestion: "", Error: testResultError{}}, }, - "peer inexpensive-farm enr:-HW4QBHlcyD3fYWUMADiOv4OxODaL5wJG0a7P7d_ltu4VZe1MibZ1N-twFaoaq0BoCtXcY71etxLJGeEZT5p3XCO6GOAgmlkgnY0iXNlY3AyNTZrMaEDI2HRUlVBag__njkOWEEQRLlC9ylIVCrIXOuNBSlrx6o": { + "peer inexpensive-farm enr:-HW4QBHlc...rx6o": { {Name: "ping", Verdict: testVerdictOk, Measurement: "", Suggestion: "", Error: testResultError{}}, {Name: "pingMeasure", Verdict: testVerdictGood, Measurement: "", Suggestion: "", Error: testResultError{}}, {Name: "pingLoad", Verdict: testVerdictGood, Measurement: "", Suggestion: "", Error: testResultError{}}, {Name: "directConn", Verdict: testVerdictOk, Measurement: "", Suggestion: "", Error: testResultError{}}, }, - "peer anxious-pencil enr:-HW4QDwUF804f4WhUjwcp4JJ-PrRH0glQZv8s2cVHlBRPJ3SYcYO-dvJGsKhztffrski5eujJkl8oAc983MZy6-PqF2AgmlkgnY0iXNlY3AyNTZrMaECPEPryjkmUBnQFyjmMw9rl7DVtKL0243nN5iepqsvKDw": { + "peer anxious-pencil enr:-HW4QDwUF...vKDw": { {Name: "ping", Verdict: testVerdictOk, Measurement: "", Suggestion: "", Error: testResultError{}}, {Name: "pingMeasure", Verdict: testVerdictGood, Measurement: "", Suggestion: "", Error: testResultError{}}, {Name: "pingLoad", Verdict: testVerdictGood, Measurement: "", Suggestion: "", Error: testResultError{}}, {Name: "directConn", Verdict: testVerdictOk, Measurement: "", Suggestion: "", Error: testResultError{}}, }, - "peer important-pen enr:-HW4QPSBgUTag8oZs3zIsgWzlBUrSgT8pgZmFJa7HWwKXUcRLlISa68OJtp-JTzhUXsJ2vSGwKGACn0OTatWdJATxn-AgmlkgnY0iXNlY3AyNTZrMaECA3R_ffXLXCLJsfEwf6xeoAFgWnDIOdq8kS0Yqkhwbr0": { + "peer important-pen enr:-HW4QPSBg...wbr0": { {Name: "ping", Verdict: testVerdictOk, Measurement: "", Suggestion: "", Error: testResultError{}}, {Name: "pingMeasure", Verdict: testVerdictGood, Measurement: "", Suggestion: "", Error: testResultError{}}, {Name: "pingLoad", Verdict: testVerdictGood, Measurement: "", Suggestion: "", Error: testResultError{}}, @@ -144,13 +144,13 @@ func TestPeersTest(t *testing.T) { {Name: "pingRelay", Verdict: testVerdictOk, Measurement: "", Suggestion: "", Error: testResultError{}}, {Name: "pingMeasureRelay", Verdict: testVerdictGood, Measurement: "", Suggestion: "", Error: testResultError{}}, }, - "peer inexpensive-farm enr:-HW4QBHlcyD3fYWUMADiOv4OxODaL5wJG0a7P7d_ltu4VZe1MibZ1N-twFaoaq0BoCtXcY71etxLJGeEZT5p3XCO6GOAgmlkgnY0iXNlY3AyNTZrMaEDI2HRUlVBag__njkOWEEQRLlC9ylIVCrIXOuNBSlrx6o": { + "peer inexpensive-farm enr:-HW4QBHlc...rx6o": { {Name: "ping", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: errTimeoutInterrupted}, }, - "peer anxious-pencil enr:-HW4QDwUF804f4WhUjwcp4JJ-PrRH0glQZv8s2cVHlBRPJ3SYcYO-dvJGsKhztffrski5eujJkl8oAc983MZy6-PqF2AgmlkgnY0iXNlY3AyNTZrMaECPEPryjkmUBnQFyjmMw9rl7DVtKL0243nN5iepqsvKDw": { + "peer anxious-pencil enr:-HW4QDwUF...vKDw": { {Name: "ping", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: errTimeoutInterrupted}, }, - "peer important-pen enr:-HW4QPSBgUTag8oZs3zIsgWzlBUrSgT8pgZmFJa7HWwKXUcRLlISa68OJtp-JTzhUXsJ2vSGwKGACn0OTatWdJATxn-AgmlkgnY0iXNlY3AyNTZrMaECA3R_ffXLXCLJsfEwf6xeoAFgWnDIOdq8kS0Yqkhwbr0": { + "peer important-pen enr:-HW4QPSBg...wbr0": { {Name: "ping", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: errTimeoutInterrupted}, }, }, @@ -204,13 +204,13 @@ func TestPeersTest(t *testing.T) { expected: testCategoryResult{ CategoryName: peersTestCategory, Targets: map[string][]testResult{ - "peer inexpensive-farm enr:-HW4QBHlcyD3fYWUMADiOv4OxODaL5wJG0a7P7d_ltu4VZe1MibZ1N-twFaoaq0BoCtXcY71etxLJGeEZT5p3XCO6GOAgmlkgnY0iXNlY3AyNTZrMaEDI2HRUlVBag__njkOWEEQRLlC9ylIVCrIXOuNBSlrx6o": { + "peer inexpensive-farm enr:-HW4QBHlc...rx6o": { {Name: "ping", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: errTimeoutInterrupted}, }, - "peer anxious-pencil enr:-HW4QDwUF804f4WhUjwcp4JJ-PrRH0glQZv8s2cVHlBRPJ3SYcYO-dvJGsKhztffrski5eujJkl8oAc983MZy6-PqF2AgmlkgnY0iXNlY3AyNTZrMaECPEPryjkmUBnQFyjmMw9rl7DVtKL0243nN5iepqsvKDw": { + "peer anxious-pencil enr:-HW4QDwUF...vKDw": { {Name: "ping", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: errTimeoutInterrupted}, }, - "peer important-pen enr:-HW4QPSBgUTag8oZs3zIsgWzlBUrSgT8pgZmFJa7HWwKXUcRLlISa68OJtp-JTzhUXsJ2vSGwKGACn0OTatWdJATxn-AgmlkgnY0iXNlY3AyNTZrMaECA3R_ffXLXCLJsfEwf6xeoAFgWnDIOdq8kS0Yqkhwbr0": { + "peer important-pen enr:-HW4QPSBg...wbr0": { {Name: "ping", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: errTimeoutInterrupted}, }, }, @@ -247,13 +247,13 @@ func TestPeersTest(t *testing.T) { {Name: "pingRelay", Verdict: testVerdictOk, Measurement: "", Suggestion: "", Error: testResultError{}}, {Name: "pingMeasureRelay", Verdict: testVerdictGood, Measurement: "", Suggestion: "", Error: testResultError{}}, }, - "peer inexpensive-farm enr:-HW4QBHlcyD3fYWUMADiOv4OxODaL5wJG0a7P7d_ltu4VZe1MibZ1N-twFaoaq0BoCtXcY71etxLJGeEZT5p3XCO6GOAgmlkgnY0iXNlY3AyNTZrMaEDI2HRUlVBag__njkOWEEQRLlC9ylIVCrIXOuNBSlrx6o": { + "peer inexpensive-farm enr:-HW4QBHlc...rx6o": { {Name: "ping", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: errTimeoutInterrupted}, }, - "peer anxious-pencil enr:-HW4QDwUF804f4WhUjwcp4JJ-PrRH0glQZv8s2cVHlBRPJ3SYcYO-dvJGsKhztffrski5eujJkl8oAc983MZy6-PqF2AgmlkgnY0iXNlY3AyNTZrMaECPEPryjkmUBnQFyjmMw9rl7DVtKL0243nN5iepqsvKDw": { + "peer anxious-pencil enr:-HW4QDwUF...vKDw": { {Name: "ping", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: errTimeoutInterrupted}, }, - "peer important-pen enr:-HW4QPSBgUTag8oZs3zIsgWzlBUrSgT8pgZmFJa7HWwKXUcRLlISa68OJtp-JTzhUXsJ2vSGwKGACn0OTatWdJATxn-AgmlkgnY0iXNlY3AyNTZrMaECA3R_ffXLXCLJsfEwf6xeoAFgWnDIOdq8kS0Yqkhwbr0": { + "peer important-pen enr:-HW4QPSBg...wbr0": { {Name: "ping", Verdict: testVerdictFail, Measurement: "", Suggestion: "", Error: errTimeoutInterrupted}, }, }, diff --git a/cmd/testperformance.go b/cmd/testperformance.go index 69e5980d4a..5cdc0b99f9 100644 --- a/cmd/testperformance.go +++ b/cmd/testperformance.go @@ -92,16 +92,16 @@ func newTestPerformanceCmd(runFunc func(context.Context, io.Writer, testPerforma } bindTestFlags(cmd, &config.testConfig) - bindTestPerformanceFlags(cmd, &config) + bindTestPerformanceFlags(cmd, &config, "") return cmd } -func bindTestPerformanceFlags(cmd *cobra.Command, config *testPerformanceConfig) { - cmd.Flags().StringVar(&config.DiskIOTestFileDir, "disk-io-test-file-dir", "", "Directory at which disk performance will be measured. If none specified, current user's home directory will be used.") - cmd.Flags().IntVar(&config.DiskIOBlockSizeKb, "disk-io-block-size-kb", 4096, "The block size in kilobytes used for I/O units. Same value applies for both reads and writes.") - cmd.Flags().StringSliceVar(&config.InternetTestServersOnly, "internet-test-servers-only", []string{}, "List of specific server names to be included for the internet tests, the best performing one is chosen. If not provided, closest and best performing servers are chosen automatically.") - cmd.Flags().StringSliceVar(&config.InternetTestServersExclude, "internet-test-servers-exclude", []string{}, "List of server names to be excluded from the tests. To be specified only if you experience issues with a server that is wrongly considered best performing.") +func bindTestPerformanceFlags(cmd *cobra.Command, config *testPerformanceConfig, flagsPrefix string) { + cmd.Flags().StringVar(&config.DiskIOTestFileDir, flagsPrefix+"disk-io-test-file-dir", "", "Directory at which disk performance will be measured. If none specified, current user's home directory will be used.") + cmd.Flags().IntVar(&config.DiskIOBlockSizeKb, flagsPrefix+"disk-io-block-size-kb", 4096, "The block size in kilobytes used for I/O units. Same value applies for both reads and writes.") + cmd.Flags().StringSliceVar(&config.InternetTestServersOnly, flagsPrefix+"internet-test-servers-only", []string{}, "List of specific server names to be included for the internet tests, the best performing one is chosen. If not provided, closest and best performing servers are chosen automatically.") + cmd.Flags().StringSliceVar(&config.InternetTestServersExclude, flagsPrefix+"internet-test-servers-exclude", []string{}, "List of server names to be excluded from the tests. To be specified only if you experience issues with a server that is wrongly considered best performing.") } func supportedPerformanceTestCases() map[testCaseName]func(context.Context, *testPerformanceConfig) testResult { @@ -119,6 +119,8 @@ func supportedPerformanceTestCases() map[testCaseName]func(context.Context, *tes } func runTestPerformance(ctx context.Context, w io.Writer, cfg testPerformanceConfig) (err error) { + log.Info(ctx, "Starting machine performance and network connectivity test") + testCases := supportedPerformanceTestCases() queuedTests := filterTests(maps.Keys(testCases), cfg.testConfig) if len(queuedTests) == 0 { @@ -174,6 +176,8 @@ func runTestPerformance(ctx context.Context, w io.Writer, cfg testPerformanceCon return nil } +// hardware and internet connectivity performance tests + func testSinglePerformance(ctx context.Context, queuedTestCases []testCaseName, allTestCases map[testCaseName]func(context.Context, *testPerformanceConfig) testResult, cfg testPerformanceConfig, resCh chan map[string][]testResult) { defer close(resCh) singleTestResCh := make(chan testResult) @@ -217,27 +221,6 @@ func testPerformance(ctx context.Context, queuedTests []testCaseName, allTests m } } -func fioCommand(ctx context.Context, filename string, blocksize int, operation string) ([]byte, error) { - //nolint:gosec - cmd, err := exec.CommandContext(ctx, "fio", - "--name=fioTest", - fmt.Sprintf("--filename=%v/fiotest", filename), - fmt.Sprintf("--size=%vMb", diskOpsMBsTotal/diskOpsNumOfJobs), - fmt.Sprintf("--blocksize=%vk", blocksize), - fmt.Sprintf("--numjobs=%v", diskOpsNumOfJobs), - fmt.Sprintf("--rw=%v", operation), - "--direct=1", - "--runtime=60s", - "--group_reporting", - "--output-format=json", - ).Output() - if err != nil { - return nil, errors.Wrap(err, "exec fio command") - } - - return cmd, nil -} - func performanceDiskWriteSpeedTest(ctx context.Context, conf *testPerformanceConfig) testResult { testRes := testResult{Name: "DiskWriteSpeed"} @@ -444,86 +427,6 @@ func performanceDiskReadIOPSTest(ctx context.Context, conf *testPerformanceConfi return testRes } -func availableMemoryLinux(context.Context) (int64, error) { - file, err := os.Open("/proc/meminfo") - if err != nil { - return 0, errors.Wrap(err, "open /proc/meminfo") - } - scanner := bufio.NewScanner(file) - if scanner.Err() != nil { - return 0, errors.Wrap(err, "new scanner") - } - - for scanner.Scan() { - line := scanner.Text() - if !strings.Contains(line, "MemAvailable") { - continue - } - splitText := strings.Split(line, ": ") - kbs := strings.Trim(strings.Split(splitText[1], "kB")[0], " ") - kbsInt, err := strconv.ParseInt(kbs, 10, 64) - if err != nil { - return 0, errors.Wrap(err, "parse MemAvailable int") - } - - return kbsInt * 1024, nil - } - - return 0, errors.New("memAvailable not found in /proc/meminfo") -} - -func availableMemoryMacos(ctx context.Context) (int64, error) { - pageSizeBytes, err := exec.CommandContext(ctx, "pagesize").Output() - if err != nil { - return 0, errors.Wrap(err, "run pagesize") - } - memorySizePerPage, err := strconv.ParseInt(strings.TrimSuffix(string(pageSizeBytes), "\n"), 10, 64) - if err != nil { - return 0, errors.Wrap(err, "parse memorySizePerPage int") - } - - out, err := exec.CommandContext(ctx, "vm_stat").Output() - if err != nil { - return 0, errors.Wrap(err, "run vm_stat") - } - outBuf := bytes.NewBuffer(out) - scanner := bufio.NewScanner(outBuf) - if scanner.Err() != nil { - return 0, errors.Wrap(err, "new scanner") - } - - var pagesFree, pagesInactive, pagesSpeculative int64 - for scanner.Scan() { - line := scanner.Text() - splitText := strings.Split(line, ": ") - - var bytes int64 - var err error - switch { - case strings.Contains(splitText[0], "Pages free"): - bytes, err = strconv.ParseInt(strings.Trim(strings.Split(splitText[1], ".")[0], " "), 10, 64) - if err != nil { - return 0, errors.Wrap(err, "parse Pages free int") - } - pagesFree = bytes - case strings.Contains(splitText[0], "Pages inactive"): - bytes, err = strconv.ParseInt(strings.Trim(strings.Split(splitText[1], ".")[0], " "), 10, 64) - if err != nil { - return 0, errors.Wrap(err, "parse Pages inactive int") - } - pagesInactive = bytes - case strings.Contains(splitText[0], "Pages speculative"): - bytes, err = strconv.ParseInt(strings.Trim(strings.Split(splitText[1], ".")[0], " "), 10, 64) - if err != nil { - return 0, errors.Wrap(err, "parse Pages speculative int") - } - pagesSpeculative = bytes - } - } - - return ((pagesFree + pagesInactive + pagesSpeculative) * memorySizePerPage), nil -} - func performanceAvailableMemoryTest(ctx context.Context, _ *testPerformanceConfig) testResult { testRes := testResult{Name: "AvailableMemory"} @@ -559,49 +462,6 @@ func performanceAvailableMemoryTest(ctx context.Context, _ *testPerformanceConfi return testRes } -func totalMemoryLinux(context.Context) (int64, error) { - file, err := os.Open("/proc/meminfo") - if err != nil { - return 0, errors.Wrap(err, "open /proc/meminfo") - } - scanner := bufio.NewScanner(file) - if scanner.Err() != nil { - return 0, errors.Wrap(err, "new scanner") - } - - for scanner.Scan() { - line := scanner.Text() - if !strings.Contains(line, "MemTotal") { - continue - } - splitText := strings.Split(line, ": ") - kbs := strings.Trim(strings.Split(splitText[1], "kB")[0], " ") - kbsInt, err := strconv.ParseInt(kbs, 10, 64) - if err != nil { - return 0, errors.Wrap(err, "parse MemTotal int") - } - - return kbsInt * 1024, nil - } - - return 0, errors.New("memTotal not found in /proc/meminfo") -} - -func totalMemoryMacos(ctx context.Context) (int64, error) { - out, err := exec.CommandContext(ctx, "sysctl", "hw.memsize").Output() - if err != nil { - return 0, errors.Wrap(err, "run sysctl hw.memsize") - } - - memSize := strings.TrimSuffix(strings.Split(string(out), ": ")[1], "\n") - memSizeInt, err := strconv.ParseInt(memSize, 10, 64) - if err != nil { - return 0, errors.Wrap(err, "parse memSize int") - } - - return memSizeInt, nil -} - func performanceTotalMemoryTest(ctx context.Context, _ *testPerformanceConfig) testResult { testRes := testResult{Name: "TotalMemory"} @@ -637,44 +497,6 @@ func performanceTotalMemoryTest(ctx context.Context, _ *testPerformanceConfig) t return testRes } -func fetchOoklaServer(_ context.Context, conf *testPerformanceConfig) (speedtest.Server, error) { - speedtestClient := speedtest.New() - - serverList, err := speedtestClient.FetchServers() - if err != nil { - return speedtest.Server{}, errors.Wrap(err, "fetch Ookla servers") - } - - var targets speedtest.Servers - - if len(conf.InternetTestServersOnly) != 0 { - for _, server := range serverList { - if slices.Contains(conf.InternetTestServersOnly, server.Name) { - targets = append(targets, server) - } - } - } - - if len(conf.InternetTestServersExclude) != 0 { - for _, server := range serverList { - if !slices.Contains(conf.InternetTestServersExclude, server.Name) { - targets = append(targets, server) - } - } - } - - if targets == nil { - targets = serverList - } - - servers, err := targets.FindServer([]int{}) - if err != nil { - return speedtest.Server{}, errors.Wrap(err, "find Ookla server") - } - - return *servers[0], nil -} - func performanceInternetLatencyTest(ctx context.Context, conf *testPerformanceConfig) testResult { testRes := testResult{Name: "InternetLatency"} @@ -770,3 +592,187 @@ func performanceInternetUploadSpeedTest(ctx context.Context, conf *testPerforman return testRes } + +// helper functions + +func fioCommand(ctx context.Context, filename string, blocksize int, operation string) ([]byte, error) { + //nolint:gosec + cmd, err := exec.CommandContext(ctx, "fio", + "--name=fioTest", + fmt.Sprintf("--filename=%v/fiotest", filename), + fmt.Sprintf("--size=%vMb", diskOpsMBsTotal/diskOpsNumOfJobs), + fmt.Sprintf("--blocksize=%vk", blocksize), + fmt.Sprintf("--numjobs=%v", diskOpsNumOfJobs), + fmt.Sprintf("--rw=%v", operation), + "--direct=1", + "--runtime=60s", + "--group_reporting", + "--output-format=json", + ).Output() + if err != nil { + return nil, errors.Wrap(err, "exec fio command") + } + + return cmd, nil +} + +func availableMemoryLinux(context.Context) (int64, error) { + file, err := os.Open("/proc/meminfo") + if err != nil { + return 0, errors.Wrap(err, "open /proc/meminfo") + } + scanner := bufio.NewScanner(file) + if scanner.Err() != nil { + return 0, errors.Wrap(err, "new scanner") + } + + for scanner.Scan() { + line := scanner.Text() + if !strings.Contains(line, "MemAvailable") { + continue + } + splitText := strings.Split(line, ": ") + kbs := strings.Trim(strings.Split(splitText[1], "kB")[0], " ") + kbsInt, err := strconv.ParseInt(kbs, 10, 64) + if err != nil { + return 0, errors.Wrap(err, "parse MemAvailable int") + } + + return kbsInt * 1024, nil + } + + return 0, errors.New("memAvailable not found in /proc/meminfo") +} + +func availableMemoryMacos(ctx context.Context) (int64, error) { + pageSizeBytes, err := exec.CommandContext(ctx, "pagesize").Output() + if err != nil { + return 0, errors.Wrap(err, "run pagesize") + } + memorySizePerPage, err := strconv.ParseInt(strings.TrimSuffix(string(pageSizeBytes), "\n"), 10, 64) + if err != nil { + return 0, errors.Wrap(err, "parse memorySizePerPage int") + } + + out, err := exec.CommandContext(ctx, "vm_stat").Output() + if err != nil { + return 0, errors.Wrap(err, "run vm_stat") + } + outBuf := bytes.NewBuffer(out) + scanner := bufio.NewScanner(outBuf) + if scanner.Err() != nil { + return 0, errors.Wrap(err, "new scanner") + } + + var pagesFree, pagesInactive, pagesSpeculative int64 + for scanner.Scan() { + line := scanner.Text() + splitText := strings.Split(line, ": ") + + var bytes int64 + var err error + switch { + case strings.Contains(splitText[0], "Pages free"): + bytes, err = strconv.ParseInt(strings.Trim(strings.Split(splitText[1], ".")[0], " "), 10, 64) + if err != nil { + return 0, errors.Wrap(err, "parse Pages free int") + } + pagesFree = bytes + case strings.Contains(splitText[0], "Pages inactive"): + bytes, err = strconv.ParseInt(strings.Trim(strings.Split(splitText[1], ".")[0], " "), 10, 64) + if err != nil { + return 0, errors.Wrap(err, "parse Pages inactive int") + } + pagesInactive = bytes + case strings.Contains(splitText[0], "Pages speculative"): + bytes, err = strconv.ParseInt(strings.Trim(strings.Split(splitText[1], ".")[0], " "), 10, 64) + if err != nil { + return 0, errors.Wrap(err, "parse Pages speculative int") + } + pagesSpeculative = bytes + } + } + + return ((pagesFree + pagesInactive + pagesSpeculative) * memorySizePerPage), nil +} + +func totalMemoryLinux(context.Context) (int64, error) { + file, err := os.Open("/proc/meminfo") + if err != nil { + return 0, errors.Wrap(err, "open /proc/meminfo") + } + scanner := bufio.NewScanner(file) + if scanner.Err() != nil { + return 0, errors.Wrap(err, "new scanner") + } + + for scanner.Scan() { + line := scanner.Text() + if !strings.Contains(line, "MemTotal") { + continue + } + splitText := strings.Split(line, ": ") + kbs := strings.Trim(strings.Split(splitText[1], "kB")[0], " ") + kbsInt, err := strconv.ParseInt(kbs, 10, 64) + if err != nil { + return 0, errors.Wrap(err, "parse MemTotal int") + } + + return kbsInt * 1024, nil + } + + return 0, errors.New("memTotal not found in /proc/meminfo") +} + +func totalMemoryMacos(ctx context.Context) (int64, error) { + out, err := exec.CommandContext(ctx, "sysctl", "hw.memsize").Output() + if err != nil { + return 0, errors.Wrap(err, "run sysctl hw.memsize") + } + + memSize := strings.TrimSuffix(strings.Split(string(out), ": ")[1], "\n") + memSizeInt, err := strconv.ParseInt(memSize, 10, 64) + if err != nil { + return 0, errors.Wrap(err, "parse memSize int") + } + + return memSizeInt, nil +} + +func fetchOoklaServer(_ context.Context, conf *testPerformanceConfig) (speedtest.Server, error) { + speedtestClient := speedtest.New() + + serverList, err := speedtestClient.FetchServers() + if err != nil { + return speedtest.Server{}, errors.Wrap(err, "fetch Ookla servers") + } + + var targets speedtest.Servers + + if len(conf.InternetTestServersOnly) != 0 { + for _, server := range serverList { + if slices.Contains(conf.InternetTestServersOnly, server.Name) { + targets = append(targets, server) + } + } + } + + if len(conf.InternetTestServersExclude) != 0 { + for _, server := range serverList { + if !slices.Contains(conf.InternetTestServersExclude, server.Name) { + targets = append(targets, server) + } + } + } + + if targets == nil { + targets = serverList + } + + servers, err := targets.FindServer([]int{}) + if err != nil { + return speedtest.Server{}, errors.Wrap(err, "find Ookla server") + } + + return *servers[0], nil +} diff --git a/cmd/testvalidator.go b/cmd/testvalidator.go index d60a19b29a..39d134e6ed 100644 --- a/cmd/testvalidator.go +++ b/cmd/testvalidator.go @@ -49,14 +49,14 @@ func newTestValidatorCmd(runFunc func(context.Context, io.Writer, testValidatorC } bindTestFlags(cmd, &config.testConfig) - bindTestValidatorFlags(cmd, &config) + bindTestValidatorFlags(cmd, &config, "") return cmd } -func bindTestValidatorFlags(cmd *cobra.Command, config *testValidatorConfig) { - cmd.Flags().StringVar(&config.APIAddress, "validator-api-address", "127.0.0.1:3600", "Listening address (ip and port) for validator-facing traffic proxying the beacon-node API.") - cmd.Flags().DurationVar(&config.LoadTestDuration, "load-test-duration", 5*time.Second, "Time to keep running the load tests in seconds. For each second a new continuous ping instance is spawned.") +func bindTestValidatorFlags(cmd *cobra.Command, config *testValidatorConfig, flagsPrefix string) { + cmd.Flags().StringVar(&config.APIAddress, flagsPrefix+"validator-api-address", "127.0.0.1:3600", "Listening address (ip and port) for validator-facing traffic proxying the beacon-node API.") + cmd.Flags().DurationVar(&config.LoadTestDuration, flagsPrefix+"load-test-duration", 5*time.Second, "Time to keep running the load tests in seconds. For each second a new continuous ping instance is spawned.") } func supportedValidatorTestCases() map[testCaseName]func(context.Context, *testValidatorConfig) testResult { @@ -68,6 +68,8 @@ func supportedValidatorTestCases() map[testCaseName]func(context.Context, *testV } func runTestValidator(ctx context.Context, w io.Writer, cfg testValidatorConfig) (err error) { + log.Info(ctx, "Starting validator client test") + testCases := supportedValidatorTestCases() queuedTests := filterTests(maps.Keys(testCases), cfg.testConfig) if len(queuedTests) == 0 { @@ -124,6 +126,8 @@ func runTestValidator(ctx context.Context, w io.Writer, cfg testValidatorConfig) return nil } +// validator client tests + func testSingleValidator(ctx context.Context, queuedTestCases []testCaseName, allTestCases map[testCaseName]func(context.Context, *testValidatorConfig) testResult, cfg testValidatorConfig, resCh chan map[string][]testResult) { defer close(resCh) singleTestResCh := make(chan testResult) @@ -194,41 +198,11 @@ func validatorPingMeasureTest(ctx context.Context, conf *testValidatorConfig) te defer conn.Close() rtt := time.Since(before) - if rtt > thresholdValidatorMeasurePoor { - testRes.Verdict = testVerdictPoor - } else if rtt > thresholdValidatorMeasureAvg { - testRes.Verdict = testVerdictAvg - } else { - testRes.Verdict = testVerdictGood - } - testRes.Measurement = Duration{rtt}.String() + testRes = evaluateRTT(rtt, testRes, thresholdValidatorMeasureAvg, thresholdValidatorMeasurePoor) return testRes } -func pingValidatorContinuously(ctx context.Context, address string, resCh chan<- time.Duration) { - d := net.Dialer{Timeout: time.Second} - for { - before := time.Now() - conn, err := d.DialContext(ctx, "tcp", address) - if err != nil { - return - } - rtt := time.Since(before) - err = conn.Close() - if err != nil { - return - } - select { - case <-ctx.Done(): - return - case resCh <- rtt: - awaitTime := rand.Intn(100) //nolint:gosec // weak generator is not an issue here - sleepWithContext(ctx, time.Duration(awaitTime)*time.Millisecond) - } - } -} - func validatorPingLoadTest(ctx context.Context, conf *testValidatorConfig) testResult { log.Info(ctx, "Running ping load tests...", z.Any("duration", conf.LoadTestDuration), @@ -258,20 +232,32 @@ func validatorPingLoadTest(ctx context.Context, conf *testValidatorConfig) testR close(testResCh) log.Info(ctx, "Ping load tests finished", z.Any("target", conf.APIAddress)) - highestRTT := time.Duration(0) - for rtt := range testResCh { - if rtt > highestRTT { - highestRTT = rtt - } - } - if highestRTT > thresholdValidatorLoadPoor { - testRes.Verdict = testVerdictPoor - } else if highestRTT > thresholdValidatorLoadAvg { - testRes.Verdict = testVerdictAvg - } else { - testRes.Verdict = testVerdictGood - } - testRes.Measurement = Duration{highestRTT}.String() + testRes = evaluateHighestRTTScores(testResCh, testRes, thresholdValidatorLoadAvg, thresholdValidatorLoadPoor) return testRes } + +// helper functions + +func pingValidatorContinuously(ctx context.Context, address string, resCh chan<- time.Duration) { + d := net.Dialer{Timeout: time.Second} + for { + before := time.Now() + conn, err := d.DialContext(ctx, "tcp", address) + if err != nil { + return + } + rtt := time.Since(before) + err = conn.Close() + if err != nil { + return + } + select { + case <-ctx.Done(): + return + case resCh <- rtt: + awaitTime := rand.Intn(100) //nolint:gosec // weak generator is not an issue here + sleepWithContext(ctx, time.Duration(awaitTime)*time.Millisecond) + } + } +}