Skip to content
7 changes: 7 additions & 0 deletions packages/api/internal/clusters/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,3 +263,10 @@ func (c *Cluster) GetOrchestrators() []*Instance {
func (c *Cluster) GetResources() ClusterResource {
return c.resources
}

// SyncInstances performs an immediate synchronization of cluster instances from
// the service discovery source. It is called on-demand when a node lookup fails,
// to handle newly joined orchestrators that may not yet be in the in-memory pool.
func (c *Cluster) SyncInstances(ctx context.Context) error {
return c.synchronization.Sync(ctx)
}
167 changes: 154 additions & 13 deletions packages/api/internal/orchestrator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package orchestrator
import (
"context"
"fmt"
"sync"
"time"

"github.com/google/uuid"
Expand All @@ -21,28 +22,60 @@ func (o *Orchestrator) connectToNode(ctx context.Context, discovered nodemanager
ctx, childSpan := tracer.Start(ctx, "connect-to-node")
defer childSpan.End()

orchestratorNode, err := nodemanager.New(ctx, o.tel.TracerProvider, o.tel.MeterProvider, discovered)
if err != nil {
return err
}
_, err, _ := o.connectGroup.Do(discovered.NomadNodeShortID, func() (any, error) {
// Re-check inside the singleflight to prevent race issues due to overwriting existing nodes in the map
if o.GetNodeByNomadShortID(discovered.NomadNodeShortID) != nil {
return nil, nil
}

// Update host metrics from service info
o.registerNode(orchestratorNode)
connectCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), nodeConnectTimeout)
defer cancel()

return nil
orchestratorNode, err := nodemanager.New(connectCtx, o.tel.TracerProvider, o.tel.MeterProvider, discovered)
if err != nil {
return nil, err
}

o.registerNode(orchestratorNode)

return nil, nil
})

return err
}

func (o *Orchestrator) connectToClusterNode(ctx context.Context, cluster *clusters.Cluster, i *clusters.Instance) {
orchestratorNode, err := nodemanager.NewClusterNode(ctx, i.GetClient(), cluster.ID, cluster.SandboxDomain, i)
if err != nil {
logger.L().Error(ctx, "Failed to create node", zap.Error(err))
ctx, span := tracer.Start(ctx, "connect-to-cluster-node")
defer span.End()

return
}
// connectGroup is keyed by scopedNodeID so that concurrent callers targeting
// the same cluster instance share a single dial attempt.
scopedKey := o.scopedNodeID(cluster.ID, i.NodeID)

o.connectGroup.Do(scopedKey, func() (any, error) { //nolint:errcheck
// Re-check inside the singleflight for the same reason as connectToNode.
if o.GetNode(cluster.ID, i.NodeID) != nil {
return nil, nil
}

connectCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), nodeConnectTimeout)
defer cancel()

orchestratorNode, err := nodemanager.NewClusterNode(connectCtx, i.GetClient(), cluster.ID, cluster.SandboxDomain, i)
if err != nil {
logger.L().Error(ctx, "Failed to create node", zap.Error(err))

o.registerNode(orchestratorNode)
return nil, nil
}

o.registerNode(orchestratorNode)

return nil, nil
})
}

// registerNode adds the given node to the in-memory map of nodes
// It has to be called only once per node
func (o *Orchestrator) registerNode(node *nodemanager.Node) {
scopedKey := o.scopedNodeID(node.ClusterID, node.ID)
o.nodes.Insert(scopedKey, node)
Expand Down Expand Up @@ -94,6 +127,114 @@ func (o *Orchestrator) GetNode(clusterID uuid.UUID, nodeID string) *nodemanager.
return n
}

