diff --git a/backend/src/baserow/contrib/automation/api/workflows/serializers.py b/backend/src/baserow/contrib/automation/api/workflows/serializers.py
index 4707515688..5f3249a869 100644
--- a/backend/src/baserow/contrib/automation/api/workflows/serializers.py
+++ b/backend/src/baserow/contrib/automation/api/workflows/serializers.py
@@ -2,7 +2,10 @@
from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers
+from baserow.api.pagination import PageNumberPagination
from baserow.contrib.automation.models import (
+ AutomationHistory,
+ AutomationNodeHistory,
AutomationWorkflow,
AutomationWorkflowHistory,
)
@@ -103,14 +106,88 @@ class OrderAutomationWorkflowsSerializer(serializers.Serializer):
)
-class AutomationWorkflowHistorySerializer(serializers.ModelSerializer):
+class AutomationHistorySerializer(serializers.ModelSerializer):
class Meta:
- model = AutomationWorkflowHistory
+ model = AutomationHistory
fields = (
"id",
"started_on",
"completed_on",
- "is_test_run",
"message",
"status",
)
+
+
+class AutomationNodeHistorySerializer(AutomationHistorySerializer):
+ parent_node_id = serializers.SerializerMethodField()
+ iteration = serializers.SerializerMethodField()
+ result = serializers.SerializerMethodField()
+ node_type = serializers.SerializerMethodField()
+ node_label = serializers.SerializerMethodField()
+
+ class Meta:
+ model = AutomationNodeHistory
+ fields = AutomationHistorySerializer.Meta.fields + (
+ "workflow_history",
+ "node",
+ "node_type",
+ "node_label",
+ "parent_node_id",
+ "iteration",
+ "result",
+ )
+
+ def _get_first_node_result(self, obj):
+ results = obj.node_results.all()
+ return results[0] if results else None
+
+ @extend_schema_field(OpenApiTypes.STR)
+ def get_node_type(self, obj):
+ return obj.node.get_type().type
+
+ @extend_schema_field(OpenApiTypes.STR)
+ def get_node_label(self, obj):
+ return obj.node.label
+
+ @extend_schema_field(OpenApiTypes.INT)
+ def get_parent_node_id(self, obj):
+ parent_nodes = obj.node.get_parent_nodes()
+ if not parent_nodes:
+ return None
+ return parent_nodes[-1].id
+
+ @extend_schema_field(OpenApiTypes.INT)
+ def get_iteration(self, obj):
+ result = self._get_first_node_result(obj)
+ if result is None:
+ return None
+
+ if result.iteration_path:
+ return int(result.iteration_path.rsplit(".", 1)[-1])
+
+ return 0
+
+ def get_result(self, obj):
+ result = self._get_first_node_result(obj)
+ return result.result if result else {}
+
+
+class AutomationWorkflowHistorySerializer(AutomationHistorySerializer):
+ node_histories = AutomationNodeHistorySerializer(read_only=True, many=True)
+
+ class Meta:
+ model = AutomationWorkflowHistory
+ fields = AutomationHistorySerializer.Meta.fields + (
+ "is_test_run",
+ "event_payload",
+ "simulate_until_node",
+ "node_histories",
+ )
+
+
+class AutomationWorkflowHistoryPagination(PageNumberPagination):
+ def get_paginated_response(self, data, *, success_count: int, fail_count: int):
+ response = super().get_paginated_response(data)
+ response.data["success_count"] = success_count
+ response.data["fail_count"] = fail_count
+ return response
diff --git a/backend/src/baserow/contrib/automation/api/workflows/views.py b/backend/src/baserow/contrib/automation/api/workflows/views.py
index cc6201526b..696469e624 100644
--- a/backend/src/baserow/contrib/automation/api/workflows/views.py
+++ b/backend/src/baserow/contrib/automation/api/workflows/views.py
@@ -2,11 +2,13 @@
from django.conf import settings
from django.db import transaction
+from django.db.models import Count, Q
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import OpenApiParameter, extend_schema
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
+from rest_framework.serializers import IntegerField
from rest_framework.status import HTTP_202_ACCEPTED
from rest_framework.views import APIView
@@ -14,7 +16,6 @@
from baserow.api.decorators import map_exceptions, validate_body
from baserow.api.jobs.errors import ERROR_MAX_JOB_COUNT_EXCEEDED
from baserow.api.jobs.serializers import JobSerializer
-from baserow.api.pagination import PageNumberPagination
from baserow.api.schemas import CLIENT_SESSION_ID_SCHEMA_PARAMETER, get_error_schema
from baserow.api.serializers import get_example_pagination_serializer_class
from baserow.contrib.automation.api.workflows.errors import (
@@ -23,12 +24,14 @@
ERROR_AUTOMATION_WORKFLOW_NOTIFICATION_RECIPIENTS_INVALID,
)
from baserow.contrib.automation.api.workflows.serializers import (
+ AutomationWorkflowHistoryPagination,
AutomationWorkflowHistorySerializer,
AutomationWorkflowSerializer,
CreateAutomationWorkflowSerializer,
OrderAutomationWorkflowsSerializer,
UpdateAutomationWorkflowSerializer,
)
+from baserow.contrib.automation.history.constants import HistoryStatusChoices
from baserow.contrib.automation.history.service import AutomationHistoryService
from baserow.contrib.automation.workflows.actions import (
CreateAutomationWorkflowActionType,
@@ -231,7 +234,15 @@ class AutomationWorkflowHistoryView(APIView):
description="Retrieve the history for a workflow.",
responses={
200: get_example_pagination_serializer_class(
- AutomationWorkflowHistorySerializer
+ AutomationWorkflowHistorySerializer,
+ additional_fields={
+ "success_count": IntegerField(
+ help_text="The total number of successful workflow runs."
+ ),
+ "fail_count": IntegerField(
+ help_text="The total number of failed workflow runs."
+ ),
+ },
),
404: get_error_schema(
[
@@ -251,16 +262,26 @@ def get(self, request, workflow_id: int):
request.user, workflow_id
)
- paginator = PageNumberPagination(
+ counts = queryset.aggregate(
+ success_count=Count("id", filter=Q(status=HistoryStatusChoices.SUCCESS)),
+ fail_count=Count("id", filter=Q(status=HistoryStatusChoices.ERROR)),
+ )
+
+ paginator = AutomationWorkflowHistoryPagination(
limit_page_size=settings.AUTOMATION_HISTORY_PAGE_SIZE_LIMIT
)
+
page = paginator.paginate_queryset(queryset, request, self)
serializer = AutomationWorkflowHistorySerializer(
page,
many=True,
)
- return paginator.get_paginated_response(serializer.data)
+ return paginator.get_paginated_response(
+ serializer.data,
+ success_count=counts["success_count"],
+ fail_count=counts["fail_count"],
+ )
class OrderAutomationWorkflowsView(APIView):
diff --git a/backend/src/baserow/contrib/automation/history/handler.py b/backend/src/baserow/contrib/automation/history/handler.py
index dac591f612..ad59be4cf7 100644
--- a/backend/src/baserow/contrib/automation/history/handler.py
+++ b/backend/src/baserow/contrib/automation/history/handler.py
@@ -1,7 +1,7 @@
from datetime import datetime
from typing import Dict, List, Optional, Union
-from django.db.models import QuerySet
+from django.db.models import Prefetch, QuerySet
from baserow.contrib.automation.history.constants import HistoryStatusChoices
from baserow.contrib.automation.history.exceptions import (
@@ -31,9 +31,18 @@ def get_workflow_histories(
base_queryset = AutomationWorkflowHistory.objects.all()
return base_queryset.filter(
- workflow=workflow,
+ original_workflow=workflow,
simulate_until_node__isnull=True,
- ).prefetch_related("workflow__automation__workspace")
+ ).prefetch_related(
+ Prefetch(
+ "node_histories",
+ queryset=AutomationNodeHistory.objects.select_related(
+ "node", "node__workflow"
+ )
+ .prefetch_related("node_results")
+ .order_by("started_on"),
+ ),
+ )
def get_workflow_history(
self, history_id: int, base_queryset: Optional[QuerySet] = None
@@ -60,6 +69,7 @@ def get_workflow_history(
def create_workflow_history(
self,
+ original_workflow: AutomationWorkflow,
workflow: AutomationWorkflow,
started_on: datetime,
is_test_run: bool,
@@ -73,6 +83,7 @@ def create_workflow_history(
return AutomationWorkflowHistory.objects.create(
workflow=workflow,
+ original_workflow=original_workflow,
started_on=started_on,
is_test_run=is_test_run,
simulate_until_node=simulate_until_node,
diff --git a/backend/src/baserow/contrib/automation/history/models.py b/backend/src/baserow/contrib/automation/history/models.py
index ed8d8d65f7..67514ef6ef 100644
--- a/backend/src/baserow/contrib/automation/history/models.py
+++ b/backend/src/baserow/contrib/automation/history/models.py
@@ -20,10 +20,18 @@ class Meta:
class AutomationWorkflowHistory(AutomationHistory):
- workflow = models.ForeignKey(
+ original_workflow = models.ForeignKey(
"automation.AutomationWorkflow",
on_delete=models.CASCADE,
related_name="workflow_histories",
+ # TODO ZDM: Make non-nullable after next release and add backfill
+ # migration. See: https://github.com/baserow/baserow/issues/5236
+ null=True,
+ )
+ workflow = models.ForeignKey(
+ "automation.AutomationWorkflow",
+ on_delete=models.CASCADE,
+ related_name="cloned_workflow_histories",
)
simulate_until_node = models.ForeignKey(
"automation.AutomationNode",
diff --git a/backend/src/baserow/contrib/automation/migrations/0028_automationworkflowhistory_original_workflow_and_more.py b/backend/src/baserow/contrib/automation/migrations/0028_automationworkflowhistory_original_workflow_and_more.py
new file mode 100644
index 0000000000..d6743065eb
--- /dev/null
+++ b/backend/src/baserow/contrib/automation/migrations/0028_automationworkflowhistory_original_workflow_and_more.py
@@ -0,0 +1,49 @@
+import django.db.models.deletion
+from django.db import migrations, models
+
+
+def backfill_original_workflow(apps, schema_editor):
+ AutomationWorkflowHistory = apps.get_model(
+ "automation", "AutomationWorkflowHistory"
+ )
+ AutomationWorkflowHistory.objects.filter(original_workflow__isnull=True).update(
+ original_workflow_id=models.F("workflow_id")
+ )
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ("automation", "0027_alter_automationnodehistory_options_and_more"),
+ ]
+
+ operations = [
+ migrations.AddField(
+ model_name="automationworkflowhistory",
+ name="original_workflow",
+ field=models.ForeignKey(
+ null=True,
+ on_delete=django.db.models.deletion.CASCADE,
+ related_name="workflow_histories",
+ to="automation.automationworkflow",
+ ),
+ ),
+ migrations.RunPython(
+ backfill_original_workflow,
+ reverse_code=migrations.RunPython.noop,
+ ),
+ migrations.AlterField(
+ model_name="automationworkflowhistory",
+ name="workflow",
+ field=models.ForeignKey(
+ on_delete=django.db.models.deletion.CASCADE,
+ related_name="cloned_workflow_histories",
+ to="automation.automationworkflow",
+ ),
+ ),
+ migrations.AlterField(
+ model_name='automationworkflow',
+ name='state',
+ field=models.CharField(choices=[('draft', 'Draft'), ('live', 'Live'), ('paused', 'Paused'), ('disabled', 'Disabled'), ('test_clone', 'Test Clone')], db_default='draft', default='draft', max_length=20),
+ ),
+ ]
diff --git a/backend/src/baserow/contrib/automation/models.py b/backend/src/baserow/contrib/automation/models.py
index 2c241f89a4..ed94950a4f 100644
--- a/backend/src/baserow/contrib/automation/models.py
+++ b/backend/src/baserow/contrib/automation/models.py
@@ -1,6 +1,11 @@
from django.db import models
-from baserow.contrib.automation.history.models import AutomationWorkflowHistory
+from baserow.contrib.automation.history.models import (
+ AutomationHistory,
+ AutomationNodeHistory,
+ AutomationNodeResult,
+ AutomationWorkflowHistory,
+)
from baserow.contrib.automation.workflows.models import (
AutomationWorkflow,
DuplicateAutomationWorkflowJob,
@@ -11,7 +16,10 @@
"Automation",
"AutomationWorkflow",
"DuplicateAutomationWorkflowJob",
+ "AutomationHistory",
"AutomationWorkflowHistory",
+ "AutomationNodeHistory",
+ "AutomationNodeResult",
]
diff --git a/backend/src/baserow/contrib/automation/nodes/handler.py b/backend/src/baserow/contrib/automation/nodes/handler.py
index dca96d342f..3dd90b36b8 100644
--- a/backend/src/baserow/contrib/automation/nodes/handler.py
+++ b/backend/src/baserow/contrib/automation/nodes/handler.py
@@ -367,6 +367,7 @@ def import_node_only(
def _handle_workflow_error(
self,
node_history: AutomationNodeHistory,
+ iteration_path: str,
error: str,
) -> None:
now = timezone.now()
@@ -380,6 +381,11 @@ def _handle_workflow_error(
node_history.status = HistoryStatusChoices.ERROR
node_history.save()
+ AutomationHistoryHandler().create_node_result(
+ node_history=node_history,
+ iteration_path=iteration_path,
+ )
+
def _handle_simulation_notify(
self, simulate_until_node: AutomationNode | None, node: AutomationNode
) -> bool:
@@ -392,9 +398,11 @@ def _handle_simulation_notify(
"""
if simulate_until_node and simulate_until_node.id == node.id:
- node.service.specific.refresh_from_db(fields=["sample_data"])
+ service = node.service.specific
+ service.refresh_from_db(fields=["sample_data"])
automation_node_updated.send(self, user=None, node=node)
return True
+
return False
def dispatch_node(
@@ -469,14 +477,14 @@ def dispatch_node(
simulate_until_node=workflow_history.simulate_until_node,
current_iterations=current_iterations,
)
-
+ iteration_path = dispatch_context.get_iteration_path(node)
node_type: Type[AutomationNodeActionNodeType] = node.get_type()
try:
dispatch_result = node_type.dispatch(node, dispatch_context)
except ServiceImproperlyConfiguredDispatchException as e:
error = f"The node {node.id} is misconfigured and cannot be dispatched. {str(e)}"
- self._handle_workflow_error(node_history, error)
+ self._handle_workflow_error(node_history, iteration_path, error)
self._handle_simulation_notify(simulate_until_node, node)
return None
except UnexpectedDispatchException as e:
@@ -485,7 +493,7 @@ def dispatch_node(
f"Error while running workflow {original_workflow.id}. Error: {str(e)}"
)
logger.warning(error)
- self._handle_workflow_error(node_history, error)
+ self._handle_workflow_error(node_history, iteration_path, error)
self._handle_simulation_notify(simulate_until_node, node)
return None
except Exception as e:
@@ -496,7 +504,7 @@ def dispatch_node(
f"Error: {str(e)}"
)
logger.exception(error)
- self._handle_workflow_error(node_history, error)
+ self._handle_workflow_error(node_history, iteration_path, error)
self._handle_simulation_notify(simulate_until_node, node)
return None
@@ -508,7 +516,7 @@ def dispatch_node(
history_handler.create_node_result(
node_history=node_history,
result=dispatch_result.data,
- iteration_path=dispatch_context.get_iteration_path(node),
+ iteration_path=iteration_path,
)
to_chain = []
diff --git a/backend/src/baserow/contrib/automation/nodes/service.py b/backend/src/baserow/contrib/automation/nodes/service.py
index 6fa3a5493c..bad1be30cb 100644
--- a/backend/src/baserow/contrib/automation/nodes/service.py
+++ b/backend/src/baserow/contrib/automation/nodes/service.py
@@ -34,7 +34,9 @@
ReplacedAutomationNode,
UpdatedAutomationNode,
)
+from baserow.contrib.automation.workflows.constants import WORKFLOW_DIRTY_CACHE_KEY
from baserow.contrib.automation.workflows.signals import automation_workflow_updated
+from baserow.core.cache import global_cache
from baserow.core.handler import CoreHandler
from baserow.core.integrations.handler import IntegrationHandler
from baserow.core.trash.handler import TrashHandler
@@ -196,6 +198,9 @@ def create_node(
workflow.get_graph().insert(new_node, reference_node, position, output)
+ cache_key = WORKFLOW_DIRTY_CACHE_KEY.format(workflow.id)
+ global_cache.update(cache_key, lambda _: True)
+
automation_node_created.send(
self,
node=new_node,
@@ -245,6 +250,9 @@ def update_node(
# Now export the 'new' node values, since everything has been updated.
new_node_values = node_type.export_prepared_values(node)
+ cache_key = WORKFLOW_DIRTY_CACHE_KEY.format(node.workflow.id)
+ global_cache.update(cache_key, lambda _: True)
+
automation_node_updated.send(self, user=user, node=updated_node)
return UpdatedAutomationNode(
@@ -285,6 +293,9 @@ def delete_node(
node,
)
+ cache_key = WORKFLOW_DIRTY_CACHE_KEY.format(workflow.id)
+ global_cache.update(cache_key, lambda _: True)
+
automation_node_deleted.send(
self,
workflow=workflow,
@@ -325,6 +336,9 @@ def duplicate_node(
workflow.get_graph().insert(duplicated_node, source_node, "south", "")
+ cache_key = WORKFLOW_DIRTY_CACHE_KEY.format(workflow.id)
+ global_cache.update(cache_key, lambda _: True)
+
automation_node_created.send(
self,
node=duplicated_node,
diff --git a/backend/src/baserow/contrib/automation/workflows/constants.py b/backend/src/baserow/contrib/automation/workflows/constants.py
index 8d3ce14714..e05ae9fbc6 100644
--- a/backend/src/baserow/contrib/automation/workflows/constants.py
+++ b/backend/src/baserow/contrib/automation/workflows/constants.py
@@ -9,3 +9,7 @@ class WorkflowState(models.TextChoices):
LIVE = "live"
PAUSED = "paused"
DISABLED = "disabled"
+ TEST_CLONE = "test_clone"
+
+
+WORKFLOW_DIRTY_CACHE_KEY = "wa_workflow_dirty_{}"
diff --git a/backend/src/baserow/contrib/automation/workflows/handler.py b/backend/src/baserow/contrib/automation/workflows/handler.py
index b06383e4f7..c5af5f0b47 100644
--- a/backend/src/baserow/contrib/automation/workflows/handler.py
+++ b/backend/src/baserow/contrib/automation/workflows/handler.py
@@ -1,6 +1,6 @@
from collections import defaultdict
from datetime import timedelta
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Tuple
from zipfile import ZipFile
from django.conf import settings
@@ -22,7 +22,10 @@
)
from baserow.contrib.automation.history.constants import HistoryStatusChoices
from baserow.contrib.automation.history.handler import AutomationHistoryHandler
-from baserow.contrib.automation.history.models import AutomationWorkflowHistory
+from baserow.contrib.automation.history.models import (
+ AutomationNodeHistory,
+ AutomationWorkflowHistory,
+)
from baserow.contrib.automation.models import Automation
from baserow.contrib.automation.nodes.handler import AutomationNodeHandler
from baserow.contrib.automation.nodes.models import AutomationNode
@@ -32,6 +35,7 @@
from baserow.contrib.automation.types import AutomationWorkflowDict
from baserow.contrib.automation.workflows.constants import (
ALLOW_TEST_RUN_MINUTES,
+ WORKFLOW_DIRTY_CACHE_KEY,
WorkflowState,
)
from baserow.contrib.automation.workflows.exceptions import (
@@ -42,7 +46,10 @@
AutomationWorkflowTooManyErrors,
)
from baserow.contrib.automation.workflows.models import AutomationWorkflow
-from baserow.contrib.automation.workflows.signals import automation_workflow_updated
+from baserow.contrib.automation.workflows.signals import (
+ automation_workflow_dispatch_started,
+ automation_workflow_updated,
+)
from baserow.contrib.automation.workflows.tasks import (
handle_workflow_dispatch_done,
start_workflow_celery_task,
@@ -125,8 +132,13 @@ def get_published_workflow(
def _get_published_workflow(
workflow: AutomationWorkflow,
) -> Optional[AutomationWorkflow]:
- latest_published = workflow.published_to.order_by("-id").first()
- return latest_published.workflows.first() if latest_published else None
+ latest_published = (
+ AutomationWorkflow.objects.exclude(state=WorkflowState.TEST_CLONE)
+ .filter(automation__published_from=workflow)
+ .order_by("-automation__id")
+ .first()
+ )
+ return latest_published
if with_cache:
return local_cache.get(
@@ -143,6 +155,7 @@ def _invalidate_workflow_caches(self, workflow: AutomationWorkflow) -> None:
global_cache.invalidate(
self._get_workflow_history_rate_limit_cache_key(original_workflow)
)
+ global_cache.invalidate(WORKFLOW_DIRTY_CACHE_KEY.format(original_workflow.id))
def get_workflows(
self, automation: Automation, base_queryset: Optional[QuerySet] = None
@@ -609,39 +622,37 @@ def clean_up_previously_published_automations(
if len(published_automations) > 1:
# Delete all but the last published automation
- ids_to_delete = [a.id for a in published_automations[:-1]]
- Automation.objects.filter(id__in=ids_to_delete).delete()
-
- # Disable the last published workflow
- if published_workflow := published_automations[-1].workflows.first():
- published_workflow.state = WorkflowState.DISABLED
- published_workflow.save(update_fields=["state"])
-
- def publish(
+ if ids_to_delete := [
+ a.id
+ for a in published_automations[:-1]
+ # Exclude any automations that have any history entries
+ if not AutomationWorkflowHistory.objects.filter(
+ workflow__automation=a
+ ).exists()
+ ]:
+ Automation.objects.filter(id__in=ids_to_delete).delete()
+
+ # Disable any live workflows
+ AutomationWorkflow.objects.filter(
+ automation__published_from=workflow,
+ state=WorkflowState.LIVE,
+ ).update(state=WorkflowState.DISABLED)
+
+ def _clone_workflow(
self,
workflow: AutomationWorkflow,
- progress: Optional[Progress] = None,
- ) -> AutomationWorkflow:
+ state: WorkflowState | None = None,
+ progress: Progress | None = None,
+ ) -> Tuple[Automation, Dict]:
"""
- Publishes an Automation and a specific workflow. If the automation was
- already published, the previous versions are deleted and a new one
- is created.
-
- When an automation is published, a clone of the current version is
- created to avoid further modifications to the original automation
- which could affect the published version.
+ Creates a clone of the workflow's automation via the export/import process.
- :param workflow: The workflow to be published.
- :param progress: An object to track the publishing progress.
- :return: The published workflow.
+ :param workflow: The workflow to clone.
+ :param state: An optional WorkflowState to assign; defaults to draft.
+ :param progress: An optional progress tracker.
+ :return: The cloned Automation and the id_mapping dict.
"""
- # Make sure we are the only process to update the automation workflow
- # to prevent race conditions.
- workflow = self.get_workflow(workflow.id, for_update=True)
-
- self.clean_up_previously_published_automations(workflow)
-
import_export_config = ImportExportConfig(
include_permission_data=True,
reduce_disk_space_usage=False,
@@ -659,8 +670,8 @@ def publish(
workflows=[workflow],
)
- # Manually set the published status for the newly created workflow.
- exported_automation["workflows"][0]["state"] = WorkflowState.LIVE
+ if state:
+ exported_automation["workflows"][0]["state"] = state
progress_builder = None
if progress:
@@ -669,7 +680,7 @@ def publish(
id_mapping = {"import_workspace_id": workflow.automation.workspace.id}
- duplicate_automation = application_type.import_serialized(
+ cloned_automation = application_type.import_serialized(
None,
exported_automation,
import_export_config,
@@ -679,12 +690,42 @@ def publish(
progress_builder=progress_builder,
)
- duplicate_automation.published_from = workflow
- duplicate_automation.save(update_fields=["published_from"])
+ cloned_automation.published_from = workflow
+ cloned_automation.save(update_fields=["published_from"])
+
+ return cloned_automation, id_mapping
+
+ def publish(
+ self,
+ workflow: AutomationWorkflow,
+ progress: Optional[Progress] = None,
+ ) -> AutomationWorkflow:
+ """
+ Publishes an Automation and a specific workflow. If the automation was
+ already published, the previous versions are deleted and a new one
+ is created.
+
+ When an automation is published, a clone of the current version is
+ created to avoid further modifications to the original automation
+ which could affect the published version.
+
+ :param workflow: The workflow to be published.
+ :param progress: An object to track the publishing progress.
+ :return: The published workflow.
+ """
+
+ # Make sure we are the only process to update the automation workflow
+ # to prevent race conditions.
+ workflow = self.get_workflow(workflow.id, for_update=True)
+ self.clean_up_previously_published_automations(workflow)
+
+ cloned_automation, id_mapping = self._clone_workflow(
+ workflow, WorkflowState.LIVE, progress
+ )
self._invalidate_workflow_caches(workflow)
- return duplicate_automation.workflows.first()
+ return cloned_automation.workflows.first()
def disable_workflow(self, workflow: AutomationWorkflow) -> None:
"""
@@ -761,7 +802,9 @@ def reset_workflow_temporary_states(self, workflow):
automation_workflow_updated.send(self, user=None, workflow=workflow)
def toggle_test_run(
- self, workflow: AutomationWorkflow, simulate_until_node: bool = None
+ self,
+ workflow: AutomationWorkflow,
+ simulate_until_node: AutomationNode | None,
):
"""
Trigger a test run if none is in progress or cancel the planned run. If the
@@ -785,7 +828,6 @@ def toggle_test_run(
# If the service related to the trigger can immediately be tested
# we immediately trigger the workflow run
self.async_start_workflow(workflow)
-
else:
AutomationWorkflowHandler().set_workflow_temporary_states(
workflow, simulate_until_node=simulate_until_node
@@ -818,6 +860,9 @@ def _clear_old_history(self, original_workflow: AutomationWorkflow) -> None:
It will delete any history entries that are older than MAX_HISTORY_DAYS and only
keep the most recent MAX_HISTORY_ENTRIES entries.
+
+ TODO: refactor this once https://github.com/baserow/baserow/pull/5166
+ is merged in.
"""
oldest_history_date = timezone.now() - timedelta(
@@ -836,6 +881,28 @@ def _clear_old_history(self, original_workflow: AutomationWorkflow) -> None:
status=HistoryStatusChoices.STARTED
).exclude(id__in=history_ids_to_keep).delete()
+ # Clean up published automations that no longer have any history entries
+ active_published = self.get_published_workflow(
+ original_workflow, with_cache=False
+ )
+ empty_published = (
+ Automation.objects.filter(
+ published_from=original_workflow,
+ )
+ .exclude(workflows__cloned_workflow_histories__isnull=False)
+ # _ensure_published_for_run() is called to potentially create a
+ # test clone just before calling before_run(). We need to
+ # ensure the newly created clone isn't deleted prematurely.
+ .exclude(
+ workflows__state=WorkflowState.TEST_CLONE,
+ created_on__gte=timezone.now() - timedelta(seconds=5),
+ )
+ )
+ if active_published:
+ empty_published = empty_published.exclude(id=active_published.automation_id)
+
+ empty_published.delete()
+
def _mark_failure_for_timed_out_history(
self, original_workflow: AutomationWorkflow
) -> None:
@@ -844,16 +911,74 @@ def _mark_failure_for_timed_out_history(
is marked as failed.
"""
- max_history_date = timezone.now() - timedelta(
+ now = timezone.now()
+ max_history_date = now - timedelta(
hours=settings.AUTOMATION_WORKFLOW_TIMEOUT_HOURS
)
- original_workflow.workflow_histories.filter(
- status=HistoryStatusChoices.STARTED, started_on__lt=max_history_date
+
+ error = "This workflow took too long and was timed out."
+
+ workflow_history_ids = list(
+ original_workflow.workflow_histories.filter(
+ status=HistoryStatusChoices.STARTED,
+ started_on__lt=max_history_date,
+ ).values_list("id", flat=True)
+ )
+
+ if not workflow_history_ids:
+ return
+
+ AutomationWorkflowHistory.objects.filter(
+ id__in=workflow_history_ids,
+ ).update(
+ status=HistoryStatusChoices.ERROR,
+ message=error,
+ completed_on=now,
+ )
+
+ AutomationNodeHistory.objects.filter(
+ workflow_history_id__in=workflow_history_ids,
+ status=HistoryStatusChoices.STARTED,
).update(
status=HistoryStatusChoices.ERROR,
- message="This workflow took too long and was timed out.",
+ message=error,
+ completed_on=now,
)
+ def _ensure_published_for_run(
+ self, workflow: AutomationWorkflow
+ ) -> AutomationWorkflow:
+ """
+ Ensure an up-to-date cloned automation exists for test runs.
+
+ The cloned workflow is used by the history, which decouples the
+ history's workflow from the draft or published workflow.
+ """
+
+ latest_clone = (
+ AutomationWorkflow.objects.filter(
+ state=WorkflowState.TEST_CLONE, automation__published_from=workflow
+ )
+ .order_by("-automation__id")
+ .first()
+ )
+
+ is_dirty_key = WORKFLOW_DIRTY_CACHE_KEY.format(workflow.id)
+ is_dirty = global_cache.get(is_dirty_key, default=False)
+
+ # If the clone exists and the workflow hasn't been updated since,
+ # use that existing clone's workflow.
+ if not is_dirty and latest_clone:
+ cloned_workflow = latest_clone
+ else:
+ cloned_automation, _ = self._clone_workflow(
+ workflow, WorkflowState.TEST_CLONE
+ )
+ cloned_workflow = cloned_automation.workflows.first()
+ global_cache.invalidate(is_dirty_key)
+
+ return cloned_workflow
+
def _get_workflow_history_rate_limit_cache_key(
self, original_workflow: AutomationWorkflow
) -> str:
@@ -1009,14 +1134,21 @@ def async_start_workflow(
history_status = HistoryStatusChoices.ERROR
create_history_entry = True
- original_workflow = workflow.get_original()
-
simulate_until_node = (
workflow.get_graph().get_node(workflow.simulate_until_node_id)
if workflow.simulate_until_node_id
else None
)
+ is_test_run = (
+ workflow.is_original() or workflow.state == WorkflowState.TEST_CLONE
+ )
+
+ if workflow.is_original() and simulate_until_node is None:
+ workflow = self._ensure_published_for_run(workflow)
+
+ original_workflow = workflow.get_original()
+
try:
self.before_run(workflow)
except AutomationWorkflowRateLimited as e:
@@ -1056,8 +1188,9 @@ def async_start_workflow(
now = timezone.now()
AutomationHistoryHandler().create_workflow_history(
- original_workflow,
- is_test_run=original_workflow == workflow,
+ original_workflow=original_workflow,
+ workflow=workflow,
+ is_test_run=is_test_run,
started_on=now,
completed_on=now,
message=error,
@@ -1065,18 +1198,20 @@ def async_start_workflow(
)
return
- # If the currently running workflow is an unpublished workflow then we are
- # testing it.
- is_test_run = original_workflow == workflow
-
history = AutomationHistoryHandler().create_workflow_history(
- original_workflow,
+ original_workflow=original_workflow,
+ workflow=workflow,
started_on=timezone.now(),
is_test_run=is_test_run,
event_payload=event_payload,
simulate_until_node=simulate_until_node,
)
+ automation_workflow_dispatch_started.send(
+ sender=None,
+ workflow_history=history,
+ )
+
transaction.on_commit(
lambda: start_workflow_celery_task.delay(workflow.id, history.id)
)
diff --git a/backend/src/baserow/contrib/automation/workflows/signals.py b/backend/src/baserow/contrib/automation/workflows/signals.py
index b6588ab3fe..232431ba45 100644
--- a/backend/src/baserow/contrib/automation/workflows/signals.py
+++ b/backend/src/baserow/contrib/automation/workflows/signals.py
@@ -5,3 +5,5 @@
automation_workflow_updated = Signal()
automation_workflow_published = Signal()
automation_workflows_reordered = Signal()
+automation_workflow_dispatch_started = Signal()
+automation_workflow_dispatch_done = Signal()
diff --git a/backend/src/baserow/contrib/automation/workflows/tasks.py b/backend/src/baserow/contrib/automation/workflows/tasks.py
index a3749dae9f..ae4f2c09df 100644
--- a/backend/src/baserow/contrib/automation/workflows/tasks.py
+++ b/backend/src/baserow/contrib/automation/workflows/tasks.py
@@ -8,6 +8,9 @@
from baserow.contrib.automation.history.constants import HistoryStatusChoices
from baserow.contrib.automation.history.handler import AutomationHistoryHandler
from baserow.contrib.automation.history.models import AutomationWorkflowHistory
+from baserow.contrib.automation.workflows.signals import (
+ automation_workflow_dispatch_done,
+)
from baserow.core.db import atomic_with_retry_on_deadlock
@@ -54,7 +57,6 @@ def handle_workflow_dispatch_done(
AutomationWorkflowHistory.objects.filter(
id=history_id, simulate_until_node_id=simulate_until_node_id
).delete()
-
else:
# Only update the history if it's still started.
# If the workflow history was marked as failed by a specific node, we
@@ -66,3 +68,9 @@ def handle_workflow_dispatch_done(
status=HistoryStatusChoices.SUCCESS,
completed_on=timezone.now(),
)
+
+ history = AutomationWorkflowHistory.objects.get(id=history_id)
+ automation_workflow_dispatch_done.send(
+ sender=None,
+ workflow_history=history,
+ )
diff --git a/backend/src/baserow/contrib/automation/workflows/ws/signals.py b/backend/src/baserow/contrib/automation/workflows/ws/signals.py
index 6a0c42f428..867f9c40d2 100644
--- a/backend/src/baserow/contrib/automation/workflows/ws/signals.py
+++ b/backend/src/baserow/contrib/automation/workflows/ws/signals.py
@@ -21,6 +21,8 @@
from baserow.contrib.automation.workflows.signals import (
automation_workflow_created,
automation_workflow_deleted,
+ automation_workflow_dispatch_done,
+ automation_workflow_dispatch_started,
automation_workflow_published,
automation_workflow_updated,
automation_workflows_reordered,
@@ -126,3 +128,41 @@ def workflow_reordered(
getattr(user, "web_socket_id", None),
)
)
+
+
+@receiver(automation_workflow_dispatch_started)
+def workflow_dispatch_started(sender, workflow_history, **kwargs):
+ workflow = workflow_history.original_workflow
+ transaction.on_commit(
+ lambda: broadcast_to_permitted_users.delay(
+ workflow.automation.workspace_id,
+ ReadAutomationWorkflowOperationType.type,
+ AutomationWorkflowObjectScopeType.type,
+ workflow.id,
+ {
+ "type": "automation_workflow_dispatch_started",
+ "workflow_id": workflow.id,
+ "history_id": workflow_history.id,
+ },
+ None,
+ )
+ )
+
+
+@receiver(automation_workflow_dispatch_done)
+def workflow_dispatch_done(sender, workflow_history, **kwargs):
+ workflow = workflow_history.original_workflow
+ transaction.on_commit(
+ lambda: broadcast_to_permitted_users.delay(
+ workflow.automation.workspace_id,
+ ReadAutomationWorkflowOperationType.type,
+ AutomationWorkflowObjectScopeType.type,
+ workflow.id,
+ {
+ "type": "automation_workflow_dispatch_done",
+ "workflow_id": workflow.id,
+ "history_id": workflow_history.id,
+ },
+ None,
+ )
+ )
diff --git a/backend/src/baserow/test_utils/fixtures/automation_history.py b/backend/src/baserow/test_utils/fixtures/automation_history.py
index 894bdd83cd..f2930e32e9 100644
--- a/backend/src/baserow/test_utils/fixtures/automation_history.py
+++ b/backend/src/baserow/test_utils/fixtures/automation_history.py
@@ -42,6 +42,7 @@ def create_workflow_history(self, user=None, **kwargs):
)
history = AutomationHistoryHandler().create_workflow_history(
+ original_workflow=original_workflow,
workflow=original_workflow,
started_on=started_on,
is_test_run=is_test_run,
diff --git a/backend/src/baserow/test_utils/fixtures/automation_workflow_history.py b/backend/src/baserow/test_utils/fixtures/automation_workflow_history.py
index 16f82ab663..b72142f02a 100644
--- a/backend/src/baserow/test_utils/fixtures/automation_workflow_history.py
+++ b/backend/src/baserow/test_utils/fixtures/automation_workflow_history.py
@@ -1,11 +1,18 @@
from django.utils import timezone
from baserow.contrib.automation.history.constants import HistoryStatusChoices
-from baserow.contrib.automation.history.models import AutomationWorkflowHistory
+from baserow.contrib.automation.history.models import (
+ AutomationNodeHistory,
+ AutomationNodeResult,
+ AutomationWorkflowHistory,
+)
class AutomationWorkflowHistoryFixtures:
def create_automation_workflow_history(self, user=None, **kwargs):
+ if "original_workflow" not in kwargs:
+ kwargs["original_workflow"] = kwargs["workflow"]
+
if "workflow" not in kwargs:
user = user or self.create_user()
kwargs["workflow"] = self.create_automation_workflow(user=user)
@@ -20,3 +27,36 @@ def create_automation_workflow_history(self, user=None, **kwargs):
kwargs["is_test_run"] = False
return AutomationWorkflowHistory.objects.create(**kwargs)
+
+ def create_automation_node_history(self, user=None, **kwargs):
+ user = user or self.create_user()
+
+ if "workflow_history" not in kwargs:
+ kwargs["workflow_history"] = self.create_automation_workflow_history(
+ user=user, **kwargs
+ )
+
+ if "node" not in kwargs:
+ kwargs["node"] = self.create_automation_node(
+ user=user,
+ workflow=kwargs["workflow_history"].workflow,
+ **kwargs,
+ )
+
+ if "started_on" not in kwargs:
+ kwargs["started_on"] = timezone.now()
+
+ if "status" not in kwargs:
+ kwargs["status"] = HistoryStatusChoices.STARTED
+
+ return AutomationNodeHistory.objects.create(**kwargs)
+
+ def create_automation_node_result(self, user=None, **kwargs):
+ user = user or self.create_user()
+
+ if "node_history" not in kwargs:
+ kwargs["node_history"] = self.create_automation_node_history(
+ user=user, **kwargs
+ )
+
+ return AutomationNodeResult.objects.create(**kwargs)
diff --git a/backend/tests/baserow/contrib/automation/api/workflows/test_workflow_views.py b/backend/tests/baserow/contrib/automation/api/workflows/test_workflow_views.py
index 2f5ef19d68..e71cbac17d 100644
--- a/backend/tests/baserow/contrib/automation/api/workflows/test_workflow_views.py
+++ b/backend/tests/baserow/contrib/automation/api/workflows/test_workflow_views.py
@@ -1,5 +1,6 @@
import datetime
+from django.db.models import Count, Q
from django.urls import reverse
from django.utils import timezone
@@ -14,8 +15,14 @@
HTTP_404_NOT_FOUND,
)
+from baserow.contrib.automation.api.workflows.serializers import (
+ AutomationWorkflowHistorySerializer,
+)
+from baserow.contrib.automation.history.constants import HistoryStatusChoices
+from baserow.contrib.automation.history.handler import AutomationHistoryHandler
from baserow.contrib.automation.workflows.constants import ALLOW_TEST_RUN_MINUTES
from baserow.contrib.database.rows.handler import RowHandler
+from baserow.core.cache import local_cache
from baserow.test_utils.helpers import AnyInt, AnyStr
from tests.baserow.contrib.automation.api.utils import get_api_kwargs
@@ -635,6 +642,8 @@ def test_get_workflow_histories(api_client, data_fixture):
assert response.status_code == HTTP_200_OK
assert response.json() == {
"count": 1,
+ "fail_count": 0,
+ "success_count": 1,
"next": None,
"previous": None,
"results": [
@@ -645,6 +654,9 @@ def test_get_workflow_histories(api_client, data_fixture):
"is_test_run": False,
"message": "",
"status": "success",
+ "event_payload": None,
+ "node_histories": [],
+ "simulate_until_node": None,
},
],
}
@@ -706,3 +718,150 @@ def test_rename_workflow_using_existing_workflow_name(api_client, data_fixture):
assert workflow_1.name == "test1"
workflow_2.refresh_from_db()
assert workflow_2.name == "test1"
+
+
+@pytest.mark.django_db
+def test_get_workflow_histories_query_count(data_fixture, django_assert_num_queries):
+ user = data_fixture.create_user()
+ workflow = data_fixture.create_automation_workflow(user=user)
+ trigger = workflow.get_trigger()
+
+ handler = AutomationHistoryHandler()
+
+ def _create_histories(count):
+ for _ in range(count):
+ workflow_history = handler.create_workflow_history(
+ original_workflow=workflow,
+ workflow=workflow,
+ started_on=timezone.now(),
+ is_test_run=False,
+ )
+ node_history = handler.create_node_history(
+ workflow_history=workflow_history,
+ node=trigger,
+ started_on=timezone.now(),
+ )
+ handler.create_node_result(
+ node_history=node_history,
+ result={"foo": "bar"},
+ )
+
+ _create_histories(3)
+ local_cache.clear()
+
+ expected_queries = 10
+ with django_assert_num_queries(expected_queries):
+ queryset = handler.get_workflow_histories(workflow)
+ queryset.aggregate(
+ success_count=Count("id", filter=Q(status=HistoryStatusChoices.SUCCESS)),
+ fail_count=Count("id", filter=Q(status=HistoryStatusChoices.ERROR)),
+ )
+ AutomationWorkflowHistorySerializer(list(queryset), many=True).data
+
+ _create_histories(3)
+ local_cache.clear()
+
+ with django_assert_num_queries(expected_queries):
+ queryset = handler.get_workflow_histories(workflow)
+ queryset.aggregate(
+ success_count=Count("id", filter=Q(status=HistoryStatusChoices.SUCCESS)),
+ fail_count=Count("id", filter=Q(status=HistoryStatusChoices.ERROR)),
+ )
+ AutomationWorkflowHistorySerializer(list(queryset), many=True).data
+
+
+@pytest.mark.django_db
+def test_get_workflow_histories_with_node_histories(api_client, data_fixture):
+ user, token = data_fixture.create_user_and_token()
+ workflow = data_fixture.create_automation_workflow(user=user)
+ trigger = workflow.get_trigger()
+ action_node = data_fixture.create_local_baserow_create_row_action_node(
+ user=user, workflow=workflow, label="My Action"
+ )
+
+ now = timezone.now()
+ workflow_history = data_fixture.create_automation_workflow_history(
+ user=user,
+ workflow=workflow,
+ status=HistoryStatusChoices.SUCCESS,
+ completed_on=now,
+ )
+ node_history_1 = data_fixture.create_automation_node_history(
+ user=user,
+ workflow_history=workflow_history,
+ node=trigger,
+ completed_on=now,
+ )
+ node_result_1 = data_fixture.create_automation_node_result(
+ user=user,
+ node_history=node_history_1,
+ result={"rows": [1, 2]},
+ )
+ node_history_2 = data_fixture.create_automation_node_history(
+ user=user,
+ workflow_history=workflow_history,
+ node=action_node,
+ completed_on=now,
+ )
+ node_result_2 = data_fixture.create_automation_node_result(
+ user=user,
+ node_history=node_history_2,
+ result={"created_row_id": 99},
+ iteration_path="0.2.1",
+ )
+
+ url = reverse(API_URL_WORKFLOW_HISTORY, kwargs={"workflow_id": workflow.id})
+ response = api_client.get(url, **get_api_kwargs(token))
+
+ assert response.status_code == HTTP_200_OK
+ data = response.json()
+
+ assert data["count"] == 1
+ assert data["success_count"] == 1
+ assert data["fail_count"] == 0
+
+ w_history = data["results"][0]
+ assert w_history["id"] == workflow_history.id
+ assert w_history["status"] == "success"
+ assert len(w_history["node_histories"]) == 2
+
+ n_history_1 = w_history["node_histories"][0]
+ assert n_history_1["node"] == trigger.id
+ assert n_history_1["workflow_history"] == workflow_history.id
+ assert n_history_1["node_type"] == trigger.get_type().type
+ assert n_history_1["node_label"] == trigger.label
+ assert n_history_1["parent_node_id"] is None
+ assert n_history_1["iteration"] == 0
+ assert n_history_1["result"] == {"rows": [1, 2]}
+
+ n_history_2 = w_history["node_histories"][1]
+ assert n_history_2["node"] == action_node.id
+ assert n_history_2["workflow_history"] == workflow_history.id
+ assert n_history_2["node_type"] == action_node.get_type().type
+ assert n_history_2["node_label"] == "My Action"
+ assert n_history_2["iteration"] == 1
+ assert n_history_2["result"] == {"created_row_id": 99}
+
+
+@pytest.mark.django_db
+def test_get_workflow_histories_has_success_and_fail_counts(api_client, data_fixture):
+ user, token = data_fixture.create_user_and_token()
+ workflow = data_fixture.create_automation_workflow(user=user)
+ data_fixture.create_automation_workflow_history(
+ workflow=workflow, status=HistoryStatusChoices.SUCCESS
+ )
+ data_fixture.create_automation_workflow_history(
+ workflow=workflow, status=HistoryStatusChoices.SUCCESS
+ )
+ data_fixture.create_automation_workflow_history(
+ workflow=workflow, status=HistoryStatusChoices.ERROR
+ )
+
+ url = reverse(API_URL_WORKFLOW_HISTORY, kwargs={"workflow_id": workflow.id})
+ response = api_client.get(url, **get_api_kwargs(token))
+
+ assert response.status_code == HTTP_200_OK
+ data = response.json()
+ assert data["count"] == 3
+ assert data["success_count"] == 2
+ assert data["fail_count"] == 1
diff --git a/backend/tests/baserow/contrib/automation/data_providers/test_data_provider_types.py b/backend/tests/baserow/contrib/automation/data_providers/test_data_provider_types.py
index ef7bfed141..e2c5329d16 100644
--- a/backend/tests/baserow/contrib/automation/data_providers/test_data_provider_types.py
+++ b/backend/tests/baserow/contrib/automation/data_providers/test_data_provider_types.py
@@ -17,6 +17,7 @@
def test_previous_node_data_provider_get_data_chunk(data_fixture):
workflow = data_fixture.create_automation_workflow()
workflow_history = AutomationHistoryHandler().create_workflow_history(
+ workflow,
workflow,
timezone.now(),
False,
@@ -78,6 +79,7 @@ def test_previous_node_data_provider_get_data_chunk(data_fixture):
assert exc.value.args[0] == "The previous node doesn't exist"
workflow_history_2 = AutomationHistoryHandler().create_workflow_history(
+ workflow,
workflow,
timezone.now(),
False,
@@ -132,6 +134,7 @@ def test_previous_node_data_provider_import_path(data_fixture):
def test_current_iteration_data_provider_get_data_chunk(data_fixture):
workflow = data_fixture.create_automation_workflow()
workflow_history = AutomationHistoryHandler().create_workflow_history(
+ workflow,
workflow,
timezone.now(),
False,
diff --git a/backend/tests/baserow/contrib/automation/history/test_history_handler.py b/backend/tests/baserow/contrib/automation/history/test_history_handler.py
index 1238f2645a..20bff66ca7 100644
--- a/backend/tests/baserow/contrib/automation/history/test_history_handler.py
+++ b/backend/tests/baserow/contrib/automation/history/test_history_handler.py
@@ -59,6 +59,7 @@ def test_create_workflow_history(data_fixture):
now = timezone.now()
history = AutomationHistoryHandler().create_workflow_history(
+ original_workflow,
original_workflow,
now,
False,
@@ -93,6 +94,7 @@ def test_get_workflow_histories_excludes_simulation_histories(data_fixture):
def test_get_workflow_history(data_fixture):
workflow = data_fixture.create_automation_workflow()
history = AutomationHistoryHandler().create_workflow_history(
+ workflow,
workflow,
timezone.now(),
False,
@@ -115,6 +117,7 @@ def test_get_workflow_history_does_not_exist(data_fixture):
def test_get_workflow_history_respects_base_queryset(data_fixture):
workflow = data_fixture.create_automation_workflow()
history = AutomationHistoryHandler().create_workflow_history(
+ workflow,
workflow,
timezone.now(),
False,
diff --git a/backend/tests/baserow/contrib/automation/history/utils.py b/backend/tests/baserow/contrib/automation/history/utils.py
index 804da2780f..0e2f113da7 100644
--- a/backend/tests/baserow/contrib/automation/history/utils.py
+++ b/backend/tests/baserow/contrib/automation/history/utils.py
@@ -7,13 +7,13 @@ def assert_history(
"""Helper to test AutomationWorkflowHistory objects."""
histories = list(
- AutomationWorkflowHistory.objects.filter(workflow=workflow).order_by(
+ AutomationWorkflowHistory.objects.filter(original_workflow=workflow).order_by(
"started_on", "id"
)
)
assert len(histories) == expected_count
if expected_count > 0:
history = histories[history_index]
- assert history.workflow == workflow
+ assert history.original_workflow == workflow
assert history.status == expected_status
assert history.message == expected_msg
diff --git a/backend/tests/baserow/contrib/automation/nodes/test_node_dispatch_async.py b/backend/tests/baserow/contrib/automation/nodes/test_node_dispatch_async.py
index 5f5806f3ee..373f01c3be 100644
--- a/backend/tests/baserow/contrib/automation/nodes/test_node_dispatch_async.py
+++ b/backend/tests/baserow/contrib/automation/nodes/test_node_dispatch_async.py
@@ -19,6 +19,7 @@
"baserow.contrib.automation.nodes.node_types.LocalBaserowRowsCreatedNodeTriggerType"
)
NODE_HANDLER_PATH = "baserow.contrib.automation.nodes.handler"
+TASKS_PATH = "baserow.contrib.automation.workflows.tasks"
def assert_dispatches_next_node(result, *expected_tasks):
@@ -205,6 +206,10 @@ def test_dispatch_node_service_error(data_fixture):
assert error in node_history.message
assert node_history.status == HistoryStatusChoices.ERROR
+ node_result = AutomationNodeResult.objects.get(node_history=node_history)
+ assert node_result.result == {}
+ assert node_result.iteration_path == ""
+
@pytest.mark.django_db
@patch(f"{TRIGGER_NODE_TYPE_PATH}.dispatch")
@@ -234,6 +239,10 @@ def test_dispatch_node_unexpected_error(mock_logger, mock_dispatch, data_fixture
assert error in node_history.message
assert node_history.status == HistoryStatusChoices.ERROR
+ node_result = AutomationNodeResult.objects.get(node_history=node_history)
+ assert node_result.result == {}
+ assert node_result.iteration_path == ""
+
@pytest.mark.django_db
@patch(f"{TRIGGER_NODE_TYPE_PATH}.dispatch")
@@ -269,6 +278,10 @@ def test_dispatch_node_expected_error(mock_logger, mock_dispatch, data_fixture):
assert error in node_history.message
assert node_history.status == HistoryStatusChoices.ERROR
+ node_result = AutomationNodeResult.objects.get(node_history=node_history)
+ assert node_result.result == {}
+ assert node_result.iteration_path == ""
+
@pytest.mark.django_db
def test_dispatch_node_dispatches_trigger(data_fixture):
@@ -283,6 +296,7 @@ def test_dispatch_node_dispatches_trigger(data_fixture):
)
handle_workflow_dispatch_done(history_id=workflow_history.id)
+
workflow_history.refresh_from_db()
assert workflow_history.message == ""
assert workflow_history.status == HistoryStatusChoices.SUCCESS
@@ -1489,3 +1503,78 @@ def test_dispatch_node_with_deleted_node(mock_logger, data_fixture):
"deleted before the task was executed."
)
mock_logger.warning.assert_called_once_with(expected_error)
+
+
+@pytest.mark.django_db
+@patch(f"{NODE_HANDLER_PATH}.automation_node_updated")
+def test_dispatch_node_simulation_copies_sample_data_to_draft_node(
+ mock_automation_node_updated,
+ data_fixture,
+):
+ data = create_workflow(data_fixture)
+ workflow = data["workflow"]
+ trigger_node = data["trigger_node"]
+
+ workflow_history = data_fixture.create_automation_workflow_history(
+ original_workflow=workflow,
+ workflow=workflow,
+ simulate_until_node=trigger_node,
+ is_test_run=True,
+ )
+
+ assert trigger_node.service.specific.sample_data is None
+ AutomationNodeHandler().dispatch_node(
+ trigger_node.id,
+ history_id=workflow_history.id,
+ )
+
+ # Ensure the sample_data is saved to the draft node's service
+ trigger_node.service.specific.refresh_from_db()
+ assert trigger_node.service.specific.sample_data is not None
+
+ # Ensure a signal is sent for the draft node
+ mock_automation_node_updated.send.assert_called_once_with(
+ ANY, user=None, node=trigger_node
+ )
+
+
+@pytest.mark.django_db
+@patch(f"{TASKS_PATH}.automation_workflow_dispatch_done")
+def test_handle_workflow_dispatch_done_sends_signal_on_success(
+ mock_dispatch_done_signal, data_fixture
+):
+ data = create_workflow(data_fixture)
+ workflow_history = data["workflow_history"]
+ assert workflow_history.status == HistoryStatusChoices.STARTED
+
+ handle_workflow_dispatch_done(history_id=workflow_history.id)
+
+ workflow_history.refresh_from_db()
+ assert workflow_history.status == HistoryStatusChoices.SUCCESS
+ mock_dispatch_done_signal.send.assert_called_once()
+ assert (
+ mock_dispatch_done_signal.send.call_args.kwargs["workflow_history"].id
+ == workflow_history.id
+ )
+
+
+@pytest.mark.django_db
+@patch(f"{TASKS_PATH}.automation_workflow_dispatch_done")
+def test_handle_workflow_dispatch_done_sends_signal_on_error(
+ mock_dispatch_done_signal, data_fixture
+):
+ data = create_workflow(data_fixture)
+ workflow_history = data["workflow_history"]
+ workflow_history.status = HistoryStatusChoices.ERROR
+ workflow_history.message = "Node failed"
+ workflow_history.save()
+
+ handle_workflow_dispatch_done(history_id=workflow_history.id)
+
+ workflow_history.refresh_from_db()
+ assert workflow_history.status == HistoryStatusChoices.ERROR
+ mock_dispatch_done_signal.send.assert_called_once()
+ assert (
+ mock_dispatch_done_signal.send.call_args.kwargs["workflow_history"].id
+ == workflow_history.id
+ )
diff --git a/backend/tests/baserow/contrib/automation/nodes/test_node_service.py b/backend/tests/baserow/contrib/automation/nodes/test_node_service.py
index 23fb8ada51..52f01b6046 100644
--- a/backend/tests/baserow/contrib/automation/nodes/test_node_service.py
+++ b/backend/tests/baserow/contrib/automation/nodes/test_node_service.py
@@ -11,6 +11,8 @@
from baserow.contrib.automation.nodes.registries import automation_node_type_registry
from baserow.contrib.automation.nodes.service import AutomationNodeService
from baserow.contrib.automation.nodes.trash_types import AutomationNodeTrashableItemType
+from baserow.contrib.automation.workflows.constants import WORKFLOW_DIRTY_CACHE_KEY
+from baserow.core.cache import global_cache
from baserow.core.exceptions import UserNotInWorkspace
from baserow.core.trash.handler import TrashHandler
from baserow.test_utils.fixtures import Fixtures
@@ -755,3 +757,91 @@ def test_move_node_invalid_reference_node(data_fixture: Fixtures):
)
assert exc.value.args[0] == f"The reference node {action2.id} can't have child"
+
+
+@pytest.mark.django_db
+def test_update_node_updates_workflow_dirty_cache(data_fixture):
+ """
+ When a node is updated, the workflow's dirty cache flag should be set
+ so that the test clone knows to create a new clone instead of
+ reusing the last one.
+ """
+
+ user = data_fixture.create_user()
+ node = data_fixture.create_automation_node(user=user)
+
+ cache_key = WORKFLOW_DIRTY_CACHE_KEY.format(node.workflow.id)
+
+ assert global_cache.get(cache_key, default=False) is False
+
+ AutomationNodeService().update_node(user, node.id, label="foo label")
+
+ assert global_cache.get(cache_key, default=False) is True
+
+
+@pytest.mark.django_db
+def test_create_node_updates_workflow_dirty_cache(data_fixture):
+ """
+ When a node is created, the workflow's dirty cache flag should be set
+ so that the test clone knows to create a new clone instead of
+ reusing the last one.
+ """
+
+ user = data_fixture.create_user()
+ node = data_fixture.create_automation_node(user=user)
+ workflow = node.workflow
+
+ cache_key = WORKFLOW_DIRTY_CACHE_KEY.format(workflow.id)
+
+ assert global_cache.get(cache_key, default=False) is False
+
+ AutomationNodeService().create_node(
+ user,
+ node_type=automation_node_type_registry.get("local_baserow_create_row"),
+ workflow=workflow,
+ reference_node_id=node.id,
+ position="south",
+ output="",
+ )
+
+ assert global_cache.get(cache_key, default=False) is True
+
+
+@pytest.mark.django_db
+def test_duplicate_node_updates_workflow_dirty_cache(data_fixture):
+ """
+ When a node is duplicated, the workflow's dirty cache flag should be set
+ so that the test clone knows to create a new clone instead of
+ reusing the last one.
+ """
+
+ user = data_fixture.create_user()
+ node = data_fixture.create_local_baserow_create_row_action_node(user=user)
+
+ cache_key = WORKFLOW_DIRTY_CACHE_KEY.format(node.workflow.id)
+
+ assert global_cache.get(cache_key, default=False) is False
+
+ AutomationNodeService().duplicate_node(user, node.id)
+
+ assert global_cache.get(cache_key, default=False) is True
+
+
+@pytest.mark.django_db
+def test_delete_node_updates_workflow_dirty_cache(data_fixture):
+ """
+ When a node is deleted, the workflow's dirty cache flag should be set
+ so that the test clone knows to create a new clone instead of
+ reusing the last one.
+ """
+
+ user = data_fixture.create_user()
+ node = data_fixture.create_automation_node(user=user)
+
+ cache_key = WORKFLOW_DIRTY_CACHE_KEY.format(node.workflow.id)
+
+ assert global_cache.get(cache_key, default=False) is False
+
+ AutomationNodeService().delete_node(user, node.id)
+
+ assert global_cache.get(cache_key, default=False) is True
diff --git a/backend/tests/baserow/contrib/automation/workflows/test_workflow_handler.py b/backend/tests/baserow/contrib/automation/workflows/test_workflow_handler.py
index 5c9756bb5e..7d1aac515a 100644
--- a/backend/tests/baserow/contrib/automation/workflows/test_workflow_handler.py
+++ b/backend/tests/baserow/contrib/automation/workflows/test_workflow_handler.py
@@ -10,14 +10,18 @@
from freezegun import freeze_time
from baserow.contrib.automation.history.constants import HistoryStatusChoices
-from baserow.contrib.automation.history.models import AutomationWorkflowHistory
-from baserow.contrib.automation.models import AutomationWorkflow
+from baserow.contrib.automation.history.models import (
+ AutomationNodeHistory,
+ AutomationWorkflowHistory,
+)
+from baserow.contrib.automation.models import Automation, AutomationWorkflow
from baserow.contrib.automation.nodes.node_types import (
CorePeriodicTriggerNodeType,
LocalBaserowRowsCreatedNodeTriggerType,
)
from baserow.contrib.automation.workflows.constants import (
ALLOW_TEST_RUN_MINUTES,
+ WORKFLOW_DIRTY_CACHE_KEY,
WorkflowState,
)
from baserow.contrib.automation.workflows.exceptions import (
@@ -28,7 +32,7 @@
AutomationWorkflowTooManyErrors,
)
from baserow.contrib.automation.workflows.handler import AutomationWorkflowHandler
-from baserow.core.cache import local_cache
+from baserow.core.cache import global_cache, local_cache
from baserow.core.notifications.models import Notification, NotificationRecipient
from baserow.core.registries import ImportExportConfig
from baserow.core.trash.handler import TrashHandler
@@ -429,13 +433,24 @@ def test_publish_returns_published_workflow(data_fixture):
@pytest.mark.django_db
def test_publish_cleans_up_old_workflows(data_fixture):
workflow = data_fixture.create_automation_workflow()
+ handler = AutomationWorkflowHandler()
+
+ test_clone_automation, _ = handler._clone_workflow(
+ workflow, WorkflowState.TEST_CLONE
+ )
+ test_clone_workflow = test_clone_automation.workflows.first()
+ data_fixture.create_automation_workflow_history(
+ original_workflow=workflow,
+ workflow=test_clone_workflow,
+ status=HistoryStatusChoices.SUCCESS,
+ )
- published_1 = AutomationWorkflowHandler().publish(workflow)
- published_2 = AutomationWorkflowHandler().publish(workflow)
- published_3 = AutomationWorkflowHandler().publish(workflow)
- published_4 = AutomationWorkflowHandler().publish(workflow)
+ published_1 = handler.publish(workflow)
+ published_2 = handler.publish(workflow)
+ published_3 = handler.publish(workflow)
+ published_4 = handler.publish(workflow)
- # The first two workflows should no longer exist
+ # The first two published workflows should no longer exist
assert AutomationWorkflow.objects_and_trash.filter(id=published_1.id).count() == 0
assert AutomationWorkflow.objects_and_trash.filter(id=published_2.id).count() == 0
@@ -446,6 +461,53 @@ def test_publish_cleans_up_old_workflows(data_fixture):
# The latest published workflow should be active
assert published_4.is_published is True
+ # The test clone should still exist and the state should still be correct
+ test_clone_workflow.refresh_from_db()
+ assert AutomationWorkflow.objects_and_trash.filter(
+ id=test_clone_workflow.id
+ ).exists()
+ assert test_clone_workflow.state == WorkflowState.TEST_CLONE
+
+
+@pytest.mark.django_db
+def test_publish_disables_live_workflow(data_fixture):
+ workflow = data_fixture.create_automation_workflow()
+ handler = AutomationWorkflowHandler()
+
+ # Publish the workflow
+ published = handler.publish(workflow)
+ data_fixture.create_automation_workflow_history(
+ original_workflow=workflow,
+ workflow=published,
+ status=HistoryStatusChoices.SUCCESS,
+ )
+ assert published.state == WorkflowState.LIVE
+
+ # Edit the draft to force _ensure_published_for_run to create a new clone
+ workflow.refresh_from_db()
+ workflow.save()
+
+ # Create a test clone to simulate a test run
+ test_clone_automation, _ = handler._clone_workflow(
+ workflow, WorkflowState.TEST_CLONE
+ )
+ test_clone_workflow = test_clone_automation.workflows.first()
+
+ # The test clone should be the newest automation
+ assert test_clone_automation.id > published.automation.id
+
+ # The previously published workflow should now be disabled
+ new_published = handler.publish(workflow)
+ published.refresh_from_db()
+ assert published.state == WorkflowState.DISABLED
+
+ # Make sure the newly published workflow is live
+ assert new_published.state == WorkflowState.LIVE
+
+ # The test clone should be unaffected
+ test_clone_workflow.refresh_from_db()
+ assert test_clone_workflow.state == WorkflowState.TEST_CLONE
+
@pytest.mark.django_db
def test_publish_only_exports_specific_workflow(data_fixture):
@@ -512,6 +574,27 @@ def test_get_published_workflow_returns_workflow(data_fixture):
assert result == published_workflow
+@pytest.mark.django_db
+def test_get_published_workflow_ignores_newer_test_clone(data_fixture):
+ workflow = data_fixture.create_automation_workflow()
+ handler = AutomationWorkflowHandler()
+
+ published_workflow = handler.publish(workflow)
+ test_clone_automation, _ = handler._clone_workflow(
+ workflow, WorkflowState.TEST_CLONE
+ )
+ test_clone_workflow = test_clone_automation.workflows.first()
+
+ # Sanity check that the newer clone would win without the TEST_CLONE exclusion.
+ assert test_clone_automation.id > published_workflow.automation.id
+ assert test_clone_workflow.state == WorkflowState.TEST_CLONE
+
+ result = handler.get_published_workflow(workflow, with_cache=False)
+
+ assert result == published_workflow
+ assert result != test_clone_workflow
+
+
@pytest.mark.django_db
def test_update_workflow_correctly_pauses_published_workflow(data_fixture):
user = data_fixture.create_user()
@@ -833,7 +916,9 @@ def test_async_start_workflow_creates_rate_limited_history_once_until_cache_rese
handler.async_start_workflow(published_workflow)
handler.async_start_workflow(published_workflow)
- histories = AutomationWorkflowHistory.objects.filter(workflow=original_workflow)
+ histories = AutomationWorkflowHistory.objects.filter(
+ original_workflow=original_workflow
+ )
assert histories.count() == 1
assert_history(
original_workflow,
@@ -847,7 +932,9 @@ def test_async_start_workflow_creates_rate_limited_history_once_until_cache_rese
with freeze_time("2026-01-26 13:00:06"):
handler.async_start_workflow(published_workflow)
- histories = AutomationWorkflowHistory.objects.filter(workflow=original_workflow)
+ histories = AutomationWorkflowHistory.objects.filter(
+ original_workflow=original_workflow
+ )
assert histories.count() == 2
@@ -1185,7 +1272,7 @@ def test_toggle_test_mode_on(
frozen_time = "2025-06-04 11:00"
with freeze_time(frozen_time):
- AutomationWorkflowHandler().toggle_test_run(workflow)
+ AutomationWorkflowHandler().toggle_test_run(workflow, None)
workflow.refresh_from_db()
@@ -1210,7 +1297,7 @@ def test_toggle_test_mode_on_immediate(
frozen_time = "2025-06-04 11:00"
with freeze_time(frozen_time):
- AutomationWorkflowHandler().toggle_test_run(workflow)
+ AutomationWorkflowHandler().toggle_test_run(workflow, None)
workflow.refresh_from_db()
@@ -1234,7 +1321,7 @@ def test_toggle_test_mode_off(
trigger_type=LocalBaserowRowsCreatedNodeTriggerType.type,
)
- AutomationWorkflowHandler().toggle_test_run(workflow)
+ AutomationWorkflowHandler().toggle_test_run(workflow, None)
workflow.refresh_from_db()
@@ -1436,12 +1523,13 @@ def test_async_start_workflow_with_simulate_until_node(
workflow.refresh_from_db()
history = workflow.workflow_histories.get()
+ cloned_trigger = history.workflow.get_trigger()
+ assert history.simulate_until_node_id == cloned_trigger.id
assert workflow.simulate_until_node is None
assert history.is_test_run is True
- assert history.simulate_until_node_id == trigger.id
mock_start_workflow_celery_task.delay.assert_called_once_with(
- workflow.id, history.id
+ history.workflow_id, history.id
)
@@ -1510,9 +1598,9 @@ def test_async_start_workflow_rate_limited_runs_eventually_disable_workflow(
AutomationWorkflowHandler().async_start_workflow(published_workflow)
histories = list(
- AutomationWorkflowHistory.objects.filter(workflow=original_workflow).order_by(
- "started_on", "id"
- )
+ AutomationWorkflowHistory.objects.filter(
+ original_workflow=original_workflow
+ ).order_by("started_on", "id")
)
assert len(histories) == 6
@@ -1553,8 +1641,12 @@ def test_async_start_workflow_rate_limited_runs_eventually_disable_workflow(
@pytest.mark.django_db
@patch(f"{WORKFLOWS_MODULE}.handler.transaction.on_commit")
@patch(f"{WORKFLOWS_MODULE}.handler.start_workflow_celery_task")
+@patch(f"{WORKFLOWS_MODULE}.handler.automation_workflow_dispatch_started")
def test_async_start_workflow_queues_celery_task_on_commit(
- mock_start_workflow_celery_task, mock_on_commit, data_fixture
+ mock_automation_workflow_dispatch_started,
+ mock_start_workflow_celery_task,
+ mock_on_commit,
+ data_fixture,
):
workflow = data_fixture.create_automation_workflow()
@@ -1563,13 +1655,19 @@ def test_async_start_workflow_queues_celery_task_on_commit(
AutomationWorkflowHandler().async_start_workflow(workflow)
history = workflow.workflow_histories.get()
+ # Ensure the workflow_id is the cloned workflow, not the draft
+ assert history.workflow_id != workflow.id
mock_on_commit.assert_called_once()
mock_start_workflow_celery_task.delay.assert_not_called()
mock_on_commit.call_args.args[0]()
mock_start_workflow_celery_task.delay.assert_called_once_with(
- workflow.id, history.id
+ history.workflow_id, history.id
+ )
+ mock_automation_workflow_dispatch_started.send.assert_called_once_with(
+ sender=None,
+ workflow_history=history,
)
@@ -1670,14 +1768,27 @@ def test_before_run_marks_timed_out_started_history_as_failed(data_fixture):
workflow=original_workflow,
status=HistoryStatusChoices.STARTED,
)
+ node_history = AutomationNodeHistory.objects.create(
+ workflow_history=timed_out_history,
+ node=original_workflow.get_trigger(),
+ started_on=timed_out_history.started_on,
+ status=HistoryStatusChoices.STARTED,
+ )
with freeze_time("2026-03-10 12:00:00"):
AutomationWorkflowHandler().before_run(published_workflow)
- timed_out_history.refresh_from_db()
+ error_message = "This workflow took too long and was timed out."
+ timed_out_history.refresh_from_db()
assert timed_out_history.status == HistoryStatusChoices.ERROR
- assert timed_out_history.message == "This workflow took too long and was timed out."
+ assert timed_out_history.message == error_message
+ assert timed_out_history.completed_on is not None
+
+ node_history.refresh_from_db()
+ assert node_history.status == HistoryStatusChoices.ERROR
+ assert node_history.message == error_message
+ assert node_history.completed_on == timed_out_history.completed_on
@pytest.mark.django_db
@@ -1758,3 +1869,200 @@ def test_clear_old_history_excludes_started_workflows_max_days(data_fixture):
assert workflow.workflow_histories.filter(id=history_1.id).exists() is True
assert workflow.workflow_histories.filter(id=history_2.id).exists() is False
+
+
+@pytest.mark.django_db
+def test_ensure_published_for_run_creates_new_clone(data_fixture):
+ workflow = data_fixture.create_automation_workflow()
+
+ cloned_workflow = AutomationWorkflowHandler()._ensure_published_for_run(workflow)
+
+ # Ensure that the cloned workflow is a new workflow
+ assert cloned_workflow.id != workflow.id
+ assert cloned_workflow.automation.published_from == workflow
+ assert cloned_workflow.state == WorkflowState.TEST_CLONE
+
+
+@pytest.mark.django_db
+def test_ensure_published_for_run_creates_new_after_workflow_update(data_fixture):
+ workflow = data_fixture.create_automation_workflow()
+
+ handler = AutomationWorkflowHandler()
+
+ cloned_workflow_1 = handler._ensure_published_for_run(workflow)
+
+ # Set the dirty flag to trigger a new clone
+ cache_key = WORKFLOW_DIRTY_CACHE_KEY.format(workflow.id)
+ global_cache.update(cache_key, lambda _: True)
+
+ cloned_workflow_2 = handler._ensure_published_for_run(workflow)
+
+ # Because the workflow was updated, a new clone should be created
+ assert cloned_workflow_1.id != cloned_workflow_2.id
+
+
+@pytest.mark.django_db
+def test_ensure_published_for_run_reuse_automation(data_fixture):
+ """
+ If a cloned automation exists and is still fresh (no edits to draft
+ since publish), we should reuse it rather than creating a new clone.
+ """
+
+ workflow = data_fixture.create_automation_workflow()
+
+ handler = AutomationWorkflowHandler()
+
+ cloned_workflow = handler._ensure_published_for_run(workflow)
+
+ second_cloned_workflow = handler._ensure_published_for_run(workflow)
+
+ assert cloned_workflow.automation_id == second_cloned_workflow.automation_id
+
+
+@pytest.mark.django_db
+def test_ensure_published_for_run_ignores_published_workflow_states(data_fixture):
+ workflow = data_fixture.create_automation_workflow()
+ handler = AutomationWorkflowHandler()
+
+ published_workflow = handler.publish(workflow)
+ clone = handler._ensure_published_for_run(workflow)
+ second_clone = handler._ensure_published_for_run(workflow)
+
+ assert published_workflow.state == WorkflowState.LIVE
+ assert clone.id != published_workflow.id
+ assert clone.state == WorkflowState.TEST_CLONE
+ assert clone.automation.published_from == workflow
+ assert second_clone.id == clone.id
+
+ handler.update_workflow(workflow, state=WorkflowState.PAUSED)
+ published_workflow.refresh_from_db()
+
+ clone_after = handler._ensure_published_for_run(workflow)
+ second_clone_after = handler._ensure_published_for_run(workflow)
+
+ assert published_workflow.state == WorkflowState.PAUSED
+ assert clone_after.id == clone.id
+ assert clone_after.id != published_workflow.id
+ assert clone_after.state == WorkflowState.TEST_CLONE
+ assert clone_after.automation.published_from == workflow
+ assert second_clone_after.id == clone_after.id
+
+
+@pytest.mark.django_db
+def test_publish_preserves_old_live_automation_with_history(data_fixture):
+ """
+ When re-publishing, old live automations that have history entries
+ pointing at them should not be deleted.
+ """
+
+ workflow = data_fixture.create_automation_workflow()
+ handler = AutomationWorkflowHandler()
+
+ published = handler.publish(workflow)
+ published_automation = published.automation
+
+ # Create a history entry pointing to the first published automation
+ data_fixture.create_automation_workflow_history(
+ original_workflow=workflow,
+ workflow=published,
+ status=HistoryStatusChoices.SUCCESS,
+ )
+
+ # Publish twice. Normally, this would deleted the oldest published entry.
+ handler.publish(workflow)
+ handler.publish(workflow)
+
+ # The first automation shouldn't be deleted because it has history entries
+ assert Automation.objects.filter(id=published_automation.id).exists()
+
+
+@override_settings(
+ AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES=100,
+ AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS=1,
+)
+@pytest.mark.django_db
+def test_clear_old_history_deletes_orphaned_automations(data_fixture):
+ """
+ When history entries are cleaned up, any published automations
+ that no longer have history entries pointing at them should be deleted.
+ """
+
+ workflow = data_fixture.create_automation_workflow()
+ handler = AutomationWorkflowHandler()
+
+ with freeze_time("2026-04-20 11:00:00"):
+ cloned_workflow = handler._ensure_published_for_run(workflow)
+
+ clone_automation_id = cloned_workflow.automation_id
+
+ handler.publish(workflow)
+
+ with freeze_time("2026-04-20 12:00:00"):
+ data_fixture.create_automation_workflow_history(
+ original_workflow=workflow,
+ workflow=cloned_workflow,
+ status=HistoryStatusChoices.SUCCESS,
+ )
+
+ # 12 hours later but within 1 day, so history survives
+ with freeze_time("2026-04-21 00:00:00"):
+ handler._clear_old_history(workflow)
+
+ assert Automation.objects.filter(id=clone_automation_id).exists()
+
+ # 2 days later, so history should have been deleted, and the cloned
+ # automation should be pruned as well.
+ with freeze_time("2026-04-22 12:00:00"):
+ handler._clear_old_history(workflow)
+
+ assert not Automation.objects.filter(id=clone_automation_id).exists()
+
+
+@pytest.mark.django_db
+def test_clear_old_history_keeps_live_published_automation_when_newer_test_clone_exists(
+ data_fixture,
+):
+ workflow = data_fixture.create_automation_workflow()
+ handler = AutomationWorkflowHandler()
+
+ with freeze_time("2026-04-27 12:00:00"):
+ published_workflow = handler.publish(workflow)
+ test_clone_workflow = handler._ensure_published_for_run(workflow)
+
+ assert test_clone_workflow.automation_id != published_workflow.automation_id
+ assert test_clone_workflow.automation_id > published_workflow.automation_id
+
+ with freeze_time("2026-04-27 12:01:00"):
+ handler._clear_old_history(workflow)
+
+ assert Automation.objects.filter(id=published_workflow.automation_id).exists()
+ assert not Automation.objects.filter(id=test_clone_workflow.automation_id).exists()
+
+
+@pytest.mark.django_db
+@patch(f"{WORKFLOWS_MODULE}.handler.start_workflow_celery_task")
+def test_async_start_workflow_test_run_creates_test_clone(
+ mock_start_workflow_celery_task, data_fixture, django_capture_on_commit_callbacks
+):
+ """
+ When async_start_workflow is called, it should call the celery task
+ using a history that is based on a cloned workflow, not the draft.
+ """
+
+ workflow = data_fixture.create_automation_workflow()
+
+ with django_capture_on_commit_callbacks(execute=True):
+ AutomationWorkflowHandler().async_start_workflow(workflow)
+
+ history = workflow.workflow_histories.get()
+
+ # History's workflow should be a clone, not the draft
+ assert history.original_workflow == workflow
+ assert history.is_test_run is True
+ assert history.workflow_id != workflow.id
+ assert history.workflow.automation.published_from == workflow
+ assert history.workflow.state == WorkflowState.TEST_CLONE
+
+ mock_start_workflow_celery_task.delay.assert_called_once_with(
+ history.workflow_id, history.id
+ )
diff --git a/backend/tests/baserow/contrib/integrations/slack/test_slack_write_message_service_type.py b/backend/tests/baserow/contrib/integrations/slack/test_slack_write_message_service_type.py
index 22041deb6b..0f8c414e20 100644
--- a/backend/tests/baserow/contrib/integrations/slack/test_slack_write_message_service_type.py
+++ b/backend/tests/baserow/contrib/integrations/slack/test_slack_write_message_service_type.py
@@ -142,6 +142,7 @@ def test_dispatch_slack_write_message_with_formulas(data_fixture):
application = data_fixture.create_automation_application(user=user)
workflow = data_fixture.create_automation_workflow(automation=application)
workflow_history = AutomationHistoryHandler().create_workflow_history(
+ workflow,
workflow,
timezone.now(),
False,
diff --git a/changelog/entries/unreleased/bug/stop_infinite_dispatchdatasources_refetch_loop_in_page_edito.json b/changelog/entries/unreleased/bug/stop_infinite_dispatchdatasources_refetch_loop_in_page_edito.json
new file mode 100644
index 0000000000..82991e86f6
--- /dev/null
+++ b/changelog/entries/unreleased/bug/stop_infinite_dispatchdatasources_refetch_loop_in_page_edito.json
@@ -0,0 +1,9 @@
+{
+ "type": "bug",
+ "message": "stop infinite `/dispatch-data-sources/` refetch loop in page editor",
+ "issue_origin": "github",
+ "issue_number": null,
+ "domain": "builder",
+ "bullet_points": [],
+ "created_at": "2026-04-22"
+}
\ No newline at end of file
diff --git a/web-frontend/modules/automation/components/workflow/sidePanels/HistorySection.vue b/web-frontend/modules/automation/components/workflow/sidePanels/HistorySection.vue
deleted file mode 100644
index 113b645880..0000000000
--- a/web-frontend/modules/automation/components/workflow/sidePanels/HistorySection.vue
+++ /dev/null
@@ -1,117 +0,0 @@
-
-
+
+ {{ historyTitlePrefix }}{{ statusTitle }}
+
+
+ {{ humanCompletedDate }}
+
+