Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions backend/src/baserow/api/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
# These are not passwords
BAD_TOKEN_SIGNATURE = "BAD_TOKEN_SIGNATURE" # nosec
EXPIRED_TOKEN_SIGNATURE = "EXPIRED_TOKEN_SIGNATURE" # nosec
# API error returned when a password reset link that was already used is
# submitted again. Laid out as (error code, HTTP status, human readable detail).
ERROR_RESET_PASSWORD_TOKEN_USED = (
"ERROR_RESET_PASSWORD_TOKEN_USED",
HTTP_400_BAD_REQUEST,
"The password reset link has already been used.",
)
ERROR_HOSTNAME_IS_NOT_ALLOWED = (
"ERROR_HOSTNAME_IS_NOT_ALLOWED",
HTTP_400_BAD_REQUEST,
Expand Down
4 changes: 4 additions & 0 deletions backend/src/baserow/api/user/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from baserow.api.errors import (
BAD_TOKEN_SIGNATURE,
ERROR_HOSTNAME_IS_NOT_ALLOWED,
ERROR_RESET_PASSWORD_TOKEN_USED,
EXPIRED_TOKEN_SIGNATURE,
)
from baserow.api.schemas import get_error_schema
Expand Down Expand Up @@ -82,6 +83,7 @@
InvalidVerificationToken,
RefreshTokenAlreadyBlacklisted,
ResetPasswordDisabledError,
ResetPasswordTokenAlreadyUsed,
UserAlreadyExist,
UserIsLastAdmin,
UserNotFound,
Expand Down Expand Up @@ -434,6 +436,7 @@ class ResetPasswordView(APIView):
"BAD_TOKEN_SIGNATURE",
"EXPIRED_TOKEN_SIGNATURE",
"ERROR_USER_NOT_FOUND",
"ERROR_RESET_PASSWORD_TOKEN_USED",
"ERROR_REQUEST_BODY_VALIDATION",
]
),
Expand All @@ -448,6 +451,7 @@ class ResetPasswordView(APIView):
SignatureExpired: EXPIRED_TOKEN_SIGNATURE,
UserNotFound: ERROR_USER_NOT_FOUND,
ResetPasswordDisabledError: ERROR_DISABLED_RESET_PASSWORD,
ResetPasswordTokenAlreadyUsed: ERROR_RESET_PASSWORD_TOKEN_USED,
}
)
@validate_body(ResetPasswordBodyValidationSerializer)
Expand Down
48 changes: 41 additions & 7 deletions backend/src/baserow/config/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ def __setitem__(self, key, value):
EXTRA_PUBLIC_WEB_FRONTEND_HOSTNAMES.append(hostname)

FROM_EMAIL = os.getenv("FROM_EMAIL", "no-reply@localhost")
RESET_PASSWORD_TOKEN_MAX_AGE = 60 * 60 * 48 # 48 hours
RESET_PASSWORD_TOKEN_MAX_AGE = 60 * 60 * 2 # 2 hours
CHANGE_EMAIL_TOKEN_MAX_AGE = 60 * 60 * 12 # 12 hours

