Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions assets/crd/microshift.io_remoteclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,27 @@ spec:
lastSuccessfulProbe:
format: date-time
type: string
latency:
description: Latency statistics from recent probes (rolling window
of 20 samples).
properties:
avg:
type: string
last:
type: string
max:
type: string
min:
type: string
stddev:
type: string
required:
- avg
- last
- max
- min
- stddev
type: object
state:
default: NeverProbed
enum:
Expand Down
13 changes: 13 additions & 0 deletions pkg/apis/microshift/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,19 @@ type RemoteClusterStatus struct {
LastProbeTime *metav1.Time `json:"lastProbeTime,omitempty"`
// +optional
Errors []string `json:"errors,omitempty"`
// Latency statistics from recent probes (rolling window of 20 samples).
// +optional
Latency *LatencyStats `json:"latency,omitempty"`
}

// LatencyStats contains latency statistics computed from a rolling window of probe samples.
// All duration values are serialized as Go duration strings (e.g. "1.234ms").
type LatencyStats struct {
Avg metav1.Duration `json:"avg"`
Min metav1.Duration `json:"min"`
Max metav1.Duration `json:"max"`
Last metav1.Duration `json:"last"`
Stddev metav1.Duration `json:"stddev"`
}

// +kubebuilder:object:root=true
Expand Down
25 changes: 25 additions & 0 deletions pkg/apis/microshift/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 74 additions & 0 deletions pkg/controllers/c2cc/latency.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package c2cc

