Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ public interface ICrontabService
{
Task<List<CrontabItem>> GetCrontable();
Task ScheduledTimeArrived(CrontabItem item);
Task ExecuteTimeArrivedItemWithReentryProtection(CrontabItem item);
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public class CrontabItem : ScheduleTaskArgs
[JsonPropertyName("trigger_type")]
public CronTabItemTriggerType TriggerType { get; set; } = CronTabItemTriggerType.BackgroundWatcher;

[JsonPropertyName("reentry_protection")]
public bool ReentryProtection { get; set; } = true;

public override string ToString()
{
return $"{Title}: {Description} [AgentId: {AgentId}, UserId: {UserId}]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ limitations under the License.
******************************************************************************/

using BotSharp.Abstraction.Agents.Models;
using BotSharp.Abstraction.Infrastructures;
using BotSharp.Abstraction.Repositories;
using BotSharp.Abstraction.Repositories.Filters;
using BotSharp.Abstraction.Tasks;
Expand Down Expand Up @@ -116,7 +117,12 @@ public async Task ScheduledTimeArrived(CrontabItem item)
{
_logger.LogDebug($"ScheduledTimeArrived {item}");

if (!await HasEnabledTriggerRule(item)) return;
var triggerEnabled = await HasEnabledTriggerRule(item);
if (!triggerEnabled)
{
_logger.LogWarning("Crontab: {0}, Trigger is not enabled, skipping this occurrence.", item.Title);
return;
}

await HookEmitter.Emit<ICrontabHook>(_services, async hook =>
{
Expand Down Expand Up @@ -150,4 +156,62 @@ private async Task<bool> HasEnabledTriggerRule(CrontabItem item)
// Opt-out only: block when a matching trigger rule exists and Disabled is true.
return !agent.Rules.Any(r => r.TriggerName == item.Title && r.Disabled);
}

public async Task ExecuteTimeArrivedItemWithReentryProtection(CrontabItem item)
{
if (!item.ReentryProtection)
{
await ExecuteTimeArrivedItem(item);
return;
}

var lockKey = $"crontab:execution:{item.Title}";
using var scope = _services.CreateScope();
var locker = scope.ServiceProvider.GetRequiredService<IDistributedLocker>();
var acquired = false;
var lockAcquired = false;

try
{
acquired = await locker.LockAsync(lockKey, async () =>
{
lockAcquired = true;
_logger.LogInformation("Crontab: {0}, Distributed lock acquired, beginning execution...", item.Title);
await ExecuteTimeArrivedItem(item);
}, timeout: 600);

if (!acquired)
{
_logger.LogWarning("Crontab: {0}, Failed to acquire distributed lock, task is still executing, skipping this occurrence to prevent re-entry.", item.Title);
}
Comment on lines +168 to +186
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. Lock key too coarse 🐞 Bug ≡ Correctness

ExecuteTimeArrivedItemWithReentryProtection builds the distributed lock key from only
CrontabItem.Title, so different crontab items (e.g., different ConversationId/AgentId) sharing the
same Title will block each other and cause skipped executions.
Agent Prompt
### Issue description
`ExecuteTimeArrivedItemWithReentryProtection` uses `crontab:execution:{item.Title}` as the distributed lock key. Since crontab items are persisted/upserted by `ConversationId` (and items also vary by `AgentId`/`UserId`), different jobs can share a title and incorrectly block each other, leading to skipped executions.

### Issue Context
- Crontab items in Mongo are upserted by `ConversationId`, not by `Title`, so `Title` is not a safe global unique identifier.
- The lock key should incorporate the job's real identity (at least `ConversationId`, and likely `AgentId` too).

### Fix Focus Areas
- src/Infrastructure/BotSharp.Core.Crontab/Services/CrontabService.cs[132-158]

### Implementation notes
- Build the lock key from stable identifiers, e.g.:
  - `crontab:execution:{item.TriggerType}:{item.AgentId}:{item.ConversationId}:{item.Title}`
  - If some sources don’t have `ConversationId`, fall back to a safe placeholder like `global`.
- Keep the log message consistent with the new key (include key parts for debugging).

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

}
catch (Exception ex)
{
if (!lockAcquired)
{
_logger.LogWarning("Crontab: {0}, Redis exception occurred before acquiring lock: {1}, executing without lock protection (re-entry protection disabled).", item.Title, ex.Message);
await ExecuteTimeArrivedItem(item);
}
else
{
_logger.LogWarning("Crontab: {0}, Redis exception occurred after lock acquired: {1}, task execution completed but lock release failed.", item.Title, ex.Message);
}
}
}

private async Task<bool> ExecuteTimeArrivedItem(CrontabItem item)
{
try
{
_logger.LogInformation($"Start running crontab {item.Title}");
await ScheduledTimeArrived(item);
_logger.LogInformation($"Complete running crontab {item.Title}");
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, $"Error when running crontab {item.Title}");
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public async Task<CrontabSchedulingResult> SchedulingCrontab()
{
if (item.CheckNextOccurrenceEveryOneMinute())
{
_logger.LogInformation("Crontab: {0}, One occurrence was matched, Beginning execution...", item.Title);
_logger.LogInformation($"Crontab: {item.Title}, One occurrence was matched, attempting to execute...");
Task.Run(() => ExecuteTimeArrivedItem(item, _services));
result.OccurrenceMatchedItems.Add(item.Title);
}
Expand All @@ -84,21 +84,10 @@ private async Task<List<CrontabItem>> GetCrontabItems(string? title = null)
return allowedCrons.Where(cron => cron.Title.IsEqualTo(title)).ToList();
}

private async Task<bool> ExecuteTimeArrivedItem(CrontabItem item, IServiceProvider services)
private async Task ExecuteTimeArrivedItem(CrontabItem item, IServiceProvider services)
{
try
{
using var scope = services.CreateScope();
var crontabService = scope.ServiceProvider.GetRequiredService<ICrontabService>();
_logger.LogInformation($"Start running crontab {item.Title}");
await crontabService.ScheduledTimeArrived(item);
_logger.LogInformation($"Complete running crontab {item.Title}");
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, $"Error when running crontab {item.Title}");
return false;
}
using var scope = services.CreateScope();
var crontabService = scope.ServiceProvider.GetRequiredService<ICrontabService>();
await crontabService.ExecuteTimeArrivedItemWithReentryProtection(item);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public class CrontabItemDocument : MongoBase
public bool LessThan60Seconds { get; set; } = false;
public IEnumerable<CronTaskMongoElement> Tasks { get; set; } = [];
public DateTime CreatedTime { get; set; } = DateTime.UtcNow;
public int TriggerType { get; set; }
public bool ReentryProtection { get; set; } = true;

public static CrontabItem ToDomainModel(CrontabItemDocument item)
{
Expand All @@ -36,7 +38,9 @@ public static CrontabItem ToDomainModel(CrontabItemDocument item)
LastExecutionTime = item.LastExecutionTime,
LessThan60Seconds = item.LessThan60Seconds,
Tasks = item.Tasks?.Select(x => CronTaskMongoElement.ToDomainElement(x))?.ToArray() ?? [],
CreatedTime = item.CreatedTime
CreatedTime = item.CreatedTime,
TriggerType = (CronTabItemTriggerType)item.TriggerType,
ReentryProtection = item.ReentryProtection
};
}

Expand All @@ -57,7 +61,9 @@ public static CrontabItemDocument ToMongoModel(CrontabItem item)
LastExecutionTime = item.LastExecutionTime,
LessThan60Seconds = item.LessThan60Seconds,
Tasks = item.Tasks?.Select(x => CronTaskMongoElement.ToMongoElement(x))?.ToList() ?? [],
CreatedTime = item.CreatedTime
CreatedTime = item.CreatedTime,
TriggerType = (int)item.TriggerType,
ReentryProtection = item.ReentryProtection
};
}
}
Loading