// getOrConnectNode returns a node from the in-memory cache. When the node is absent it
// performs a targeted on-demand discovery and connection attempt, handling the race
// condition where a new orchestrator joined the cluster after this API instance's last
// sync cycle but another API instance already routed a sandbox there.
//
// There are two distinct gaps that must be covered:
// - Gap 1 (0–5 s for clusters, 0–20 s for Nomad): the node exists in the upstream
// source (Nomad / remote service discovery) but has not yet been pulled into the
// local instance map by the background sync loop.
// - Gap 2 (0–20 s): the node is in the local instance map but has not yet been
// promoted into o.nodes by keepInSync.
Comment on lines +139 to +140
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can simplify by removing two layers os nodes sync. This should remove duplicated logic, but it should be done separately as it will introduce lot of changes.

//
// discoveryGroup ensures that concurrent requests targeting the same missing
// node share a single discovery attempt rather than fanning out.
func (o *Orchestrator) getOrConnectNode(ctx context.Context, clusterID uuid.UUID, nodeID string) *nodemanager.Node {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add some tracing here so we can see why some sandbox requests will be slower.

ctx, span := tracer.Start(ctx, "get-or-connect-node")
defer span.End()

if node := o.GetNode(clusterID, nodeID); node != nil {
return node
}

logger.L().Warn(ctx, "Node not found in cache, attempting on-demand connection",
logger.WithNodeID(nodeID),
zap.String("cluster_id", clusterID.String()),
)

scopedKey := o.scopedNodeID(clusterID, nodeID)

o.discoveryGroup.Do(scopedKey, func() (any, error) { //nolint:errcheck
// Re-check inside the singleflight
if node := o.GetNode(clusterID, nodeID); node != nil {
return nil, nil
}

connectCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), cacheSyncTime)
defer cancel()

if clusterID == consts.LocalClusterID {
o.discoverNomadNodes(connectCtx)
} else {
o.discoverClusterNode(connectCtx, clusterID)
}

return nil, nil
})

return o.GetNode(clusterID, nodeID)
}

// discoverNomadNodes lists all ready Nomad nodes and connects any that are not yet in the pool.
// Once a new node is connected its orchestrator ID becomes the map key, making subsequent GetNode calls succeed.
func (o *Orchestrator) discoverNomadNodes(ctx context.Context) {
ctx, span := tracer.Start(ctx, "discover-nomad-nodes")
defer span.End()

nomadNodes, err := o.listNomadNodes(ctx)
if err != nil {
logger.L().Error(ctx, "Error listing Nomad nodes during on-demand discovery", zap.Error(err))

return
}

var wg sync.WaitGroup
defer wg.Wait()

for _, n := range nomadNodes {
if o.GetNodeByNomadShortID(n.NomadNodeShortID) == nil {
wg.Go(func() {
if err := o.connectToNode(ctx, n); err != nil {
logger.L().Error(ctx, "Error connecting to Nomad node on demand",
zap.Error(err), zap.String("nomad_short_id", n.NomadNodeShortID))
}
})
}
}
}

// discoverClusterNode forces a fresh service discovery query so that nodes which joined after the
// last periodic sync are pulled into cluster.instances, then opportunistically connects all
// unknown nodes into o.nodes (not just the target), avoiding repeated on-demand discoveries.
func (o *Orchestrator) discoverClusterNode(ctx context.Context, clusterID uuid.UUID) {
ctx, span := tracer.Start(ctx, "discover-cluster-node")
defer span.End()

cluster, found := o.clusters.GetClusterById(clusterID)
if !found {
logger.L().Error(ctx, "Cluster not found during on-demand node discovery", logger.WithClusterID(clusterID))

return
}

if err := cluster.SyncInstances(ctx); err != nil {
logger.L().Error(ctx, "Error syncing cluster instances during on-demand node discovery", zap.Error(err), logger.WithClusterID(clusterID))

return
}

var wg sync.WaitGroup
defer wg.Wait()

for _, instance := range cluster.GetOrchestrators() {
wg.Go(func() {
o.connectToClusterNode(ctx, cluster, instance)
})
}
}

func (o *Orchestrator) GetClusterNodes(clusterID uuid.UUID) []*nodemanager.Node {
clusterNodes := make([]*nodemanager.Node, 0)
for _, n := range o.nodes.Items() {
Expand Down
Loading
Loading