import (
"math"
"time"

microshiftv1alpha1 "github.com/openshift/microshift/pkg/apis/microshift/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const windowSize = 20

type latencyWindow struct {
samples [windowSize]time.Duration
pos int
count int
}

func (w *latencyWindow) add(d time.Duration) {
w.samples[w.pos] = d
w.pos = (w.pos + 1) % windowSize
if w.count < windowSize {
w.count++
}
}

func (w *latencyWindow) stats() *microshiftv1alpha1.LatencyStats {
if w.count == 0 {
return nil
}

n := w.count
start := 0
if n == windowSize {
start = w.pos
}

var sum time.Duration
minD := w.samples[start]
maxD := w.samples[start]
var last time.Duration

for i := 0; i < n; i++ {
idx := (start + i) % windowSize
d := w.samples[idx]
sum += d
if d < minD {
minD = d
}
if d > maxD {
maxD = d
}
last = d
}

avg := sum / time.Duration(n)

var varianceSum float64
avgF := float64(avg)
for i := 0; i < n; i++ {
idx := (start + i) % windowSize
diff := float64(w.samples[idx]) - avgF
varianceSum += diff * diff
}
stddev := time.Duration(math.Sqrt(varianceSum / float64(n)))

return &microshiftv1alpha1.LatencyStats{
Avg: metav1.Duration{Duration: avg},
Min: metav1.Duration{Duration: minD},
Max: metav1.Duration{Duration: maxD},
Last: metav1.Duration{Duration: last},
Stddev: metav1.Duration{Duration: stddev},
}
}
100 changes: 100 additions & 0 deletions pkg/controllers/c2cc/latency_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package c2cc

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestLatencyWindow_EmptyReturnsNil(t *testing.T) {
w := &latencyWindow{}
assert.Nil(t, w.stats())
}

func TestLatencyWindow_SingleSample(t *testing.T) {
w := &latencyWindow{}
w.add(10 * time.Millisecond)

s := w.stats()
require.NotNil(t, s)
assert.Equal(t, 10*time.Millisecond, s.Avg.Duration)
assert.Equal(t, 10*time.Millisecond, s.Min.Duration)
assert.Equal(t, 10*time.Millisecond, s.Max.Duration)
assert.Equal(t, 10*time.Millisecond, s.Last.Duration)
assert.Equal(t, time.Duration(0), s.Stddev.Duration)
}

func TestLatencyWindow_PartialFill(t *testing.T) {
w := &latencyWindow{}
w.add(10 * time.Millisecond)
w.add(20 * time.Millisecond)
w.add(30 * time.Millisecond)

s := w.stats()
require.NotNil(t, s)
assert.Equal(t, 20*time.Millisecond, s.Avg.Duration)
assert.Equal(t, 10*time.Millisecond, s.Min.Duration)
assert.Equal(t, 30*time.Millisecond, s.Max.Duration)
assert.Equal(t, 30*time.Millisecond, s.Last.Duration)
assert.True(t, s.Stddev.Duration > 0, "stddev should be > 0 for varied samples")
}

func TestLatencyWindow_FullWindowWraps(t *testing.T) {
w := &latencyWindow{}

// Fill with 25 samples: values 1ms through 25ms
for i := 1; i <= 25; i++ {
w.add(time.Duration(i) * time.Millisecond)
}

assert.Equal(t, windowSize, w.count)

s := w.stats()
require.NotNil(t, s)

// Last 20 samples: 6ms through 25ms
assert.Equal(t, 25*time.Millisecond, s.Last.Duration)
assert.Equal(t, 6*time.Millisecond, s.Min.Duration)
assert.Equal(t, 25*time.Millisecond, s.Max.Duration)

// Avg of 6..25 = (6+25)/2 = 15.5ms, truncated to 15ms by integer division
expectedAvg := time.Duration(15) * time.Millisecond
assert.InDelta(t, float64(expectedAvg), float64(s.Avg.Duration), float64(time.Millisecond))
}

func TestLatencyWindow_StatsComputation(t *testing.T) {
w := &latencyWindow{}
// 5 samples: 100, 200, 300, 400, 500 µs
for i := 1; i <= 5; i++ {
w.add(time.Duration(i*100) * time.Microsecond)
}

s := w.stats()
require.NotNil(t, s)

// avg = 300µs
assert.Equal(t, 300*time.Microsecond, s.Avg.Duration)
assert.Equal(t, 100*time.Microsecond, s.Min.Duration)
assert.Equal(t, 500*time.Microsecond, s.Max.Duration)
assert.Equal(t, 500*time.Microsecond, s.Last.Duration)

// stddev = sqrt(mean((x-300)^2)) = sqrt((40000+10000+0+10000+40000)/5) = sqrt(20000) ≈ 141.4µs
assert.InDelta(t, float64(141*time.Microsecond), float64(s.Stddev.Duration), float64(2*time.Microsecond))
}

func TestLatencyWindow_IdenticalSamples(t *testing.T) {
w := &latencyWindow{}
for i := 0; i < 10; i++ {
w.add(5 * time.Millisecond)
}

s := w.stats()
require.NotNil(t, s)
assert.Equal(t, 5*time.Millisecond, s.Avg.Duration)
assert.Equal(t, 5*time.Millisecond, s.Min.Duration)
assert.Equal(t, 5*time.Millisecond, s.Max.Duration)
assert.Equal(t, 5*time.Millisecond, s.Last.Duration)
assert.Equal(t, time.Duration(0), s.Stddev.Duration)
}
41 changes: 27 additions & 14 deletions pkg/controllers/c2cc/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ func RunProbe(ctx context.Context) error {
}()

pm := &probeManager{
client: msClient,
probes: make(map[string]context.CancelFunc),
client: msClient,
probes: make(map[string]context.CancelFunc),
latencies: make(map[string]*latencyWindow),
}

factory := microshiftinformers.NewSharedInformerFactory(msClient, informerResync)
Expand Down Expand Up @@ -110,9 +111,10 @@ func RunProbe(ctx context.Context) error {
}

type probeManager struct {
client microshiftclientset.Interface
mu sync.Mutex
probes map[string]context.CancelFunc
client microshiftclientset.Interface
mu sync.Mutex
probes map[string]context.CancelFunc
latencies map[string]*latencyWindow
}

func (pm *probeManager) startProbe(ctx context.Context, rc *microshiftv1alpha1.RemoteCluster) {
Expand All @@ -125,10 +127,11 @@ func (pm *probeManager) startProbe(ctx context.Context, rc *microshiftv1alpha1.R

probeCtx, cancel := context.WithCancel(ctx)
pm.probes[rc.Name] = cancel
pm.latencies[rc.Name] = &latencyWindow{}

klog.Infof("Starting probe for %q (target=%s, interval=%s)",
rc.Name, rc.Spec.ProbeTarget, rc.Spec.ProbeInterval.Duration)
go pm.runProbeLoop(probeCtx, rc.Name, rc.Spec.ProbeTarget, rc.Spec.ProbeInterval.Duration)
go pm.runProbeLoop(probeCtx, rc.Name, rc.Spec.ProbeTarget, rc.Spec.ProbeInterval.Duration, pm.latencies[rc.Name])
}

func (pm *probeManager) restartProbe(ctx context.Context, rc *microshiftv1alpha1.RemoteCluster) {
Expand All @@ -143,6 +146,7 @@ func (pm *probeManager) stopProbe(name string) {
if cancel, exists := pm.probes[name]; exists {
cancel()
delete(pm.probes, name)
delete(pm.latencies, name)
klog.Infof("Stopped probe for %q", name)
}
}
Expand All @@ -154,10 +158,11 @@ func (pm *probeManager) stopAll() {
for name, cancel := range pm.probes {
cancel()
delete(pm.probes, name)
delete(pm.latencies, name)
}
}

func (pm *probeManager) runProbeLoop(ctx context.Context, name, target string, interval time.Duration) {
func (pm *probeManager) runProbeLoop(ctx context.Context, name, target string, interval time.Duration, window *latencyWindow) {
httpClient := &http.Client{Timeout: probeHTTPTimeout}
consecutiveFailures := 0
url := "http://" + target + "/"
Expand All @@ -170,7 +175,7 @@ func (pm *probeManager) runProbeLoop(ctx context.Context, name, target string, i
case <-ctx.Done():
return
case <-ticker.C:
probeErr := doProbe(ctx, httpClient, url)
rtt, probeErr := doProbe(ctx, httpClient, url)
now := metav1.Now()

status := microshiftv1alpha1.RemoteClusterStatus{
Expand All @@ -191,33 +196,38 @@ func (pm *probeManager) runProbeLoop(ctx context.Context, name, target string, i
consecutiveFailures = 0
status.State = "Healthy"
status.LastSuccessfulProbe = &now
window.add(rtt)
}

status.Latency = window.stats()

if err := pm.updateStatus(ctx, name, status); err != nil {
klog.Errorf("Failed to update status for %q: %v", name, err)
}
}
}
}

func doProbe(ctx context.Context, client *http.Client, url string) error {
func doProbe(ctx context.Context, client *http.Client, url string) (time.Duration, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
return 0, fmt.Errorf("failed to create request: %w", err)
}
start := time.Now()
resp, err := client.Do(req) // #nosec G704 -- URL built from trusted RemoteCluster CR spec
rtt := time.Since(start)
if err != nil {
return fmt.Errorf("failed to execute probe request: %w", err)
return 0, fmt.Errorf("failed to execute probe request: %w", err)
}
defer func() {
if err := resp.Body.Close(); err != nil {
klog.Errorf("Failed to close probe response body: %v", err)
}
}()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed with unexpected status %d", resp.StatusCode)
return 0, fmt.Errorf("failed with unexpected status %d", resp.StatusCode)
}
return nil
return rtt, nil
}

func (pm *probeManager) updateStatus(ctx context.Context, name string, status microshiftv1alpha1.RemoteClusterStatus) error {
Expand All @@ -229,10 +239,13 @@ func (pm *probeManager) updateStatus(ctx context.Context, name string, status mi
return fmt.Errorf("failed to get RemoteCluster %q: %w", name, err)
}

// Preserve LastSuccessfulProbe from the existing status if this probe failed
// Preserve LastSuccessfulProbe & Latency from the existing status if this probe failed
if rc.Status.LastSuccessfulProbe != nil && status.LastSuccessfulProbe == nil {
status.LastSuccessfulProbe = rc.Status.LastSuccessfulProbe
}
if rc.Status.Latency != nil && status.Latency == nil {
status.Latency = rc.Status.Latency
}

rc.Status = status
_, err = rcClient.UpdateStatus(ctx, rc, metav1.UpdateOptions{})
Expand Down
Loading