ROW_PAGE_SIZE_LIMIT = int(os.getenv("BASEROW_ROW_PAGE_SIZE_LIMIT", 200))
Expand All @@ -829,16 +829,50 @@ def __setitem__(self, key, value):
AUTOMATION_HISTORY_PAGE_SIZE_LIMIT = int(
os.getenv("BASEROW_AUTOMATION_HISTORY_PAGE_SIZE_LIMIT", 100)
)
AUTOMATION_WORKFLOW_RATE_LIMIT_MAX_RUNS = int(
os.getenv("BASEROW_AUTOMATION_WORKFLOW_RATE_LIMIT_MAX_RUNS", 10)
_legacy_workflow_rate_limit_max_runs = os.getenv(
"BASEROW_AUTOMATION_WORKFLOW_RATE_LIMIT_MAX_RUNS"
)
AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS = int(
os.getenv("BASEROW_AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS", 5)
_legacy_workflow_rate_limit_window_seconds = os.getenv(
"BASEROW_AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS"
)
_automation_workflow_rate_limits_env = os.getenv(
"BASEROW_AUTOMATION_WORKFLOW_RATE_LIMITS"
)

# Resolve the automation workflow rate limits into a flat list of integers laid
# out as max_runs,window_seconds pairs. Precedence:
#   1. BASEROW_AUTOMATION_WORKFLOW_RATE_LIMITS, a comma-separated list of pairs.
#      Blank entries are ignored, so an empty value results in no limits at all.
#   2. The two legacy single-limit variables; if only one of them is set the
#      other falls back to its previous default (10 runs / 5 seconds).
#   3. The built-in defaults.
if _automation_workflow_rate_limits_env is not None:
    try:
        _automation_workflow_rate_limit_values = [
            int(value.strip())
            for value in _automation_workflow_rate_limits_env.split(",")
            if value.strip()
        ]
    except ValueError as exc:
        # Report a malformed entry as a configuration error that names the
        # offending variable, instead of leaking a bare ValueError from int().
        raise ImproperlyConfigured(
            "BASEROW_AUTOMATION_WORKFLOW_RATE_LIMITS must only contain "
            "comma-separated integers formatted as max_runs,window_seconds pairs."
        ) from exc
elif (
    _legacy_workflow_rate_limit_max_runs is not None
    or _legacy_workflow_rate_limit_window_seconds is not None
):
    _automation_workflow_rate_limit_values = [
        int(_legacy_workflow_rate_limit_max_runs or 10),
        int(_legacy_workflow_rate_limit_window_seconds or 5),
    ]
else:
    # 10 runs per 5 seconds, 30 runs per 5 minutes and 100 runs per hour.
    _automation_workflow_rate_limit_values = [10, 5, 30, 60 * 5, 100, 60 * 60]

# Every max_runs needs a matching window_seconds, so the count must be even.
if len(_automation_workflow_rate_limit_values) % 2 != 0:
    raise ImproperlyConfigured(
        "BASEROW_AUTOMATION_WORKFLOW_RATE_LIMITS must contain an even number of "
        "comma-separated integers formatted as max_runs,window_seconds pairs."
    )

# Tuple of (max_runs, window_seconds) pairs: even positions hold max_runs, odd
# positions hold the matching window in seconds.
AUTOMATION_WORKFLOW_RATE_LIMITS = tuple(
    zip(
        _automation_workflow_rate_limit_values[::2],
        _automation_workflow_rate_limit_values[1::2],
    )
)
AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS = int(
os.getenv(
"BASEROW_AUTOMATION_WORKFLOW_HISTORY_RATE_LIMIT_CACHE_EXPIRY_SECONDS",
AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS,
_legacy_workflow_rate_limit_window_seconds or 5,
)
)
AUTOMATION_WORKFLOW_MAX_CONSECUTIVE_ERRORS = int(
Expand All @@ -851,7 +885,7 @@ def __setitem__(self, key, value):
os.getenv("BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS", 30)
)
AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES = int(
os.getenv("BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES", 50)
os.getenv("BASEROW_AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES", 200)
)

TRASH_PAGE_SIZE_LIMIT = 200 # How many trash entries can be requested at once.
Expand Down
101 changes: 42 additions & 59 deletions backend/src/baserow/contrib/automation/workflows/handler.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from collections import defaultdict
from datetime import datetime, timedelta
from datetime import timedelta
from typing import Any, Dict, List, Optional
from zipfile import ZipFile

from django.conf import settings
from django.contrib.auth.models import AbstractUser
from django.core.files.storage import Storage
from django.db import transaction
from django.db.models import QuerySet
from django.db.models import Q, QuerySet
from django.utils import timezone

from celery.canvas import Signature, chain
Expand Down Expand Up @@ -62,7 +62,6 @@
find_unused_name,
)

WORKFLOW_RATE_LIMIT_CACHE_PREFIX = "automation_workflow_{}"
WORKFLOW_HISTORY_RATE_LIMIT_CACHE_PREFIX = "automation_workflow_history_{}"
AUTOMATION_WORKFLOW_CACHE_LOCK_SECONDS = 5

Expand Down Expand Up @@ -134,7 +133,6 @@ def _invalidate_workflow_caches(self, workflow: AutomationWorkflow) -> None:
original_workflow = workflow.get_original()

global_cache.invalidate(f"wa_published_workflow_{original_workflow.id}")
global_cache.invalidate(self._get_rate_limit_cache_key(original_workflow))
global_cache.invalidate(
self._get_workflow_history_rate_limit_cache_key(original_workflow)
)
Expand Down Expand Up @@ -804,75 +802,65 @@ def _mark_failure_for_timed_out_history(
message="This workflow took too long and was timed out.",
)

def _get_rate_limit_cache_key(self, original_workflow: AutomationWorkflow) -> str:
return WORKFLOW_RATE_LIMIT_CACHE_PREFIX.format(original_workflow.id)

def _get_workflow_history_rate_limit_cache_key(
self, original_workflow: AutomationWorkflow
) -> str:
return WORKFLOW_HISTORY_RATE_LIMIT_CACHE_PREFIX.format(original_workflow.id)

def _get_histories_for_current_workflow_version(self, workflow: AutomationWorkflow):
histories = AutomationHistoryHandler().get_workflow_histories(
workflow.get_original()
)
original_workflow = workflow.get_original()
histories = AutomationHistoryHandler().get_workflow_histories(original_workflow)

if workflow != workflow.get_original():
if workflow != original_workflow:
histories = histories.filter(started_on__gte=workflow.created_on)

return histories

def _check_is_rate_limited(self, workflow: AutomationWorkflow) -> bool:
"""Uses a global cache key to track recent runs for the given workflow."""

original_workflow = workflow.get_original()

cache_key = self._get_rate_limit_cache_key(original_workflow)
rate_cache_timeout = (
settings.AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS
)

now = timezone.now()

def update_last_run_cache(previous_last_runs):
"""
Given a list of recent workflow run timestamps, determines whether
the workflow run should be rate limited. If so, raises the
AutomationWorkflowRateLimited error.
"""
start_window = now - timedelta(
seconds=settings.AUTOMATION_WORKFLOW_RATE_LIMIT_CACHE_EXPIRY_SECONDS
)
"""
Checks workflow histories against the configured rate limit windows.

# Keep only past runs that are in the window
runs_in_window = [
timestamp
for timestamp in previous_last_runs
if isinstance(timestamp, datetime) and timestamp > start_window
]
The histories are fetched once for the largest configured window and each
smaller window is evaluated in Python to avoid issuing one COUNT query per
configured rate limit.

runs_in_window.append(now)
Raises AutomationWorkflowRateLimited when the workflow exceeds one of the
configured rate limits.
"""

return runs_in_window
rate_limits = settings.AUTOMATION_WORKFLOW_RATE_LIMITS
if not rate_limits:
return False

runs_in_window = global_cache.update(
cache_key,
update_last_run_cache,
default_value=lambda: [],
timeout=rate_cache_timeout,
now = timezone.now()
largest_window_seconds = max(
window_seconds for _, window_seconds in rate_limits
)

if len(runs_in_window) > settings.AUTOMATION_WORKFLOW_RATE_LIMIT_MAX_RUNS:
return True

started_workflows = (
oldest_start_window = now - timedelta(seconds=largest_window_seconds)
history_windows = list(
self._get_histories_for_current_workflow_version(workflow)
.filter(status=HistoryStatusChoices.STARTED)
.count()
.filter(
Q(started_on__gte=oldest_start_window)
| Q(status=HistoryStatusChoices.STARTED)
)
.order_by()
.values_list("started_on", "status")
)

if started_workflows > settings.AUTOMATION_WORKFLOW_RATE_LIMIT_MAX_RUNS:
return True
for max_runs, window_seconds in rate_limits:
start_window = now - timedelta(seconds=window_seconds)
if (
sum(
started_on >= start_window or status == HistoryStatusChoices.STARTED
for started_on, status in history_windows
)
>= max_runs
):
raise AutomationWorkflowRateLimited(
"The workflow was rate limited due to too many recent or "
f"unfinished runs. Limit exceeded: {max_runs} runs in "
f"{window_seconds} seconds."
)

return False

Expand Down Expand Up @@ -926,12 +914,7 @@ def before_run(self, workflow: AutomationWorkflow) -> None:
"The workflow was disabled due to too many consecutive errors."
)

if self._check_is_rate_limited(workflow):
# Early return if we had too many execution during a short amount of time
raise AutomationWorkflowRateLimited(
"The workflow was rate limited due to too many recent or unfinished "
"runs."
)
self._check_is_rate_limited(workflow)

def async_start_workflow(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def _connection(self, instance):
cursor = None
connection = None

if not is_hostname_safe(instance.postgresql_host):
if not settings.TESTS and not is_hostname_safe(instance.postgresql_host):
raise SyncError("It's not allowed to connect to this hostname.")

baserow_postgresql_connection = (
Expand Down
Loading
Loading