-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Activate Drained/Inactive version if a workflow is "moved" to it
#9147
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
4893d34
7136f94
7c3e682
eae0388
95f110c
9fbe766
885f033
79ae1c1
040d04d
fa26af0
b0d1138
dfc4094
1490d62
ef0fa0d
7139756
f084d50
30a5a21
cc050fe
c33dd2e
87fdbad
4c03435
38012f0
b9ed17c
39c252a
04be7bb
3a91e57
adaac7d
cf74e58
fcecd45
15a5ac6
06ad026
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| //nolint:staticcheck | ||
| package worker_versioning | ||
|
|
||
| import ( | ||
| "go.temporal.io/server/common/cache" | ||
| "go.temporal.io/server/common/metrics" | ||
| ) | ||
|
|
||
| // ReactivationSignalCache deduplicates reactivation signals to version workflows. | ||
| // | ||
| // Implementations are expected to be safe for concurrent use. | ||
| type ( | ||
| ReactivationSignalCache interface { | ||
| // ShouldSendSignal returns true if signal should be sent (not recently sent) | ||
| // and atomically marks it as sent. Returns false if recently sent. | ||
| ShouldSendSignal(namespaceID, deploymentName, buildID string) bool | ||
| } | ||
|
|
||
| reactivationSignalCacheKey struct { | ||
| namespaceID string | ||
| deploymentName string | ||
| buildID string | ||
| } | ||
|
|
||
| ReactivationSignalCacheImpl struct { | ||
| cache.Cache | ||
| metricsHandler metrics.Handler | ||
| } | ||
| ) | ||
|
|
||
| // NewReactivationSignalCache wraps the provided cache with a typed API and metrics. | ||
| func NewReactivationSignalCache(c cache.Cache, metricsHandler metrics.Handler) ReactivationSignalCache { | ||
| h := metricsHandler.WithTags(metrics.CacheTypeTag(metrics.VersionReactivationSignalCacheTypeTagValue)) | ||
| return &ReactivationSignalCacheImpl{ | ||
| Cache: c, | ||
| metricsHandler: h, | ||
| } | ||
| } | ||
|
|
||
| func (c *ReactivationSignalCacheImpl) ShouldSendSignal( | ||
| namespaceID, deploymentName, buildID string, | ||
| ) bool { | ||
| handler := c.metricsHandler.WithTags( | ||
| metrics.OperationTag(metrics.VersionReactivationSignalCacheShouldSendScope), | ||
| metrics.NamespaceIDTag(namespaceID), | ||
| ) | ||
| metrics.CacheRequests.With(handler).Record(1) | ||
|
|
||
| key := reactivationSignalCacheKey{ | ||
| namespaceID: namespaceID, | ||
| deploymentName: deploymentName, | ||
| buildID: buildID, | ||
| } | ||
|
|
||
| // Check if we recently sent a signal for this version | ||
| if c.Cache.Get(key) != nil { | ||
| // Entry exists, signal was recently sent - deduplicate | ||
| return false | ||
| } | ||
|
|
||
| // No recent signal, mark as sent and return true | ||
| metrics.CacheMissCounter.With(handler).Record(1) | ||
| c.Cache.Put(key, true) | ||
| return true | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -60,6 +60,8 @@ type Starter struct { | |
| namespace *namespace.Namespace | ||
| createOrUpdateLeaseFn api.CreateOrUpdateLeaseFunc | ||
| versionMembershipCache worker_versioning.VersionMembershipCache | ||
| reactivationSignalCache worker_versioning.ReactivationSignalCache | ||
| reactivationSignaler api.VersionReactivationSignalerFn | ||
| } | ||
|
|
||
| // creationParams is a container for all information obtained from creating the uncommitted execution. | ||
|
|
@@ -89,6 +91,8 @@ func NewStarter( | |
| request *historyservice.StartWorkflowExecutionRequest, | ||
| matchingClient matchingservice.MatchingServiceClient, | ||
| versionMembershipCache worker_versioning.VersionMembershipCache, | ||
| reactivationSignalCache worker_versioning.ReactivationSignalCache, | ||
| reactivationSignaler api.VersionReactivationSignalerFn, | ||
| createLeaseFn api.CreateOrUpdateLeaseFunc, | ||
| ) (*Starter, error) { | ||
| namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(request.GetNamespaceId()), request.StartRequest.WorkflowId) | ||
|
|
@@ -107,6 +111,8 @@ func NewStarter( | |
| namespace: namespaceEntry, | ||
| createOrUpdateLeaseFn: createLeaseFn, | ||
| versionMembershipCache: versionMembershipCache, | ||
| reactivationSignalCache: reactivationSignalCache, | ||
| reactivationSignaler: reactivationSignaler, | ||
| }, nil | ||
| } | ||
|
|
||
|
|
@@ -208,12 +214,21 @@ func (s *Starter) Invoke( | |
| var currentWorkflowConditionFailedError *persistence.CurrentWorkflowConditionFailedError | ||
| if errors.As(err, ¤tWorkflowConditionFailedError) && len(currentWorkflowConditionFailedError.RunID) > 0 { | ||
| // The history and mutable state generated above will be deleted by a background process. | ||
| return s.handleConflict(ctx, creationParams, currentWorkflowConditionFailedError) | ||
| resp, outcome, conflictErr := s.handleConflict(ctx, creationParams, currentWorkflowConditionFailedError) | ||
| if conflictErr == nil && outcome == StartNew { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the main idea here was to only send the reactivation signal iff we have actually started/created a new workflow execution - all other outcomes, except StartNew, do not do that. |
||
| // Notify version workflow if we are starting a workflow execution on a potentially drained version. | ||
| // Only signal when a new workflow was actually created (StartNew), not for deduped retries | ||
| // (StartDeduped) or reused existing workflows (StartReused) where the pinned override is not applied. | ||
| api.ReactivateVersionWorkflowIfPinned(ctx, s.namespace, s.request.StartRequest.GetVersioningOverride(), s.reactivationSignalCache, s.reactivationSignaler, s.shardContext.GetConfig().EnableVersionReactivationSignals()) | ||
| } | ||
| return resp, outcome, conflictErr | ||
|
Shivs11 marked this conversation as resolved.
|
||
| } | ||
|
|
||
| return nil, StartErr, err | ||
| } | ||
|
|
||
| // Notify version workflow if we're pinning to a potentially drained version | ||
| api.ReactivateVersionWorkflowIfPinned(ctx, s.namespace, s.request.StartRequest.GetVersioningOverride(), s.reactivationSignalCache, s.reactivationSignaler, s.shardContext.GetConfig().EnableVersionReactivationSignals()) | ||
|
|
||
| resp, err = s.generateResponse( | ||
| creationParams.runID, | ||
| creationParams.workflowTaskInfo, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,65 @@ | ||
| package api | ||
|
|
||
| import ( | ||
| "context" | ||
|
|
||
| workflowpb "go.temporal.io/api/workflow/v1" | ||
| "go.temporal.io/server/common/namespace" | ||
| "go.temporal.io/server/common/worker_versioning" | ||
| ) | ||
|
|
||
| // VersionReactivationSignalerFn is a function type for sending reactivation signals to version workflows. | ||
| // This abstraction allows the history API layer to use the deployment client without importing it directly, | ||
| // avoiding import cycles between history/api and worker/workerdeployment packages. | ||
| type VersionReactivationSignalerFn func( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. any reason for this type instead of passing client itself?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. circular dependencies; my first stab at this involved me just directly passing the workerDeployment.Client, but that did not go well as: service/history/api → service/worker/workerdeployment → service/history/api |
||
| ctx context.Context, | ||
| namespaceEntry *namespace.Namespace, | ||
| deploymentName, buildID string, | ||
| ) error | ||
|
|
||
| // ReactivateVersionWorkflowIfPinned sends a reactivation signal to the version workflow | ||
| // when workflows are pinned to a potentially DRAINED/INACTIVE version. It also deduplicates | ||
| // signals within the cache TTL window. | ||
| // This is a fire-and-forget operation - the signal is sent asynchronously and errors are | ||
| // logged by the signaler implementation. | ||
| // | ||
| //nolint:revive,errcheck | ||
| func ReactivateVersionWorkflowIfPinned( | ||
|
carlydf marked this conversation as resolved.
|
||
| ctx context.Context, | ||
| namespaceEntry *namespace.Namespace, | ||
| override *workflowpb.VersioningOverride, | ||
| signalCache worker_versioning.ReactivationSignalCache, | ||
| signaler VersionReactivationSignalerFn, | ||
| enabled bool, | ||
| ) { | ||
| // Check if signals are enabled globally | ||
| if !enabled { | ||
| return | ||
| } | ||
|
|
||
| // Only process if we're pinning to a specific version | ||
| if !worker_versioning.OverrideIsPinned(override) { | ||
| return | ||
| } | ||
|
|
||
| pinnedVersion := worker_versioning.GetOverridePinnedVersion(override) | ||
| if pinnedVersion == nil { | ||
| return | ||
| } | ||
|
|
||
| // Check cache - skip if signal was recently sent | ||
| if signalCache != nil && !signalCache.ShouldSendSignal( | ||
| namespaceEntry.ID().String(), | ||
| pinnedVersion.GetDeploymentName(), | ||
| pinnedVersion.GetBuildId(), | ||
| ) { | ||
| return | ||
| } | ||
|
|
||
| // Send the signal asynchronously to avoid adding latency to the caller's request. | ||
| // Errors are logged by the signaler implementation (e.g. via convertAndRecordError). However, | ||
| // errors are not propagated to the caller as this is a fire-and-forget operation. | ||
| go func() { | ||
| signaler(context.Background(), namespaceEntry, pinnedVersion.GetDeploymentName(), pinnedVersion.GetBuildId()) //nolint:errcheck | ||
| }() | ||
|
Comment on lines
+62
to
+64
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. a thing to remember here is that any error, if present, would be caught in the client's recordAndError method and would be logged out. However, I am not returning the error over here since I don't think we should be stopping existing operations based on this error |
||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.