diff --git a/src/Infrastructure/BotSharp.Abstraction/Crontab/ICrontabService.cs b/src/Infrastructure/BotSharp.Abstraction/Crontab/ICrontabService.cs index 97ef728f5..025685f96 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Crontab/ICrontabService.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Crontab/ICrontabService.cs @@ -4,4 +4,5 @@ public interface ICrontabService { Task> GetCrontable(); Task ScheduledTimeArrived(CrontabItem item); + Task ExecuteTimeArrivedItemWithReentryProtection(CrontabItem item); } diff --git a/src/Infrastructure/BotSharp.Abstraction/Crontab/Models/CrontabItem.cs b/src/Infrastructure/BotSharp.Abstraction/Crontab/Models/CrontabItem.cs index 227ca99ef..538e04db6 100644 --- a/src/Infrastructure/BotSharp.Abstraction/Crontab/Models/CrontabItem.cs +++ b/src/Infrastructure/BotSharp.Abstraction/Crontab/Models/CrontabItem.cs @@ -32,6 +32,9 @@ public class CrontabItem : ScheduleTaskArgs [JsonPropertyName("trigger_type")] public CronTabItemTriggerType TriggerType { get; set; } = CronTabItemTriggerType.BackgroundWatcher; + [JsonPropertyName("reentry_protection")] + public bool ReentryProtection { get; set; } = true; + public override string ToString() { return $"{Title}: {Description} [AgentId: {AgentId}, UserId: {UserId}]"; diff --git a/src/Infrastructure/BotSharp.Core.Crontab/Services/CrontabService.cs b/src/Infrastructure/BotSharp.Core.Crontab/Services/CrontabService.cs index 3a2b05dbb..8913800fb 100644 --- a/src/Infrastructure/BotSharp.Core.Crontab/Services/CrontabService.cs +++ b/src/Infrastructure/BotSharp.Core.Crontab/Services/CrontabService.cs @@ -15,6 +15,7 @@ limitations under the License. ******************************************************************************/ using BotSharp.Abstraction.Agents.Models; +using BotSharp.Abstraction.Infrastructures; using BotSharp.Abstraction.Repositories; using BotSharp.Abstraction.Repositories.Filters; using BotSharp.Abstraction.Tasks; @@ -116,7 +117,12 @@ public async Task ScheduledTimeArrived(CrontabItem item) { _logger.LogDebug($"ScheduledTimeArrived {item}"); - if (!await HasEnabledTriggerRule(item)) return; + var triggerEnabled = await HasEnabledTriggerRule(item); + if (!triggerEnabled) + { + _logger.LogWarning("Crontab: {0}, Trigger is not enabled, skipping this occurrence.", item.Title); + return; + } await HookEmitter.Emit(_services, async hook => { @@ -150,4 +156,62 @@ private async Task HasEnabledTriggerRule(CrontabItem item) // Opt-out only: block when a matching trigger rule exists and Disabled is true. return !agent.Rules.Any(r => r.TriggerName == item.Title && r.Disabled); } + + public async Task ExecuteTimeArrivedItemWithReentryProtection(CrontabItem item) + { + if (!item.ReentryProtection) + { + await ExecuteTimeArrivedItem(item); + return; + } + + var lockKey = $"crontab:execution:{item.Title}"; + using var scope = _services.CreateScope(); + var locker = scope.ServiceProvider.GetRequiredService(); + var acquired = false; + var lockAcquired = false; + + try + { + acquired = await locker.LockAsync(lockKey, async () => + { + lockAcquired = true; + _logger.LogInformation("Crontab: {0}, Distributed lock acquired, beginning execution...", item.Title); + await ExecuteTimeArrivedItem(item); + }, timeout: 600); + + if (!acquired) + { + _logger.LogWarning("Crontab: {0}, Failed to acquire distributed lock, task is still executing, skipping this occurrence to prevent re-entry.", item.Title); + } + } + catch (Exception ex) + { + if (!lockAcquired) + { + _logger.LogWarning("Crontab: {0}, Redis exception occurred before acquiring lock: {1}, executing without lock protection (re-entry protection disabled).", item.Title, ex.Message); + await ExecuteTimeArrivedItem(item); + } + else + { + _logger.LogWarning("Crontab: {0}, Redis exception occurred after lock acquired: {1}, task execution completed but lock release failed.", item.Title, ex.Message); + } + } + } + + private async Task ExecuteTimeArrivedItem(CrontabItem item) + { + try + { + _logger.LogInformation($"Start running crontab {item.Title}"); + await ScheduledTimeArrived(item); + _logger.LogInformation($"Complete running crontab {item.Title}"); + return true; + } + catch (Exception ex) + { + _logger.LogError(ex, $"Error when running crontab {item.Title}"); + return false; + } + } } diff --git a/src/Infrastructure/BotSharp.OpenAPI/Controllers/Crontab/CrontabController.cs b/src/Infrastructure/BotSharp.OpenAPI/Controllers/Crontab/CrontabController.cs index 09e4cf9e3..bc5f98b93 100644 --- a/src/Infrastructure/BotSharp.OpenAPI/Controllers/Crontab/CrontabController.cs +++ b/src/Infrastructure/BotSharp.OpenAPI/Controllers/Crontab/CrontabController.cs @@ -60,7 +60,7 @@ public async Task SchedulingCrontab() { if (item.CheckNextOccurrenceEveryOneMinute()) { - _logger.LogInformation("Crontab: {0}, One occurrence was matched, Beginning execution...", item.Title); + _logger.LogInformation($"Crontab: {item.Title}, One occurrence was matched, attempting to execute..."); Task.Run(() => ExecuteTimeArrivedItem(item, _services)); result.OccurrenceMatchedItems.Add(item.Title); } @@ -84,21 +84,10 @@ private async Task> GetCrontabItems(string? title = null) return allowedCrons.Where(cron => cron.Title.IsEqualTo(title)).ToList(); } - private async Task ExecuteTimeArrivedItem(CrontabItem item, IServiceProvider services) + private async Task ExecuteTimeArrivedItem(CrontabItem item, IServiceProvider services) { - try - { - using var scope = services.CreateScope(); - var crontabService = scope.ServiceProvider.GetRequiredService(); - _logger.LogInformation($"Start running crontab {item.Title}"); - await crontabService.ScheduledTimeArrived(item); - _logger.LogInformation($"Complete running crontab {item.Title}"); - return true; - } - catch (Exception ex) - { - _logger.LogError(ex, $"Error when running crontab {item.Title}"); - return false; - } + using var scope = services.CreateScope(); + var crontabService = scope.ServiceProvider.GetRequiredService(); + await crontabService.ExecuteTimeArrivedItemWithReentryProtection(item); } } diff --git a/src/Plugins/BotSharp.Plugin.MongoStorage/Collections/CrontabItemDocument.cs b/src/Plugins/BotSharp.Plugin.MongoStorage/Collections/CrontabItemDocument.cs index 17a49c2bc..ee5858ce4 100644 --- a/src/Plugins/BotSharp.Plugin.MongoStorage/Collections/CrontabItemDocument.cs +++ b/src/Plugins/BotSharp.Plugin.MongoStorage/Collections/CrontabItemDocument.cs @@ -18,6 +18,8 @@ public class CrontabItemDocument : MongoBase public bool LessThan60Seconds { get; set; } = false; public IEnumerable Tasks { get; set; } = []; public DateTime CreatedTime { get; set; } = DateTime.UtcNow; + public int TriggerType { get; set; } + public bool ReentryProtection { get; set; } = true; public static CrontabItem ToDomainModel(CrontabItemDocument item) { @@ -36,7 +38,9 @@ public static CrontabItem ToDomainModel(CrontabItemDocument item) LastExecutionTime = item.LastExecutionTime, LessThan60Seconds = item.LessThan60Seconds, Tasks = item.Tasks?.Select(x => CronTaskMongoElement.ToDomainElement(x))?.ToArray() ?? [], - CreatedTime = item.CreatedTime + CreatedTime = item.CreatedTime, + TriggerType = (CronTabItemTriggerType)item.TriggerType, + ReentryProtection = item.ReentryProtection }; } @@ -57,7 +61,9 @@ public static CrontabItemDocument ToMongoModel(CrontabItem item) LastExecutionTime = item.LastExecutionTime, LessThan60Seconds = item.LessThan60Seconds, Tasks = item.Tasks?.Select(x => CronTaskMongoElement.ToMongoElement(x))?.ToList() ?? [], - CreatedTime = item.CreatedTime + CreatedTime = item.CreatedTime, + TriggerType = (int)item.TriggerType, + ReentryProtection = item.ReentryProtection }; } }