From 7b91e197122502e5c210fc4ac38bdc3b4f5fcc85 Mon Sep 17 00:00:00 2001 From: Tarcisio Ferraz Date: Tue, 14 Apr 2026 10:51:17 -0300 Subject: [PATCH 1/9] wait for schedule or context cancellation --- .../capabilities/fakes/manual_cron_trigger.go | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/core/capabilities/fakes/manual_cron_trigger.go b/core/capabilities/fakes/manual_cron_trigger.go index 8a54f1e3c80..277e96e9135 100644 --- a/core/capabilities/fakes/manual_cron_trigger.go +++ b/core/capabilities/fakes/manual_cron_trigger.go @@ -6,6 +6,7 @@ import ( "fmt" "time" + "github.com/robfig/cron/v3" "google.golang.org/protobuf/types/known/timestamppb" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" @@ -43,6 +44,7 @@ type ManualCronTriggerService struct { callbackCh map[string]chan capabilities.TriggerAndId[*crontypedapi.Payload] legacyCallbackCh chan capabilities.TriggerAndId[*crontypedapi.LegacyPayload] //nolint:staticcheck // LegacyPayload intentionally used for backward compatibility workflowIDs map[string]string // triggerID -> workflowID mapping + triggerConfigs map[string]*crontypedapi.Config } func NewManualCronTriggerService(parentLggr logger.Logger) *ManualCronTriggerService { @@ -55,6 +57,7 @@ func NewManualCronTriggerService(parentLggr logger.Logger) *ManualCronTriggerSer callbackCh: make(map[string]chan capabilities.TriggerAndId[*crontypedapi.Payload]), legacyCallbackCh: make(chan capabilities.TriggerAndId[*crontypedapi.LegacyPayload]), //nolint:staticcheck // LegacyPayload intentionally used for backward compatibility workflowIDs: make(map[string]string), + triggerConfigs: make(map[string]*crontypedapi.Config), } } @@ -86,6 +89,7 @@ func (f *ManualCronTriggerService) Initialise(ctx context.Context, dependencies func (f *ManualCronTriggerService) RegisterTrigger(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *crontypedapi.Config) (<-chan capabilities.TriggerAndId[*crontypedapi.Payload], caperrors.Error) { f.callbackCh[triggerID] = make(chan capabilities.TriggerAndId[*crontypedapi.Payload]) f.workflowIDs[triggerID] = metadata.WorkflowID + f.triggerConfigs[triggerID] = input return f.callbackCh[triggerID], nil } @@ -105,7 +109,17 @@ func (f *ManualCronTriggerService) AckEvent(ctx context.Context, triggerID strin return nil } -func (f *ManualCronTriggerService) ManualTrigger(ctx context.Context, triggerID string, scheduledExecutionTime time.Time) error { +func (f *ManualCronTriggerService) ManualTrigger(ctx context.Context, triggerID string) error { + config, exists := f.triggerConfigs[triggerID] + if !exists { + return fmt.Errorf(`trigger config "%s" not found`, triggerID) + } + schedule, err := cron.ParseStandard(config.Schedule) + if err != nil { + return err + } + scheduledExecutionTime := schedule.Next(time.Now()) + f.lggr.Debugf("ManualTrigger: %s", scheduledExecutionTime.Format(time.RFC3339Nano)) triggerEvent := f.createManualTriggerEvent(scheduledExecutionTime) @@ -129,13 +143,12 @@ func (f *ManualCronTriggerService) ManualTrigger(ctx context.Context, triggerID } go func() { - select { - case f.callbackCh[triggerID] <- triggerEvent: - // Successfully sent trigger response - case <-ctx.Done(): - // Context cancelled, cleanup goroutine - f.lggr.Debug("ManualTrigger goroutine cancelled due to context cancellation") - } + deadlineCtx, cancel := context.WithDeadline(ctx, scheduledExecutionTime) + defer cancel() + + <-deadlineCtx.Done() + // Successfully sent trigger response + f.callbackCh[triggerID] <- triggerEvent }() return nil From d8a2fcf03f28e8e301005083a7d23cfc64d47207 Mon Sep 17 00:00:00 2001 From: Tarcisio Ferraz Date: Tue, 14 Apr 2026 11:18:18 -0300 Subject: [PATCH 2/9] use gocron lib --- .../capabilities/fakes/manual_cron_trigger.go | 47 +++++++++++++++---- go.mod | 1 + go.sum | 2 + 3 files changed, 41 insertions(+), 9 deletions(-) diff --git a/core/capabilities/fakes/manual_cron_trigger.go b/core/capabilities/fakes/manual_cron_trigger.go index 277e96e9135..0aa2e0429ee 100644 --- a/core/capabilities/fakes/manual_cron_trigger.go +++ b/core/capabilities/fakes/manual_cron_trigger.go @@ -6,7 +6,7 @@ import ( "fmt" "time" - "github.com/robfig/cron/v3" + "github.com/go-co-op/gocron/v2" "google.golang.org/protobuf/types/known/timestamppb" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" @@ -26,6 +26,7 @@ var _ cronserver.CronCapability = (*ManualCronTriggerService)(nil) const ServiceName = "CronTriggerService" const ID = "cron-trigger@1.0.0" const defaultFastestScheduleIntervalSeconds = 1 +const allowSeconds = true var manualCronTriggerInfo = capabilities.MustNewCapabilityInfo( ID, @@ -45,6 +46,7 @@ type ManualCronTriggerService struct { legacyCallbackCh chan capabilities.TriggerAndId[*crontypedapi.LegacyPayload] //nolint:staticcheck // LegacyPayload intentionally used for backward compatibility workflowIDs map[string]string // triggerID -> workflowID mapping triggerConfigs map[string]*crontypedapi.Config + scheduler gocron.Scheduler } func NewManualCronTriggerService(parentLggr logger.Logger) *ManualCronTriggerService { @@ -78,8 +80,15 @@ func (f *ManualCronTriggerService) Initialise(ctx context.Context, dependencies f.config = cronConfig - err := f.Start(ctx) + scheduler, err := gocron.NewScheduler() if err != nil { + return fmt.Errorf("failed to initialise cron scheduler: %w", err) + } + scheduler.Start() + + f.scheduler = scheduler + + if err := f.Start(ctx); err != nil { return fmt.Errorf("error when starting trigger service: %w", err) } @@ -114,11 +123,20 @@ func (f *ManualCronTriggerService) ManualTrigger(ctx context.Context, triggerID if !exists { return fmt.Errorf(`trigger config "%s" not found`, triggerID) } - schedule, err := cron.ParseStandard(config.Schedule) + + jobFired := make(chan struct{}) + + jobDef := gocron.CronJob(config.Schedule, allowSeconds) + job, err := f.scheduler.NewJob(jobDef, gocron.NewTask(func() { + jobFired <- struct{}{} + })) + if err != nil { + return fmt.Errorf("failed to create cron job: %w", err) + } + scheduledExecutionTime, err := job.NextRun() if err != nil { - return err + return fmt.Errorf("failed to get next scheduled execution time: %w", err) } - scheduledExecutionTime := schedule.Next(time.Now()) f.lggr.Debugf("ManualTrigger: %s", scheduledExecutionTime.Format(time.RFC3339Nano)) @@ -143,11 +161,17 @@ func (f *ManualCronTriggerService) ManualTrigger(ctx context.Context, triggerID } go func() { - deadlineCtx, cancel := context.WithDeadline(ctx, scheduledExecutionTime) - defer cancel() + defer close(jobFired) + + // Either wait for cron trigger or context cancellation + select { + case <-jobFired: + break + case <-ctx.Done(): + break + } - <-deadlineCtx.Done() - // Successfully sent trigger response + // Sent trigger response f.callbackCh[triggerID] <- triggerEvent }() @@ -179,6 +203,11 @@ func (f *ManualCronTriggerService) Start(ctx context.Context) error { func (f *ManualCronTriggerService) Close() error { f.lggr.Debug("Closing ManualCronTriggerService") + if f.scheduler != nil { + if err := f.scheduler.Shutdown(); err != nil { + f.lggr.Errorw("failed to close scheduler", "err", err) + } + } return nil } diff --git a/go.mod b/go.mod index c66fa585cdf..d53c6499e61 100644 --- a/go.mod +++ b/go.mod @@ -247,6 +247,7 @@ require ( github.com/gedex/inflector v0.0.0-20170307190818-16278e9db813 // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-asn1-ber/asn1-ber v1.5.5 // indirect + github.com/go-co-op/gocron/v2 v2.18.0 // indirect github.com/go-jose/go-jose/v4 v4.1.4 // indirect github.com/go-kit/kit v0.13.0 // indirect github.com/go-kit/log v0.2.1 // indirect diff --git a/go.sum b/go.sum index c92d82d1f7b..12ecd02fdae 100644 --- a/go.sum +++ b/go.sum @@ -472,6 +472,8 @@ github.com/gin-gonic/gin v1.10.1 h1:T0ujvqyCSqRopADpgPgiTT63DUQVSfojyME59Ei63pQ= github.com/gin-gonic/gin v1.10.1/go.mod h1:4PMNQiOhvDRa013RKVbsiNwoyezlm2rm0uX/T7kzp5Y= github.com/go-asn1-ber/asn1-ber v1.5.5 h1:MNHlNMBDgEKD4TcKr36vQN68BA00aDfjIt3/bD50WnA= github.com/go-asn1-ber/asn1-ber v1.5.5/go.mod h1:hEBeB/ic+5LoWskz+yKT7vGhhPYkProFKoKdwZRWMe0= +github.com/go-co-op/gocron/v2 v2.18.0 h1:DS3Uhru66q1jy/5f9V0itmi3cLXcn2b7N+duGfgT7gU= +github.com/go-co-op/gocron/v2 v2.18.0/go.mod h1:Zii6he+Zfgy5W9B+JKk/KwejFOW0kZTFvHtwIpR4aBI= github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk= github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= From f18b9c39e20bbd44a2443b0dec12ca5bcd14d39f Mon Sep 17 00:00:00 2001 From: Tarcisio Ferraz Date: Tue, 14 Apr 2026 11:29:20 -0300 Subject: [PATCH 3/9] remove job from scheduler after trigger --- core/capabilities/fakes/manual_cron_trigger.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/core/capabilities/fakes/manual_cron_trigger.go b/core/capabilities/fakes/manual_cron_trigger.go index 0aa2e0429ee..744d69aedcd 100644 --- a/core/capabilities/fakes/manual_cron_trigger.go +++ b/core/capabilities/fakes/manual_cron_trigger.go @@ -125,11 +125,12 @@ func (f *ManualCronTriggerService) ManualTrigger(ctx context.Context, triggerID } jobFired := make(chan struct{}) - - jobDef := gocron.CronJob(config.Schedule, allowSeconds) - job, err := f.scheduler.NewJob(jobDef, gocron.NewTask(func() { - jobFired <- struct{}{} - })) + job, err := f.scheduler.NewJob( + gocron.CronJob(config.Schedule, allowSeconds), + gocron.NewTask(func() { + jobFired <- struct{}{} + }), + ) if err != nil { return fmt.Errorf("failed to create cron job: %w", err) } @@ -162,6 +163,7 @@ func (f *ManualCronTriggerService) ManualTrigger(ctx context.Context, triggerID go func() { defer close(jobFired) + defer f.scheduler.RemoveJob(job.ID()) // Either wait for cron trigger or context cancellation select { From 0f5a518bdfa160ba89274be677a4be475b572e50 Mon Sep 17 00:00:00 2001 From: Tarcisio Ferraz Date: Tue, 14 Apr 2026 11:41:59 -0300 Subject: [PATCH 4/9] move scheduler lifecycle --- .../capabilities/fakes/manual_cron_trigger.go | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/core/capabilities/fakes/manual_cron_trigger.go b/core/capabilities/fakes/manual_cron_trigger.go index 744d69aedcd..e0b37abe78b 100644 --- a/core/capabilities/fakes/manual_cron_trigger.go +++ b/core/capabilities/fakes/manual_cron_trigger.go @@ -49,9 +49,14 @@ type ManualCronTriggerService struct { scheduler gocron.Scheduler } -func NewManualCronTriggerService(parentLggr logger.Logger) *ManualCronTriggerService { +func NewManualCronTriggerService(parentLggr logger.Logger) (*ManualCronTriggerService, error) { lggr := logger.Named(parentLggr, "CronTriggerService") // ManualCronTriggerService + scheduler, err := gocron.NewScheduler() + if err != nil { + return nil, fmt.Errorf("failed to create cron scheduler: %w", err) + } + return &ManualCronTriggerService{ CapabilityInfo: manualCronTriggerInfo, config: ManualCronConfig{FastestScheduleIntervalSeconds: 1}, @@ -60,7 +65,8 @@ func NewManualCronTriggerService(parentLggr logger.Logger) *ManualCronTriggerSer legacyCallbackCh: make(chan capabilities.TriggerAndId[*crontypedapi.LegacyPayload]), //nolint:staticcheck // LegacyPayload intentionally used for backward compatibility workflowIDs: make(map[string]string), triggerConfigs: make(map[string]*crontypedapi.Config), - } + scheduler: scheduler, + }, nil } func (f *ManualCronTriggerService) Initialise(ctx context.Context, dependencies core.StandardCapabilitiesDependencies) error { @@ -80,14 +86,6 @@ func (f *ManualCronTriggerService) Initialise(ctx context.Context, dependencies f.config = cronConfig - scheduler, err := gocron.NewScheduler() - if err != nil { - return fmt.Errorf("failed to initialise cron scheduler: %w", err) - } - scheduler.Start() - - f.scheduler = scheduler - if err := f.Start(ctx); err != nil { return fmt.Errorf("error when starting trigger service: %w", err) } @@ -200,15 +198,14 @@ func (f *ManualCronTriggerService) createManualTriggerEvent(scheduledExecutionTi func (f *ManualCronTriggerService) Start(ctx context.Context) error { f.lggr.Debugw("Starting ManualCronTriggerService") + f.scheduler.Start() return nil } func (f *ManualCronTriggerService) Close() error { f.lggr.Debug("Closing ManualCronTriggerService") - if f.scheduler != nil { - if err := f.scheduler.Shutdown(); err != nil { - f.lggr.Errorw("failed to close scheduler", "err", err) - } + if err := f.scheduler.Shutdown(); err != nil { + f.lggr.Errorw("failed to close scheduler", "err", err) } return nil } From 31388ecef7831a2d69aca68a827ebf98bdb28b27 Mon Sep 17 00:00:00 2001 From: Tarcisio Ferraz Date: Tue, 14 Apr 2026 13:17:53 -0300 Subject: [PATCH 5/9] fix send on close --- core/capabilities/fakes/manual_cron_trigger.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/capabilities/fakes/manual_cron_trigger.go b/core/capabilities/fakes/manual_cron_trigger.go index e0b37abe78b..e8e02a09b7e 100644 --- a/core/capabilities/fakes/manual_cron_trigger.go +++ b/core/capabilities/fakes/manual_cron_trigger.go @@ -126,7 +126,9 @@ func (f *ManualCronTriggerService) ManualTrigger(ctx context.Context, triggerID job, err := f.scheduler.NewJob( gocron.CronJob(config.Schedule, allowSeconds), gocron.NewTask(func() { - jobFired <- struct{}{} + select { + case jobFired <- struct{}{}: + } }), ) if err != nil { From 2b364eee8b625d51e96a94b9266aa711e4947b3a Mon Sep 17 00:00:00 2001 From: Tarcisio Ferraz Date: Tue, 14 Apr 2026 13:35:38 -0300 Subject: [PATCH 6/9] add done signal --- core/capabilities/fakes/manual_cron_trigger.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/core/capabilities/fakes/manual_cron_trigger.go b/core/capabilities/fakes/manual_cron_trigger.go index e8e02a09b7e..38e1710716f 100644 --- a/core/capabilities/fakes/manual_cron_trigger.go +++ b/core/capabilities/fakes/manual_cron_trigger.go @@ -116,10 +116,10 @@ func (f *ManualCronTriggerService) AckEvent(ctx context.Context, triggerID strin return nil } -func (f *ManualCronTriggerService) ManualTrigger(ctx context.Context, triggerID string) error { +func (f *ManualCronTriggerService) ManualTrigger(ctx context.Context, triggerID string) (chan struct{}, error) { config, exists := f.triggerConfigs[triggerID] if !exists { - return fmt.Errorf(`trigger config "%s" not found`, triggerID) + return nil, fmt.Errorf(`trigger config "%s" not found`, triggerID) } jobFired := make(chan struct{}) @@ -132,11 +132,11 @@ func (f *ManualCronTriggerService) ManualTrigger(ctx context.Context, triggerID }), ) if err != nil { - return fmt.Errorf("failed to create cron job: %w", err) + return nil, fmt.Errorf("failed to create cron job: %w", err) } scheduledExecutionTime, err := job.NextRun() if err != nil { - return fmt.Errorf("failed to get next scheduled execution time: %w", err) + return nil, fmt.Errorf("failed to get next scheduled execution time: %w", err) } f.lggr.Debugf("ManualTrigger: %s", scheduledExecutionTime.Format(time.RFC3339Nano)) @@ -161,8 +161,11 @@ func (f *ManualCronTriggerService) ManualTrigger(ctx context.Context, triggerID f.lggr.Errorw("failed to emit trigger execution started event", "err", err) } + done := make(chan struct{}, 1) + go func() { defer close(jobFired) + defer close(done) defer f.scheduler.RemoveJob(job.ID()) // Either wait for cron trigger or context cancellation @@ -175,9 +178,10 @@ func (f *ManualCronTriggerService) ManualTrigger(ctx context.Context, triggerID // Sent trigger response f.callbackCh[triggerID] <- triggerEvent + done <- struct{}{} }() - return nil + return done, nil } func (f *ManualCronTriggerService) createManualTriggerEvent(scheduledExecutionTime time.Time) capabilities.TriggerAndId[*crontypedapi.Payload] { From 917b625ab79e9f687a7ed177092a228614aa72b4 Mon Sep 17 00:00:00 2001 From: Tarcisio Ferraz Date: Tue, 14 Apr 2026 14:20:45 -0300 Subject: [PATCH 7/9] fix job fired signal lifecycle --- core/capabilities/fakes/manual_cron_trigger.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/capabilities/fakes/manual_cron_trigger.go b/core/capabilities/fakes/manual_cron_trigger.go index 38e1710716f..c17c7c920a2 100644 --- a/core/capabilities/fakes/manual_cron_trigger.go +++ b/core/capabilities/fakes/manual_cron_trigger.go @@ -122,13 +122,12 @@ func (f *ManualCronTriggerService) ManualTrigger(ctx context.Context, triggerID return nil, fmt.Errorf(`trigger config "%s" not found`, triggerID) } - jobFired := make(chan struct{}) + jobFired := make(chan struct{}, 1) job, err := f.scheduler.NewJob( gocron.CronJob(config.Schedule, allowSeconds), gocron.NewTask(func() { - select { - case jobFired <- struct{}{}: - } + defer close(jobFired) + jobFired <- struct{}{} }), ) if err != nil { @@ -164,7 +163,6 @@ func (f *ManualCronTriggerService) ManualTrigger(ctx context.Context, triggerID done := make(chan struct{}, 1) go func() { - defer close(jobFired) defer close(done) defer f.scheduler.RemoveJob(job.ID()) From a9cb2575c7f385bc092d3f10e84f83c66b97aae4 Mon Sep 17 00:00:00 2001 From: Tarcisio Ferraz Date: Tue, 14 Apr 2026 14:21:54 -0300 Subject: [PATCH 8/9] mod tidy --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index d53c6499e61..aef01916198 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/gin-contrib/sessions v0.0.5 github.com/gin-contrib/size v0.0.0-20230212012657-e14a14094dc4 github.com/gin-gonic/gin v1.10.1 + github.com/go-co-op/gocron/v2 v2.18.0 github.com/go-json-experiment/json v0.0.0-20250223041408-d3c622f1b874 github.com/go-ldap/ldap/v3 v3.4.6 github.com/go-viper/mapstructure/v2 v2.5.0 @@ -247,7 +248,6 @@ require ( github.com/gedex/inflector v0.0.0-20170307190818-16278e9db813 // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-asn1-ber/asn1-ber v1.5.5 // indirect - github.com/go-co-op/gocron/v2 v2.18.0 // indirect github.com/go-jose/go-jose/v4 v4.1.4 // indirect github.com/go-kit/kit v0.13.0 // indirect github.com/go-kit/log v0.2.1 // indirect From 59f57edbc6653fb08a7a346413a196ace9b73c71 Mon Sep 17 00:00:00 2001 From: Tarcisio Ferraz Date: Wed, 15 Apr 2026 08:54:28 -0300 Subject: [PATCH 9/9] fix lint --- core/capabilities/fakes/manual_cron_trigger.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/capabilities/fakes/manual_cron_trigger.go b/core/capabilities/fakes/manual_cron_trigger.go index c17c7c920a2..8949708b38e 100644 --- a/core/capabilities/fakes/manual_cron_trigger.go +++ b/core/capabilities/fakes/manual_cron_trigger.go @@ -164,7 +164,9 @@ func (f *ManualCronTriggerService) ManualTrigger(ctx context.Context, triggerID go func() { defer close(done) - defer f.scheduler.RemoveJob(job.ID()) + defer func() { + _ = f.scheduler.RemoveJob(job.ID()) + }() // Either wait for cron trigger or context cancellation select {