diff --git a/assets/crd/microshift.io_remoteclusters.yaml b/assets/crd/microshift.io_remoteclusters.yaml index daf41edbed..d521b766f0 100644 --- a/assets/crd/microshift.io_remoteclusters.yaml +++ b/assets/crd/microshift.io_remoteclusters.yaml @@ -66,6 +66,27 @@ spec: lastSuccessfulProbe: format: date-time type: string + latency: + description: Latency statistics from recent probes (rolling window + of 20 samples). + properties: + avg: + type: string + last: + type: string + max: + type: string + min: + type: string + stddev: + type: string + required: + - avg + - last + - max + - min + - stddev + type: object state: default: NeverProbed enum: diff --git a/pkg/apis/microshift/v1alpha1/types.go b/pkg/apis/microshift/v1alpha1/types.go index b3b68cbbd2..d80140d67c 100644 --- a/pkg/apis/microshift/v1alpha1/types.go +++ b/pkg/apis/microshift/v1alpha1/types.go @@ -42,6 +42,19 @@ type RemoteClusterStatus struct { LastProbeTime *metav1.Time `json:"lastProbeTime,omitempty"` // +optional Errors []string `json:"errors,omitempty"` + // Latency statistics from recent probes (rolling window of 20 samples). + // +optional + Latency *LatencyStats `json:"latency,omitempty"` +} + +// LatencyStats contains latency statistics computed from a rolling window of probe samples. +// All duration values are serialized as Go duration strings (e.g. "1.234ms"). +type LatencyStats struct { + Avg metav1.Duration `json:"avg"` + Min metav1.Duration `json:"min"` + Max metav1.Duration `json:"max"` + Last metav1.Duration `json:"last"` + Stddev metav1.Duration `json:"stddev"` } // +kubebuilder:object:root=true diff --git a/pkg/apis/microshift/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/microshift/v1alpha1/zz_generated.deepcopy.go index 0cdb32da4d..896a4c96d1 100644 --- a/pkg/apis/microshift/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/microshift/v1alpha1/zz_generated.deepcopy.go @@ -8,6 +8,26 @@ import ( "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *LatencyStats) DeepCopyInto(out *LatencyStats) { + *out = *in + out.Avg = in.Avg + out.Min = in.Min + out.Max = in.Max + out.Last = in.Last + out.Stddev = in.Stddev +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LatencyStats. +func (in *LatencyStats) DeepCopy() *LatencyStats { + if in == nil { + return nil + } + out := new(LatencyStats) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RemoteCluster) DeepCopyInto(out *RemoteCluster) { *out = *in @@ -99,6 +119,11 @@ func (in *RemoteClusterStatus) DeepCopyInto(out *RemoteClusterStatus) { *out = make([]string, len(*in)) copy(*out, *in) } + if in.Latency != nil { + in, out := &in.Latency, &out.Latency + *out = new(LatencyStats) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RemoteClusterStatus. diff --git a/pkg/controllers/c2cc/latency.go b/pkg/controllers/c2cc/latency.go new file mode 100644 index 0000000000..b0d2bc0b16 --- /dev/null +++ b/pkg/controllers/c2cc/latency.go @@ -0,0 +1,74 @@ +package c2cc + +import ( + "math" + "time" + + microshiftv1alpha1 "github.com/openshift/microshift/pkg/apis/microshift/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const windowSize = 20 + +type latencyWindow struct { + samples [windowSize]time.Duration + pos int + count int +} + +func (w *latencyWindow) add(d time.Duration) { + w.samples[w.pos] = d + w.pos = (w.pos + 1) % windowSize + if w.count < windowSize { + w.count++ + } +} + +func (w *latencyWindow) stats() *microshiftv1alpha1.LatencyStats { + if w.count == 0 { + return nil + } + + n := w.count + start := 0 + if n == windowSize { + start = w.pos + } + + var sum time.Duration + minD := w.samples[start] + maxD := w.samples[start] + var last time.Duration + + for i := 0; i < n; i++ { + idx := (start + i) % windowSize + d := w.samples[idx] + sum += d + if d < minD { + minD = d + } + if d > maxD { + maxD = d + } + last = d + } + + avg := sum / time.Duration(n) + + var varianceSum float64 + avgF := float64(avg) + for i := 0; i < n; i++ { + idx := (start + i) % windowSize + diff := float64(w.samples[idx]) - avgF + varianceSum += diff * diff + } + stddev := time.Duration(math.Sqrt(varianceSum / float64(n))) + + return µshiftv1alpha1.LatencyStats{ + Avg: metav1.Duration{Duration: avg}, + Min: metav1.Duration{Duration: minD}, + Max: metav1.Duration{Duration: maxD}, + Last: metav1.Duration{Duration: last}, + Stddev: metav1.Duration{Duration: stddev}, + } +} diff --git a/pkg/controllers/c2cc/latency_test.go b/pkg/controllers/c2cc/latency_test.go new file mode 100644 index 0000000000..04ac28e7f5 --- /dev/null +++ b/pkg/controllers/c2cc/latency_test.go @@ -0,0 +1,100 @@ +package c2cc + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestLatencyWindow_EmptyReturnsNil(t *testing.T) { + w := &latencyWindow{} + assert.Nil(t, w.stats()) +} + +func TestLatencyWindow_SingleSample(t *testing.T) { + w := &latencyWindow{} + w.add(10 * time.Millisecond) + + s := w.stats() + require.NotNil(t, s) + assert.Equal(t, 10*time.Millisecond, s.Avg.Duration) + assert.Equal(t, 10*time.Millisecond, s.Min.Duration) + assert.Equal(t, 10*time.Millisecond, s.Max.Duration) + assert.Equal(t, 10*time.Millisecond, s.Last.Duration) + assert.Equal(t, time.Duration(0), s.Stddev.Duration) +} + +func TestLatencyWindow_PartialFill(t *testing.T) { + w := &latencyWindow{} + w.add(10 * time.Millisecond) + w.add(20 * time.Millisecond) + w.add(30 * time.Millisecond) + + s := w.stats() + require.NotNil(t, s) + assert.Equal(t, 20*time.Millisecond, s.Avg.Duration) + assert.Equal(t, 10*time.Millisecond, s.Min.Duration) + assert.Equal(t, 30*time.Millisecond, s.Max.Duration) + assert.Equal(t, 30*time.Millisecond, s.Last.Duration) + assert.True(t, s.Stddev.Duration > 0, "stddev should be > 0 for varied samples") +} + +func TestLatencyWindow_FullWindowWraps(t *testing.T) { + w := &latencyWindow{} + + // Fill with 25 samples: values 1ms through 25ms + for i := 1; i <= 25; i++ { + w.add(time.Duration(i) * time.Millisecond) + } + + assert.Equal(t, windowSize, w.count) + + s := w.stats() + require.NotNil(t, s) + + // Last 20 samples: 6ms through 25ms + assert.Equal(t, 25*time.Millisecond, s.Last.Duration) + assert.Equal(t, 6*time.Millisecond, s.Min.Duration) + assert.Equal(t, 25*time.Millisecond, s.Max.Duration) + + // Avg of 6..25 = (6+25)/2 = 15.5ms, truncated to 15ms by integer division + expectedAvg := time.Duration(15) * time.Millisecond + assert.InDelta(t, float64(expectedAvg), float64(s.Avg.Duration), float64(time.Millisecond)) +} + +func TestLatencyWindow_StatsComputation(t *testing.T) { + w := &latencyWindow{} + // 5 samples: 100, 200, 300, 400, 500 µs + for i := 1; i <= 5; i++ { + w.add(time.Duration(i*100) * time.Microsecond) + } + + s := w.stats() + require.NotNil(t, s) + + // avg = 300µs + assert.Equal(t, 300*time.Microsecond, s.Avg.Duration) + assert.Equal(t, 100*time.Microsecond, s.Min.Duration) + assert.Equal(t, 500*time.Microsecond, s.Max.Duration) + assert.Equal(t, 500*time.Microsecond, s.Last.Duration) + + // stddev = sqrt(mean((x-300)^2)) = sqrt((40000+10000+0+10000+40000)/5) = sqrt(20000) ≈ 141.4µs + assert.InDelta(t, float64(141*time.Microsecond), float64(s.Stddev.Duration), float64(2*time.Microsecond)) +} + +func TestLatencyWindow_IdenticalSamples(t *testing.T) { + w := &latencyWindow{} + for i := 0; i < 10; i++ { + w.add(5 * time.Millisecond) + } + + s := w.stats() + require.NotNil(t, s) + assert.Equal(t, 5*time.Millisecond, s.Avg.Duration) + assert.Equal(t, 5*time.Millisecond, s.Min.Duration) + assert.Equal(t, 5*time.Millisecond, s.Max.Duration) + assert.Equal(t, 5*time.Millisecond, s.Last.Duration) + assert.Equal(t, time.Duration(0), s.Stddev.Duration) +} diff --git a/pkg/controllers/c2cc/probe.go b/pkg/controllers/c2cc/probe.go index 77b8135f99..88af3df033 100644 --- a/pkg/controllers/c2cc/probe.go +++ b/pkg/controllers/c2cc/probe.go @@ -58,8 +58,9 @@ func RunProbe(ctx context.Context) error { }() pm := &probeManager{ - client: msClient, - probes: make(map[string]context.CancelFunc), + client: msClient, + probes: make(map[string]context.CancelFunc), + latencies: make(map[string]*latencyWindow), } factory := microshiftinformers.NewSharedInformerFactory(msClient, informerResync) @@ -110,9 +111,10 @@ func RunProbe(ctx context.Context) error { } type probeManager struct { - client microshiftclientset.Interface - mu sync.Mutex - probes map[string]context.CancelFunc + client microshiftclientset.Interface + mu sync.Mutex + probes map[string]context.CancelFunc + latencies map[string]*latencyWindow } func (pm *probeManager) startProbe(ctx context.Context, rc *microshiftv1alpha1.RemoteCluster) { @@ -125,10 +127,11 @@ func (pm *probeManager) startProbe(ctx context.Context, rc *microshiftv1alpha1.R probeCtx, cancel := context.WithCancel(ctx) pm.probes[rc.Name] = cancel + pm.latencies[rc.Name] = &latencyWindow{} klog.Infof("Starting probe for %q (target=%s, interval=%s)", rc.Name, rc.Spec.ProbeTarget, rc.Spec.ProbeInterval.Duration) - go pm.runProbeLoop(probeCtx, rc.Name, rc.Spec.ProbeTarget, rc.Spec.ProbeInterval.Duration) + go pm.runProbeLoop(probeCtx, rc.Name, rc.Spec.ProbeTarget, rc.Spec.ProbeInterval.Duration, pm.latencies[rc.Name]) } func (pm *probeManager) restartProbe(ctx context.Context, rc *microshiftv1alpha1.RemoteCluster) { @@ -143,6 +146,7 @@ func (pm *probeManager) stopProbe(name string) { if cancel, exists := pm.probes[name]; exists { cancel() delete(pm.probes, name) + delete(pm.latencies, name) klog.Infof("Stopped probe for %q", name) } } @@ -154,10 +158,11 @@ func (pm *probeManager) stopAll() { for name, cancel := range pm.probes { cancel() delete(pm.probes, name) + delete(pm.latencies, name) } } -func (pm *probeManager) runProbeLoop(ctx context.Context, name, target string, interval time.Duration) { +func (pm *probeManager) runProbeLoop(ctx context.Context, name, target string, interval time.Duration, window *latencyWindow) { httpClient := &http.Client{Timeout: probeHTTPTimeout} consecutiveFailures := 0 url := "http://" + target + "/" @@ -170,7 +175,7 @@ func (pm *probeManager) runProbeLoop(ctx context.Context, name, target string, i case <-ctx.Done(): return case <-ticker.C: - probeErr := doProbe(ctx, httpClient, url) + rtt, probeErr := doProbe(ctx, httpClient, url) now := metav1.Now() status := microshiftv1alpha1.RemoteClusterStatus{ @@ -191,8 +196,11 @@ func (pm *probeManager) runProbeLoop(ctx context.Context, name, target string, i consecutiveFailures = 0 status.State = "Healthy" status.LastSuccessfulProbe = &now + window.add(rtt) } + status.Latency = window.stats() + if err := pm.updateStatus(ctx, name, status); err != nil { klog.Errorf("Failed to update status for %q: %v", name, err) } @@ -200,14 +208,16 @@ func (pm *probeManager) runProbeLoop(ctx context.Context, name, target string, i } } -func doProbe(ctx context.Context, client *http.Client, url string) error { +func doProbe(ctx context.Context, client *http.Client, url string) (time.Duration, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { - return fmt.Errorf("failed to create request: %w", err) + return 0, fmt.Errorf("failed to create request: %w", err) } + start := time.Now() resp, err := client.Do(req) // #nosec G704 -- URL built from trusted RemoteCluster CR spec + rtt := time.Since(start) if err != nil { - return fmt.Errorf("failed to execute probe request: %w", err) + return 0, fmt.Errorf("failed to execute probe request: %w", err) } defer func() { if err := resp.Body.Close(); err != nil { @@ -215,9 +225,9 @@ func doProbe(ctx context.Context, client *http.Client, url string) error { } }() if resp.StatusCode != http.StatusOK { - return fmt.Errorf("failed with unexpected status %d", resp.StatusCode) + return 0, fmt.Errorf("failed with unexpected status %d", resp.StatusCode) } - return nil + return rtt, nil } func (pm *probeManager) updateStatus(ctx context.Context, name string, status microshiftv1alpha1.RemoteClusterStatus) error { @@ -229,10 +239,13 @@ func (pm *probeManager) updateStatus(ctx context.Context, name string, status mi return fmt.Errorf("failed to get RemoteCluster %q: %w", name, err) } - // Preserve LastSuccessfulProbe from the existing status if this probe failed + // Preserve LastSuccessfulProbe & Latency from the existing status if this probe failed if rc.Status.LastSuccessfulProbe != nil && status.LastSuccessfulProbe == nil { status.LastSuccessfulProbe = rc.Status.LastSuccessfulProbe } + if rc.Status.Latency != nil && status.Latency == nil { + status.Latency = rc.Status.Latency + } rc.Status = status _, err = rcClient.UpdateStatus(ctx, rc, metav1.UpdateOptions{}) diff --git a/test/suites/c2cc/probe.robot b/test/suites/c2cc/probe.robot index e58e123193..48c029a283 100644 --- a/test/suites/c2cc/probe.robot +++ b/test/suites/c2cc/probe.robot @@ -78,6 +78,14 @@ RemoteCluster Status Has LastSuccessfulProbe END END +RemoteCluster Status Has Latency Stats + [Documentation] Verify that all latency stat fields (avg/min/max/last/stddev) are populated + ... on all RemoteCluster CRs across all clusters. + FOR ${alias} IN cluster-a cluster-b cluster-c + Wait Until Keyword Succeeds 2m 10s + ... Verify Latency Stats Populated ${alias} + END + Probe Deployment Self-Heals After Deletion [Documentation] Delete the probe deployment and verify it is recreated by the controller. Oc On Cluster cluster-a @@ -208,3 +216,21 @@ Delete Probe Deny Policy [Arguments] ${alias} Oc On Cluster ${alias} ... oc delete networkpolicy deny-probe-ingress -n ${C2CC_NAMESPACE} --ignore-not-found + +Verify Latency Stats Populated + [Documentation] Check that all latency fields (avg/min/max/last/stddev) are populated + ... on all RemoteCluster CRs for the given cluster. + [Arguments] ${alias} + FOR ${field} IN avg min max last stddev + ${stdout}= Oc On Cluster ${alias} + ... oc get remoteclusters.microshift.io -o jsonpath='{.items[*].status.latency.${field}}' + Should Not Be Empty ${stdout} + @{values}= Split String ${stdout} + ${count}= Get Length ${values} + # "2" is expected because there are two remote clusters, + # so the above jsonpath provides values from both remote cluster CR. + Should Be Equal As Integers ${count} 2 Expected 2 latency ${field} values, got ${count} + FOR ${v} IN @{values} + Should Not Be Empty ${v} + END + END