|
1 | 1 | from collections import defaultdict |
2 | | -from datetime import datetime, timedelta |
| 2 | +from datetime import timedelta |
3 | 3 | from typing import Any, Dict, List, Optional |
4 | 4 | from zipfile import ZipFile |
5 | 5 |
|
6 | 6 | from django.conf import settings |
7 | 7 | from django.contrib.auth.models import AbstractUser |
8 | 8 | from django.core.files.storage import Storage |
9 | 9 | from django.db import transaction |
10 | | -from django.db.models import QuerySet |
| 10 | +from django.db.models import Q, QuerySet |
11 | 11 | from django.utils import timezone |
12 | 12 |
|
13 | 13 | from celery.canvas import Signature, chain |
|
62 | 62 | find_unused_name, |
63 | 63 | ) |
64 | 64 |
|
65 | | -WORKFLOW_RATE_LIMIT_CACHE_PREFIX = "automation_workflow_{}" |
66 | 65 | WORKFLOW_HISTORY_RATE_LIMIT_CACHE_PREFIX = "automation_workflow_history_{}" |
67 | 66 | AUTOMATION_WORKFLOW_CACHE_LOCK_SECONDS = 5 |
68 | 67 |
|
@@ -134,7 +133,6 @@ def _invalidate_workflow_caches(self, workflow: AutomationWorkflow) -> None: |
134 | 133 | original_workflow = workflow.get_original() |
135 | 134 |
|
136 | 135 | global_cache.invalidate(f"wa_published_workflow_{original_workflow.id}") |
137 | | - global_cache.invalidate(self._get_rate_limit_cache_key(original_workflow)) |
138 | 136 | global_cache.invalidate( |
139 | 137 | self._get_workflow_history_rate_limit_cache_key(original_workflow) |
140 | 138 | ) |
@@ -804,75 +802,65 @@ def _mark_failure_for_timed_out_history( |
804 | 802 | message="This workflow took too long and was timed out.", |
805 | 803 | ) |
806 | 804 |
|
807 | | - def _get_rate_limit_cache_key(self, original_workflow: AutomationWorkflow) -> str: |
808 | | - return WORKFLOW_RATE_LIMIT_CACHE_PREFIX.format(original_workflow.id) |
809 | | - |
810 | 805 | def _get_workflow_history_rate_limit_cache_key( |
811 | 806 | self, original_workflow: AutomationWorkflow |
812 | 807 | ) -> str: |
813 | 808 | return WORKFLOW_HISTORY_RATE_LIMIT_CACHE_PREFIX.format(original_workflow.id) |
814 | 809 |
|
815 | 810 | def _get_histories_for_current_workflow_version(self, workflow: AutomationWorkflow): |
816 | | - histories = AutomationHistoryHandler().get_workflow_histories( |
817 | | - workflow.get_original() |
818 | | - ) |
| 811 | + original_workflow = workflow.get_original() |
| 812 | + histories = AutomationHistoryHandler().get_workflow_histories(original_workflow) |
819 | 813 |
|
820 | | - if workflow != workflow.get_original(): |
| 814 | + if workflow != original_workflow: |
821 | 815 | histories = histories.filter(started_on__gte=workflow.created_on) |
822 | 816 |
|
823 | 817 | return histories |
824 | 818 |
|
825 | 819 | def _check_is_rate_limited(self, workflow: AutomationWorkflow) -> bool: |
826 | | - """Uses a global cache key to track recent runs for the given workflow.""" |
827 | | - |
828 | | - original_workflow = workflow.get_original() |
829 | | - |
830 | | - cache_key = self._get_rate_limit_cache_key(original_workflow) |
831 | | - rate_cache_timeout = ( |
832 | | - settings.AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS |
833 | | - ) |
834 | | - |
835 | | - now = timezone.now() |
836 | | - |
837 | | - def update_last_run_cache(previous_last_runs): |
838 | | - """ |
839 | | - Given a list of recent workflow run timestamps, determines whether |
840 | | - the workflow run should be rate limited. If so, raises the |
841 | | - AutomationWorkflowRateLimited error. |
842 | | - """ |
843 | | - start_window = now - timedelta( |
844 | | - seconds=settings.AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS |
845 | | - ) |
| 820 | + """ |
| 821 | + Checks workflow histories against the configured rate limit windows. |
846 | 822 |
|
847 | | - # Keep only past runs that are in the window |
848 | | - runs_in_window = [ |
849 | | - timestamp |
850 | | - for timestamp in previous_last_runs |
851 | | - if isinstance(timestamp, datetime) and timestamp > start_window |
852 | | - ] |
| 823 | + The histories are fetched once for the largest configured window and each |
| 824 | + smaller window is evaluated in Python to avoid issuing one COUNT query per |
| 825 | + configured rate limit. |
853 | 826 |
|
854 | | - runs_in_window.append(now) |
| 827 | + Raises AutomationWorkflowRateLimited when the workflow exceeds one of the |
| 828 | + configured rate limits. |
| 829 | + """ |
855 | 830 |
|
856 | | - return runs_in_window |
| 831 | + rate_limits = settings.AUTOMATION_WORKFLOW_RATE_LIMITS |
| 832 | + if not rate_limits: |
| 833 | + return False |
857 | 834 |
|
858 | | - runs_in_window = global_cache.update( |
859 | | - cache_key, |
860 | | - update_last_run_cache, |
861 | | - default_value=lambda: [], |
862 | | - timeout=rate_cache_timeout, |
| 835 | + now = timezone.now() |
| 836 | + largest_window_seconds = max( |
| 837 | + window_seconds for _, window_seconds in rate_limits |
863 | 838 | ) |
864 | | - |
865 | | - if len(runs_in_window) > settings.AUTOMATION_WORKFLOW_RATE_LIMIT_MAX_RUNS: |
866 | | - return True |
867 | | - |
868 | | - started_workflows = ( |
| 839 | + oldest_start_window = now - timedelta(seconds=largest_window_seconds) |
| 840 | + history_windows = list( |
869 | 841 | self._get_histories_for_current_workflow_version(workflow) |
870 | | - .filter(status=HistoryStatusChoices.STARTED) |
871 | | - .count() |
| 842 | + .filter( |
| 843 | + Q(started_on__gte=oldest_start_window) |
| 844 | + | Q(status=HistoryStatusChoices.STARTED) |
| 845 | + ) |
| 846 | + .order_by() |
| 847 | + .values_list("started_on", "status") |
872 | 848 | ) |
873 | 849 |
|
874 | | - if started_workflows > settings.AUTOMATION_WORKFLOW_RATE_LIMIT_MAX_RUNS: |
875 | | - return True |
| 850 | + for max_runs, window_seconds in rate_limits: |
| 851 | + start_window = now - timedelta(seconds=window_seconds) |
| 852 | + if ( |
| 853 | + sum( |
| 854 | + started_on >= start_window or status == HistoryStatusChoices.STARTED |
| 855 | + for started_on, status in history_windows |
| 856 | + ) |
| 857 | + >= max_runs |
| 858 | + ): |
| 859 | + raise AutomationWorkflowRateLimited( |
| 860 | + "The workflow was rate limited due to too many recent or " |
| 861 | + f"unfinished runs. Limit exceeded: {max_runs} runs in " |
| 862 | + f"{window_seconds} seconds." |
| 863 | + ) |
876 | 864 |
|
877 | 865 | return False |
878 | 866 |
|
@@ -926,12 +914,7 @@ def before_run(self, workflow: AutomationWorkflow) -> None: |
926 | 914 | "The workflow was disabled due to too many consecutive errors." |
927 | 915 | ) |
928 | 916 |
|
929 | | - if self._check_is_rate_limited(workflow): |
930 | | - # Early return if we had too many execution during a short amount of time |
931 | | - raise AutomationWorkflowRateLimited( |
932 | | - "The workflow was rate limited due to too many recent or unfinished " |
933 | | - "runs." |
934 | | - ) |
| 917 | + self._check_is_rate_limited(workflow) |
935 | 918 |
|
936 | 919 | def async_start_workflow( |
937 | 920 | self, |
|
0 commit comments