Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 57 additions & 10 deletions core/capabilities/fakes/manual_cron_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"time"

"github.com/go-co-op/gocron/v2"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
Expand All @@ -25,6 +26,7 @@ var _ cronserver.CronCapability = (*ManualCronTriggerService)(nil)
const ServiceName = "CronTriggerService"
const ID = "cron-trigger@1.0.0"
const defaultFastestScheduleIntervalSeconds = 1
const allowSeconds = true

var manualCronTriggerInfo = capabilities.MustNewCapabilityInfo(
ID,
Expand All @@ -43,19 +45,28 @@ type ManualCronTriggerService struct {
callbackCh map[string]chan capabilities.TriggerAndId[*crontypedapi.Payload]
legacyCallbackCh chan capabilities.TriggerAndId[*crontypedapi.LegacyPayload] //nolint:staticcheck // LegacyPayload intentionally used for backward compatibility
workflowIDs map[string]string // triggerID -> workflowID mapping
triggerConfigs map[string]*crontypedapi.Config
scheduler gocron.Scheduler
}

func NewManualCronTriggerService(parentLggr logger.Logger) *ManualCronTriggerService {
func NewManualCronTriggerService(parentLggr logger.Logger) (*ManualCronTriggerService, error) {
lggr := logger.Named(parentLggr, "CronTriggerService") // ManualCronTriggerService

scheduler, err := gocron.NewScheduler()
if err != nil {
return nil, fmt.Errorf("failed to create cron scheduler: %w", err)
}

return &ManualCronTriggerService{
CapabilityInfo: manualCronTriggerInfo,
config: ManualCronConfig{FastestScheduleIntervalSeconds: 1},
lggr: lggr,
callbackCh: make(map[string]chan capabilities.TriggerAndId[*crontypedapi.Payload]),
legacyCallbackCh: make(chan capabilities.TriggerAndId[*crontypedapi.LegacyPayload]), //nolint:staticcheck // LegacyPayload intentionally used for backward compatibility
workflowIDs: make(map[string]string),
}
triggerConfigs: make(map[string]*crontypedapi.Config),
scheduler: scheduler,
}, nil
}

func (f *ManualCronTriggerService) Initialise(ctx context.Context, dependencies core.StandardCapabilitiesDependencies) error {
Expand All @@ -75,8 +86,7 @@ func (f *ManualCronTriggerService) Initialise(ctx context.Context, dependencies

f.config = cronConfig

err := f.Start(ctx)
if err != nil {
if err := f.Start(ctx); err != nil {
return fmt.Errorf("error when starting trigger service: %w", err)
}

Expand All @@ -86,6 +96,7 @@ func (f *ManualCronTriggerService) Initialise(ctx context.Context, dependencies
func (f *ManualCronTriggerService) RegisterTrigger(ctx context.Context, triggerID string, metadata capabilities.RequestMetadata, input *crontypedapi.Config) (<-chan capabilities.TriggerAndId[*crontypedapi.Payload], caperrors.Error) {
f.callbackCh[triggerID] = make(chan capabilities.TriggerAndId[*crontypedapi.Payload])
f.workflowIDs[triggerID] = metadata.WorkflowID
f.triggerConfigs[triggerID] = input
return f.callbackCh[triggerID], nil
}

Expand All @@ -105,7 +116,28 @@ func (f *ManualCronTriggerService) AckEvent(ctx context.Context, triggerID strin
return nil
}

func (f *ManualCronTriggerService) ManualTrigger(ctx context.Context, triggerID string, scheduledExecutionTime time.Time) error {
func (f *ManualCronTriggerService) ManualTrigger(ctx context.Context, triggerID string) (chan struct{}, error) {
config, exists := f.triggerConfigs[triggerID]
if !exists {
return nil, fmt.Errorf(`trigger config "%s" not found`, triggerID)
}

jobFired := make(chan struct{}, 1)
job, err := f.scheduler.NewJob(
gocron.CronJob(config.Schedule, allowSeconds),
gocron.NewTask(func() {
defer close(jobFired)
jobFired <- struct{}{}
}),
)
if err != nil {
return nil, fmt.Errorf("failed to create cron job: %w", err)
}
scheduledExecutionTime, err := job.NextRun()
if err != nil {
return nil, fmt.Errorf("failed to get next scheduled execution time: %w", err)
}

f.lggr.Debugf("ManualTrigger: %s", scheduledExecutionTime.Format(time.RFC3339Nano))

triggerEvent := f.createManualTriggerEvent(scheduledExecutionTime)
Expand All @@ -128,17 +160,28 @@ func (f *ManualCronTriggerService) ManualTrigger(ctx context.Context, triggerID
f.lggr.Errorw("failed to emit trigger execution started event", "err", err)
}

done := make(chan struct{}, 1)

go func() {
defer close(done)
defer func() {
_ = f.scheduler.RemoveJob(job.ID())
}()

// Either wait for cron trigger or context cancellation
select {
case f.callbackCh[triggerID] <- triggerEvent:
// Successfully sent trigger response
case <-jobFired:
break
case <-ctx.Done():
// Context cancelled, cleanup goroutine
f.lggr.Debug("ManualTrigger goroutine cancelled due to context cancellation")
break
}

// Sent trigger response
f.callbackCh[triggerID] <- triggerEvent
done <- struct{}{}
Comment thread
tarcisiozf marked this conversation as resolved.
}()

return nil
return done, nil
}

func (f *ManualCronTriggerService) createManualTriggerEvent(scheduledExecutionTime time.Time) capabilities.TriggerAndId[*crontypedapi.Payload] {
Expand All @@ -161,11 +204,15 @@ func (f *ManualCronTriggerService) createManualTriggerEvent(scheduledExecutionTi

func (f *ManualCronTriggerService) Start(ctx context.Context) error {
f.lggr.Debugw("Starting ManualCronTriggerService")
f.scheduler.Start()
return nil
}

func (f *ManualCronTriggerService) Close() error {
f.lggr.Debug("Closing ManualCronTriggerService")
if err := f.scheduler.Shutdown(); err != nil {
f.lggr.Errorw("failed to close scheduler", "err", err)
Comment thread
tarcisiozf marked this conversation as resolved.
}
return nil
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ require (
github.com/gin-contrib/sessions v0.0.5
github.com/gin-contrib/size v0.0.0-20230212012657-e14a14094dc4
github.com/gin-gonic/gin v1.10.1
github.com/go-co-op/gocron/v2 v2.18.0
github.com/go-json-experiment/json v0.0.0-20250223041408-d3c622f1b874
github.com/go-ldap/ldap/v3 v3.4.6
github.com/go-viper/mapstructure/v2 v2.5.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading