-
Notifications
You must be signed in to change notification settings - Fork 278
fix(api): race conditions with multiple APIs and fresh orchestrators #2191
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.
Already on GitHub? Sign in to your account
Changes from all commits
7b7b576
278593a
28ef543
dba2377
a0c2968
b9726e4
b342645
d91442f
3e8e22f
ad5d1ff
5da23c0
ed51bf9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ package orchestrator | |
| import ( | ||
| "context" | ||
| "fmt" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/google/uuid" | ||
|
|
@@ -21,28 +22,60 @@ func (o *Orchestrator) connectToNode(ctx context.Context, discovered nodemanager | |
| ctx, childSpan := tracer.Start(ctx, "connect-to-node") | ||
| defer childSpan.End() | ||
|
|
||
| orchestratorNode, err := nodemanager.New(ctx, o.tel.TracerProvider, o.tel.MeterProvider, discovered) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| _, err, _ := o.connectGroup.Do(discovered.NomadNodeShortID, func() (any, error) { | ||
| // Re-check inside the singleflight to prevent race issues due to overwriting existing nodes in the map | ||
| if o.GetNodeByNomadShortID(discovered.NomadNodeShortID) != nil { | ||
| return nil, nil | ||
| } | ||
|
|
||
| // Update host metrics from service info | ||
| o.registerNode(orchestratorNode) | ||
| connectCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), nodeConnectTimeout) | ||
| defer cancel() | ||
|
|
||
| return nil | ||
| orchestratorNode, err := nodemanager.New(connectCtx, o.tel.TracerProvider, o.tel.MeterProvider, discovered) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| o.registerNode(orchestratorNode) | ||
|
|
||
| return nil, nil | ||
| }) | ||
|
|
||
| return err | ||
| } | ||
|
|
||
| func (o *Orchestrator) connectToClusterNode(ctx context.Context, cluster *clusters.Cluster, i *clusters.Instance) { | ||
| orchestratorNode, err := nodemanager.NewClusterNode(ctx, i.GetClient(), cluster.ID, cluster.SandboxDomain, i) | ||
| if err != nil { | ||
| logger.L().Error(ctx, "Failed to create node", zap.Error(err)) | ||
| ctx, span := tracer.Start(ctx, "connect-to-cluster-node") | ||
| defer span.End() | ||
|
|
||
| return | ||
| } | ||
| // connectGroup is keyed by scopedNodeID so that concurrent callers targeting | ||
| // the same cluster instance share a single dial attempt. | ||
| scopedKey := o.scopedNodeID(cluster.ID, i.NodeID) | ||
|
|
||
| o.connectGroup.Do(scopedKey, func() (any, error) { //nolint:errcheck | ||
| // Re-check inside the singleflight for the same reason as connectToNode. | ||
| if o.GetNode(cluster.ID, i.NodeID) != nil { | ||
| return nil, nil | ||
| } | ||
|
|
||
| connectCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), nodeConnectTimeout) | ||
| defer cancel() | ||
|
|
||
| orchestratorNode, err := nodemanager.NewClusterNode(connectCtx, i.GetClient(), cluster.ID, cluster.SandboxDomain, i) | ||
| if err != nil { | ||
| logger.L().Error(ctx, "Failed to create node", zap.Error(err)) | ||
|
|
||
| o.registerNode(orchestratorNode) | ||
| return nil, nil | ||
| } | ||
|
|
||
| o.registerNode(orchestratorNode) | ||
|
|
||
| return nil, nil | ||
| }) | ||
| } | ||
|
|
||
| // registerNode adds the given node to the in-memory map of nodes | ||
| // It has to be called only once per node | ||
| func (o *Orchestrator) registerNode(node *nodemanager.Node) { | ||
| scopedKey := o.scopedNodeID(node.ClusterID, node.ID) | ||
| o.nodes.Insert(scopedKey, node) | ||
|
|
@@ -94,6 +127,114 @@ func (o *Orchestrator) GetNode(clusterID uuid.UUID, nodeID string) *nodemanager. | |
| return n | ||
| } | ||
|
|
||
| // getOrConnectNode returns a node from the in-memory cache. When the node is absent it | ||
| // performs a targeted on-demand discovery and connection attempt, handling the race | ||
| // condition where a new orchestrator joined the cluster after this API instance's last | ||
| // sync cycle but another API instance already routed a sandbox there. | ||
| // | ||
| // There are two distinct gaps that must be covered: | ||
| // - Gap 1 (0–5 s for clusters, 0–20 s for Nomad): the node exists in the upstream | ||
| // source (Nomad / remote service discovery) but has not yet been pulled into the | ||
| // local instance map by the background sync loop. | ||
| // - Gap 2 (0–20 s): the node is in the local instance map but has not yet been | ||
| // promoted into o.nodes by keepInSync. | ||
|
Comment on lines
+139
to
+140
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We can simplify by removing two layers of node sync. This should remove duplicated logic, but it should be done separately as it will introduce a lot of changes. |
||
| // | ||
| // discoveryGroup ensures that concurrent requests targeting the same missing | ||
| // node share a single discovery attempt rather than fanning out. | ||
| func (o *Orchestrator) getOrConnectNode(ctx context.Context, clusterID uuid.UUID, nodeID string) *nodemanager.Node { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let's add some tracing here so we can see why some sandbox requests will be slower. |
||
| ctx, span := tracer.Start(ctx, "get-or-connect-node") | ||
| defer span.End() | ||
|
|
||
| if node := o.GetNode(clusterID, nodeID); node != nil { | ||
| return node | ||
| } | ||
|
|
||
| logger.L().Warn(ctx, "Node not found in cache, attempting on-demand connection", | ||
| logger.WithNodeID(nodeID), | ||
| zap.String("cluster_id", clusterID.String()), | ||
| ) | ||
|
|
||
| scopedKey := o.scopedNodeID(clusterID, nodeID) | ||
|
|
||
| o.discoveryGroup.Do(scopedKey, func() (any, error) { //nolint:errcheck | ||
| // Re-check inside the singleflight | ||
| if node := o.GetNode(clusterID, nodeID); node != nil { | ||
| return nil, nil | ||
| } | ||
|
|
||
| connectCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), cacheSyncTime) | ||
| defer cancel() | ||
|
|
||
| if clusterID == consts.LocalClusterID { | ||
| o.discoverNomadNodes(connectCtx) | ||
| } else { | ||
| o.discoverClusterNode(connectCtx, clusterID) | ||
| } | ||
|
|
||
| return nil, nil | ||
| }) | ||
|
|
||
| return o.GetNode(clusterID, nodeID) | ||
| } | ||
|
|
||
| // discoverNomadNodes lists all ready Nomad nodes and connects any that are not yet in the pool. | ||
| // Once a new node is connected its orchestrator ID becomes the map key, making subsequent GetNode calls succeed. | ||
| func (o *Orchestrator) discoverNomadNodes(ctx context.Context) { | ||
| ctx, span := tracer.Start(ctx, "discover-nomad-nodes") | ||
| defer span.End() | ||
|
|
||
| nomadNodes, err := o.listNomadNodes(ctx) | ||
| if err != nil { | ||
| logger.L().Error(ctx, "Error listing Nomad nodes during on-demand discovery", zap.Error(err)) | ||
|
|
||
| return | ||
| } | ||
|
|
||
| var wg sync.WaitGroup | ||
| defer wg.Wait() | ||
|
|
||
| for _, n := range nomadNodes { | ||
| if o.GetNodeByNomadShortID(n.NomadNodeShortID) == nil { | ||
| wg.Go(func() { | ||
| if err := o.connectToNode(ctx, n); err != nil { | ||
sitole marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| logger.L().Error(ctx, "Error connecting to Nomad node on demand", | ||
| zap.Error(err), zap.String("nomad_short_id", n.NomadNodeShortID)) | ||
| } | ||
| }) | ||
| } | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
|
|
||
| // discoverClusterNode forces a fresh service discovery query so that nodes which joined after the | ||
| // last periodic sync are pulled into cluster.instances, then opportunistically connects all | ||
| // unknown nodes into o.nodes (not just the target), avoiding repeated on-demand discoveries. | ||
| func (o *Orchestrator) discoverClusterNode(ctx context.Context, clusterID uuid.UUID) { | ||
| ctx, span := tracer.Start(ctx, "discover-cluster-node") | ||
| defer span.End() | ||
|
|
||
| cluster, found := o.clusters.GetClusterById(clusterID) | ||
| if !found { | ||
| logger.L().Error(ctx, "Cluster not found during on-demand node discovery", logger.WithClusterID(clusterID)) | ||
|
|
||
| return | ||
| } | ||
|
|
||
| if err := cluster.SyncInstances(ctx); err != nil { | ||
| logger.L().Error(ctx, "Error syncing cluster instances during on-demand node discovery", zap.Error(err), logger.WithClusterID(clusterID)) | ||
|
|
||
| return | ||
| } | ||
|
|
||
| var wg sync.WaitGroup | ||
| defer wg.Wait() | ||
|
|
||
| for _, instance := range cluster.GetOrchestrators() { | ||
| wg.Go(func() { | ||
| o.connectToClusterNode(ctx, cluster, instance) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func (o *Orchestrator) GetClusterNodes(clusterID uuid.UUID) []*nodemanager.Node { | ||
| clusterNodes := make([]*nodemanager.Node, 0) | ||
| for _, n := range o.nodes.Items() { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.