Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
4893d34
current work
Shivs11 Jan 9, 2026
7136f94
current tinkering
Shivs11 Jan 13, 2026
7c3e682
currently working test
Shivs11 Jan 26, 2026
eae0388
Merge origin/main into ss/reactive-drained-inactive-wfs
Shivs11 Jan 26, 2026
95f110c
draft 1 with tests
Shivs11 Jan 26, 2026
9fbe766
added cache alongside tests
Shivs11 Jan 27, 2026
885f033
Remove CLAUDE.md and revert .gitignore changes
Shivs11 Jan 27, 2026
79ae1c1
removed buggy drainage condition
Shivs11 Jan 27, 2026
040d04d
add version gate for this safe since its not NDE safe
Shivs11 Jan 27, 2026
fa26af0
lint and removal of prints
Shivs11 Jan 27, 2026
b0d1138
remove some logs
Shivs11 Jan 27, 2026
dfc4094
move signal sending to the workerdeployment Client
Shivs11 Feb 3, 2026
1490d62
add dynamic config for thsi
Shivs11 Feb 3, 2026
ef0fa0d
send signal after update persistence
Shivs11 Feb 3, 2026
7139756
Merge branch 'main' into ss/reactive-drained-inactive-wfs
Shivs11 Feb 3, 2026
f084d50
fix breaking build
Shivs11 Feb 3, 2026
30a5a21
Merge branch 'main' into ss/reactive-drained-inactive-wfs
Shivs11 Feb 11, 2026
cc050fe
address cursor bugbot comments
Shivs11 Feb 11, 2026
c33dd2e
only reactivate when outcome is StartNew
Shivs11 Feb 11, 2026
87fdbad
add noop worker deployment client to make unit tests pass
Shivs11 Feb 11, 2026
4c03435
fix unit test with an expectation call
Shivs11 Feb 11, 2026
38012f0
address comments
Shivs11 Feb 12, 2026
b9ed17c
goimports
Shivs11 Feb 12, 2026
39c252a
lint
Shivs11 Feb 12, 2026
04be7bb
helpful comment
Shivs11 Feb 12, 2026
3a91e57
hoping this resolves my lint issues
Shivs11 Feb 12, 2026
adaac7d
revive in lint
Shivs11 Feb 12, 2026
cf74e58
finally ran make lint locally (was not working before)
Shivs11 Feb 12, 2026
fcecd45
add version prefix to reactivation cache names
carlydf Feb 12, 2026
15a5ac6
merge
carlydf Feb 12, 2026
06ad026
Merge branch 'main' of github.com:temporalio/temporal into ss/reactiv…
carlydf Feb 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2794,6 +2794,27 @@ instead of the previous HSM backed implementation.`,
`Maximum number of entries in the version membership cache.`,
)

VersionReactivationSignalCacheTTL = NewGlobalDurationSetting(
"history.versionReactivationSignalCacheTTL",
10*time.Second,
`TTL for caching drainage reactivation signals to version workflows. These signals are sent from the history service to update the version workflow's
draining status to DRAINING from DRAINED/INACTIVE states.`,
)

VersionReactivationSignalCacheMaxSize = NewGlobalIntSetting(
"history.versionReactivationSignalCacheMaxSize",
10000,
`Maximum number of entries in the version reactivation signal cache.`,
)

EnableVersionReactivationSignals = NewGlobalBoolSetting(
"history.enableVersionReactivationSignals",
true,
`EnableVersionReactivationSignals controls whether reactivation signals are sent to version workflows
when workflows are pinned to a potentially DRAINED/INACTIVE version. Set to false to disable signals
globally if load becomes problematic.`,
)

RoutingInfoCacheTTL = NewGlobalDurationSetting(
"history.routingInfoCacheTTL",
1*time.Second,
Expand Down
3 changes: 3 additions & 0 deletions common/metrics/metric_defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
MutableStateCacheTypeTagValue = "mutablestate"
EventsCacheTypeTagValue = "events"
VersionMembershipCacheTypeTagValue = "version_membership"
VersionReactivationSignalCacheTypeTagValue = "version_reactivation_signal"
RoutingInfoCacheTypeTagValue = "routing_info"
NexusEndpointRegistryReadThroughCacheTypeTagValue = "nexus_endpoint_registry_readthrough"

Expand Down Expand Up @@ -458,6 +459,8 @@ const (
VersionMembershipCacheGetScope = "VersionMembershipCacheGet"
// VersionMembershipCachePutScope is the scope used by version membership cache
VersionMembershipCachePutScope = "VersionMembershipCachePut"
// VersionReactivationSignalCacheShouldSendScope is the scope used by version reactivation signal cache
VersionReactivationSignalCacheShouldSendScope = "VersionReactivationSignalCacheShouldSend"
// RoutingInfoCacheGetScope is the scope used by routing info cache
RoutingInfoCacheGetScope = "RoutingInfoCacheGet"
// RoutingInfoCachePutScope is the scope used by routing info cache
Expand Down
65 changes: 65 additions & 0 deletions common/worker_versioning/version_reactivation_signal_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//nolint:staticcheck
package worker_versioning

import (
"go.temporal.io/server/common/cache"
"go.temporal.io/server/common/metrics"
)

// ReactivationSignalCache deduplicates reactivation signals to version workflows.
//
// Implementations are expected to be safe for concurrent use.
type ReactivationSignalCache interface {
	// ShouldSendSignal reports whether a reactivation signal should be sent for
	// the version identified by (namespaceID, deploymentName, buildID).
	//
	// It returns true when no signal was recently sent for that version and
	// atomically marks the version as signaled; it returns false when a signal
	// was recently sent.
	ShouldSendSignal(namespaceID, deploymentName, buildID string) bool
}

// reactivationSignalCacheKey identifies a single deployment version within a
// namespace. It is the key under which "signal recently sent" entries are stored.
type reactivationSignalCacheKey struct {
	namespaceID    string
	deploymentName string
	buildID        string
}

// ReactivationSignalCacheImpl is the ReactivationSignalCache implementation
// backed by an embedded cache.Cache. Cache requests and misses are reported
// through metricsHandler.
type ReactivationSignalCacheImpl struct {
	cache.Cache
	metricsHandler metrics.Handler
}

// NewReactivationSignalCache builds a ReactivationSignalCache on top of the
// supplied cache. The metrics handler is tagged once, up front, with the
// version-reactivation-signal cache type so every metric emitted by the
// returned cache carries that tag.
func NewReactivationSignalCache(c cache.Cache, metricsHandler metrics.Handler) ReactivationSignalCache {
	return &ReactivationSignalCacheImpl{
		Cache: c,
		metricsHandler: metricsHandler.WithTags(
			metrics.CacheTypeTag(metrics.VersionReactivationSignalCacheTypeTagValue),
		),
	}
}

func (c *ReactivationSignalCacheImpl) ShouldSendSignal(
namespaceID, deploymentName, buildID string,
) bool {
handler := c.metricsHandler.WithTags(
metrics.OperationTag(metrics.VersionReactivationSignalCacheShouldSendScope),
metrics.NamespaceIDTag(namespaceID),
)
metrics.CacheRequests.With(handler).Record(1)

key := reactivationSignalCacheKey{
namespaceID: namespaceID,
deploymentName: deploymentName,
buildID: buildID,
}

// Check if we recently sent a signal for this version
if c.Cache.Get(key) != nil {
// Entry exists, signal was recently sent - deduplicate
return false
}

// No recent signal, mark as sent and return true
metrics.CacheMissCounter.With(handler).Record(1)
c.Cache.Put(key, true)
return true
Comment thread
cursor[bot] marked this conversation as resolved.
}
4 changes: 4 additions & 0 deletions service/history/api/multioperation/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func Invoke(
tokenSerializer *tasktoken.Serializer,
matchingClient matchingservice.MatchingServiceClient,
versionMembershipCache worker_versioning.VersionMembershipCache,
reactivationSignalCache worker_versioning.ReactivationSignalCache,
reactivationSignaler api.VersionReactivationSignalerFn,
testHooks testhooks.TestHooks,
) (*historyservice.ExecuteMultiOperationResponse, error) {
namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(req.GetNamespaceId()), req.WorkflowId)
Expand Down Expand Up @@ -101,6 +103,8 @@ func Invoke(
startReq,
matchingClient,
versionMembershipCache,
reactivationSignalCache,
reactivationSignaler,
uws.workflowLeaseCallback(ctx),
)
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions service/history/api/resetworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func Invoke(
workflowConsistencyChecker api.WorkflowConsistencyChecker,
matchingClient matchingservice.MatchingServiceClient,
versionMembershipCache worker_versioning.VersionMembershipCache,
reactivationSignalCache worker_versioning.ReactivationSignalCache,
reactivationSignaler api.VersionReactivationSignalerFn,
) (_ *historyservice.ResetWorkflowExecutionResponse, retError error) {
namespaceID := namespace.ID(resetRequest.GetNamespaceId())
err := api.ValidateNamespaceUUID(namespaceID)
Expand Down Expand Up @@ -177,6 +179,15 @@ func Invoke(
); err != nil {
return nil, err
}

// Notify version workflow if we're pinning to a potentially drained version via post-reset operations
for _, operation := range request.GetPostResetOperations() {
if updateOpts, ok := operation.GetVariant().(*workflowpb.PostResetOperation_UpdateWorkflowOptions_); ok {
api.ReactivateVersionWorkflowIfPinned(ctx, namespaceEntry,
updateOpts.UpdateWorkflowOptions.GetWorkflowExecutionOptions().GetVersioningOverride(), reactivationSignalCache, reactivationSignaler, shardContext.GetConfig().EnableVersionReactivationSignals())
}
}

return &historyservice.ResetWorkflowExecutionResponse{
RunId: resetRunID,
}, nil
Expand Down
8 changes: 8 additions & 0 deletions service/history/api/signalwithstartworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ func Invoke(
workflowConsistencyChecker api.WorkflowConsistencyChecker,
matchingClient matchingservice.MatchingServiceClient,
versionMembershipCache worker_versioning.VersionMembershipCache,
reactivationSignalCache worker_versioning.ReactivationSignalCache,
reactivationSignaler api.VersionReactivationSignalerFn,
) (_ *historyservice.SignalWithStartWorkflowExecutionResponse, retError error) {
namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(signalWithStartRequest.GetNamespaceId()), signalWithStartRequest.SignalWithStartRequest.WorkflowId)
if err != nil {
Expand Down Expand Up @@ -91,6 +93,12 @@ func Invoke(
if err != nil {
return nil, err
}

// Notify version workflow if we're starting a new workflow pinned to a potentially drained version
if started {
api.ReactivateVersionWorkflowIfPinned(ctx, namespaceEntry, request.GetVersioningOverride(), reactivationSignalCache, reactivationSignaler, shard.GetConfig().EnableVersionReactivationSignals())
}

return &historyservice.SignalWithStartWorkflowExecutionResponse{
RunId: runID,
Started: started,
Expand Down
19 changes: 17 additions & 2 deletions service/history/api/startworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type Starter struct {
namespace *namespace.Namespace
createOrUpdateLeaseFn api.CreateOrUpdateLeaseFunc
versionMembershipCache worker_versioning.VersionMembershipCache
reactivationSignalCache worker_versioning.ReactivationSignalCache
reactivationSignaler api.VersionReactivationSignalerFn
}

// creationParams is a container for all information obtained from creating the uncommitted execution.
Expand Down Expand Up @@ -89,6 +91,8 @@ func NewStarter(
request *historyservice.StartWorkflowExecutionRequest,
matchingClient matchingservice.MatchingServiceClient,
versionMembershipCache worker_versioning.VersionMembershipCache,
reactivationSignalCache worker_versioning.ReactivationSignalCache,
reactivationSignaler api.VersionReactivationSignalerFn,
createLeaseFn api.CreateOrUpdateLeaseFunc,
) (*Starter, error) {
namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(request.GetNamespaceId()), request.StartRequest.WorkflowId)
Expand All @@ -107,6 +111,8 @@ func NewStarter(
namespace: namespaceEntry,
createOrUpdateLeaseFn: createLeaseFn,
versionMembershipCache: versionMembershipCache,
reactivationSignalCache: reactivationSignalCache,
reactivationSignaler: reactivationSignaler,
}, nil
}

Expand Down Expand Up @@ -208,12 +214,21 @@ func (s *Starter) Invoke(
var currentWorkflowConditionFailedError *persistence.CurrentWorkflowConditionFailedError
if errors.As(err, &currentWorkflowConditionFailedError) && len(currentWorkflowConditionFailedError.RunID) > 0 {
// The history and mutable state generated above will be deleted by a background process.
return s.handleConflict(ctx, creationParams, currentWorkflowConditionFailedError)
resp, outcome, conflictErr := s.handleConflict(ctx, creationParams, currentWorkflowConditionFailedError)
if conflictErr == nil && outcome == StartNew {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the main idea here was to only send the reactivation signal iff we have actually started/created a new workflow execution - all other outcomes, except StartNew, do not do that.

// Notify version workflow if we are starting a workflow execution on a potentially drained version.
// Only signal when a new workflow was actually created (StartNew), not for deduped retries
// (StartDeduped) or reused existing workflows (StartReused) where the pinned override is not applied.
api.ReactivateVersionWorkflowIfPinned(ctx, s.namespace, s.request.StartRequest.GetVersioningOverride(), s.reactivationSignalCache, s.reactivationSignaler, s.shardContext.GetConfig().EnableVersionReactivationSignals())
}
return resp, outcome, conflictErr
Comment thread
Shivs11 marked this conversation as resolved.
}

return nil, StartErr, err
}

// Notify version workflow if we're pinning to a potentially drained version
api.ReactivateVersionWorkflowIfPinned(ctx, s.namespace, s.request.StartRequest.GetVersioningOverride(), s.reactivationSignalCache, s.reactivationSignaler, s.shardContext.GetConfig().EnableVersionReactivationSignals())

resp, err = s.generateResponse(
creationParams.runID,
creationParams.workflowTaskInfo,
Expand Down
13 changes: 13 additions & 0 deletions service/history/api/updateworkflowoptions/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func Invoke(
workflowConsistencyChecker api.WorkflowConsistencyChecker,
matchingClient matchingservice.MatchingServiceClient,
versionMembershipCache worker_versioning.VersionMembershipCache,
reactivationSignalCache worker_versioning.ReactivationSignalCache,
reactivationSignaler api.VersionReactivationSignalerFn,
) (*historyservice.UpdateWorkflowExecutionOptionsResponse, error) {
req := request.GetUpdateRequest()
ns, err := api.GetActiveNamespace(shardCtx, namespace.ID(request.GetNamespaceId()), req.GetWorkflowExecution().GetWorkflowId())
Expand All @@ -36,6 +38,9 @@ func Invoke(
}
ret := &historyservice.UpdateWorkflowExecutionOptionsResponse{}

// Store versioning override to send reactivation signal after successful persistence
var versioningOverrideForReactivation *workflowpb.VersioningOverride

err = api.GetAndUpdateWorkflowWithNew(
ctx,
nil,
Expand Down Expand Up @@ -94,6 +99,9 @@ func Invoke(
}, nil
}

// Store versioning override to send reactivation signal after successful persistence
versioningOverrideForReactivation = mergedOpts.GetVersioningOverride()

// TODO (carly) part 2: handle safe deployment change --> CreateWorkflowTask=true
return &api.UpdateWorkflowAction{
Noop: false,
Expand All @@ -107,6 +115,11 @@ func Invoke(
if err != nil {
return nil, err
}

// Notify version workflow if we're pinning to a potentially drained version.
// This is done after successful persistence to avoid signaling if the update fails.
Comment thread
carlydf marked this conversation as resolved.
api.ReactivateVersionWorkflowIfPinned(ctx, ns, versioningOverrideForReactivation, reactivationSignalCache, reactivationSignaler, shardCtx.GetConfig().EnableVersionReactivationSignals())

return ret, nil
}

Expand Down
16 changes: 15 additions & 1 deletion service/history/api/updateworkflowoptions/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ func (noopVersionMembershipCache) Put(
) {
}

// noopReactivationSignalCache is a test double for the reactivation signal
// cache that never allows a signal to be sent.
type noopReactivationSignalCache struct{}

// ShouldSendSignal ignores its arguments and always reports false, so tests
// using this cache skip sending reactivation signals.
func (noopReactivationSignalCache) ShouldSendSignal(string, string, string) bool {
	return false
}

// noopReactivationSignaler is a signaler for tests: it ignores every argument,
// sends nothing, and always succeeds.
func noopReactivationSignaler(context.Context, *namespace.Namespace, string, string) error {
	return nil
}

var (
emptyOptions = &workflowpb.WorkflowExecutionOptions{}
unpinnedOverrideOptions = &workflowpb.WorkflowExecutionOptions{
Expand Down Expand Up @@ -184,6 +195,7 @@ func (s *updateWorkflowOptionsSuite) SetupTest() {
s.shardContext = historyi.NewMockShardContext(s.controller)
s.shardContext.EXPECT().GetNamespaceRegistry().Return(s.namespaceRegistry)
s.shardContext.EXPECT().GetClusterMetadata().Return(clustertest.NewMetadataForTest(cluster.NewTestClusterMetadataConfig(true, true)))
s.shardContext.EXPECT().GetConfig().Return(tests.NewDynamicConfig()).AnyTimes()

// mock a mutable state with an existing versioning override
s.currentMutableState = historyi.NewMockMutableState(s.controller)
Expand Down Expand Up @@ -263,7 +275,9 @@ func (s *updateWorkflowOptionsSuite) TestInvoke_Success() {
s.shardContext,
s.workflowConsistencyChecker,
s.mockMatchingClient,
noopVersionMembershipCache{}, // cache not meant to be used in this test
noopVersionMembershipCache{}, // cache not meant to be used in this test
noopReactivationSignalCache{}, // cache not meant to be used in this test
noopReactivationSignaler, // signaler not meant to be used in this test
)
s.NoError(err)
s.NotNil(resp)
Expand Down
65 changes: 65 additions & 0 deletions service/history/api/worker_versioning_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package api

import (
"context"

workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/worker_versioning"
)

// VersionReactivationSignalerFn is a function type for sending reactivation signals to version workflows.
// This abstraction allows the history API layer to use the deployment client without importing it directly,
// avoiding import cycles between history/api and worker/workerdeployment packages.
type VersionReactivationSignalerFn func(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason for this type instead of passing client itself?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

circular dependencies; my first stab at this involved me just directly passing the workerDeployment.Client, but that did not go well as:

service/history/api → service/worker/workerdeployment → service/history/api

ctx context.Context,
namespaceEntry *namespace.Namespace,
deploymentName, buildID string,
) error

// ReactivateVersionWorkflowIfPinned sends a reactivation signal to the version workflow
// when workflows are pinned to a potentially DRAINED/INACTIVE version. It also deduplicates
// signals within the cache TTL window.
// This is a fire-and-forget operation - the signal is sent asynchronously and errors are
// logged by the signaler implementation.
//
//nolint:revive,errcheck
func ReactivateVersionWorkflowIfPinned(
Comment thread
carlydf marked this conversation as resolved.
ctx context.Context,
namespaceEntry *namespace.Namespace,
override *workflowpb.VersioningOverride,
signalCache worker_versioning.ReactivationSignalCache,
signaler VersionReactivationSignalerFn,
enabled bool,
) {
// Check if signals are enabled globally
if !enabled {
return
}

// Only process if we're pinning to a specific version
if !worker_versioning.OverrideIsPinned(override) {
return
}

pinnedVersion := worker_versioning.GetOverridePinnedVersion(override)
if pinnedVersion == nil {
return
}

// Check cache - skip if signal was recently sent
if signalCache != nil && !signalCache.ShouldSendSignal(
namespaceEntry.ID().String(),
pinnedVersion.GetDeploymentName(),
pinnedVersion.GetBuildId(),
) {
return
}

// Send the signal asynchronously to avoid adding latency to the caller's request.
// Errors are logged by the signaler implementation (e.g. via convertAndRecordError). However,
// errors are not propagated to the caller as this is a fire-and-forget operation.
go func() {
signaler(context.Background(), namespaceEntry, pinnedVersion.GetDeploymentName(), pinnedVersion.GetBuildId()) //nolint:errcheck
}()
Comment on lines +62 to +64
Copy link
Copy Markdown
Member Author

@Shivs11 Shivs11 Feb 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a thing to remember here is that any error, if present, would be caught in the client's recordAndError method and would be logged out. However, I am not returning the error over here since I don't think we should be stopping existing operations based on this error

}
Loading
Loading