From a52550dd34a581ad098e0aa8029330f3470bd953 Mon Sep 17 00:00:00 2001 From: Tsering Paljor Date: Mon, 27 Apr 2026 11:41:11 +0700 Subject: [PATCH 1/2] feat: Automation Node history (#4787) * WIP - node history * Add migrations * Fix test * Re-generate migrations * Node history should show result, not payload. * Ensure nested node history scrolls * get_previous_positions() may return None * Fix get_iteration() to use the new iteration_path * The scrollbar should be lighter, hide the background container. * Only show expandable if the node history is completed * Optimize node history endpoint * Return the node type and label from the backend, so that we don't have to lookup published node which the frontend doesn't know about * Use history status icons from design spec * Remove debugging code * Display workflow run error * Don't expand history if workflow is in progress * Find the most specific error * Remove old HistorySection and unused translations * Remove old _NodeHistory.vue, replaced by NodeHistory.vue * Ensure node type label is truncated * Align node type header icon * Ensure Node Explorer's content name is truncated * Add router label to node type, ensure max-width for label * Make sure header expand icon is centered * Truncate error preview * Ensure node result is created when there is an error. This is needed to show the error in the node history. * Lint fix * Ensure parent nodes show error indicator (red text) if child has error. * Clean up styles * Clean up * Handle case when router node has error * Don't render Context or Modal if not final node * Document success_count and fail_count in the API response * When workflow times out, ensure child node histories are also updated. * Remove unused css definitions * Use computed props * Add test to check query count for workflow history * Update test to ensure node history is updated when workflow times out * use a single timezone.now() instance * getNodeType should be a computed prop * getHistoryIconPath should be a computed prop * Add test for node history in api response * Add test for success/fail counts * Add assertion to existing tests to ensure node result is created on workflow error * Update the workflow when a node is modified * Add original_workflow to AutomationWorkflowHistory model, migration, and backfill original_workflow * Delete any published automations that don't have history. * Create a snapshot of the history via publish flow. * Add id to automationnodehistory ordering. Update migrations. * Fix tests * Ensure sample data is saved from snapshot to the draft workflow. * Fix test * Add tests for snapshots * Remove unnecessary guard. * Handle list vs flat result data. * Ensure workflow is updated for snapshot when we CRUD a node. * Make the edge detection more generic. * Add new history_clone workflow state choice * Refactor common logic to new _clone_workflow(). * Simplify async_start_workflow() * Exclude history clones from being deleted during publishing. * Ensure active_published only filters for live workflows * Refactor existing tests + write new ones * Fix test * Add real-time for dispatch start/end, and update history in frontend. * Increase size of the triple-dot link that reveals the Show result button. * Ensure original_workflow is ZDM compatible * Squash local migrations * Simplify * Add spinner when history is loading. * Improve clean_up_previously_published_automations() logic * Use global cache instead of relying on updated_on for cloned workflow. * Rename history clone to test clone. * Create clone only for test, not for simulation. * Update docstring/comments. * Move clone creation to toggle_test_run() * Post-migration fixes * Ensure async_start_workflow() always receives a draf or live workflow. For tests, it should get/create a clone. * Fix state confusion * Add filter to prevent deleting newly created test clone. * Fix test --------- Co-authored-by: Jeremie Pardou <571533+jrmi@users.noreply.github.com> --- .../automation/api/workflows/serializers.py | 83 ++++- .../contrib/automation/api/workflows/views.py | 29 +- .../contrib/automation/history/handler.py | 17 +- .../contrib/automation/history/models.py | 10 +- ...kflowhistory_original_workflow_and_more.py | 49 +++ .../src/baserow/contrib/automation/models.py | 10 +- .../contrib/automation/nodes/handler.py | 20 +- .../contrib/automation/nodes/service.py | 14 + .../contrib/automation/workflows/constants.py | 4 + .../contrib/automation/workflows/handler.py | 241 +++++++++--- .../contrib/automation/workflows/signals.py | 2 + .../contrib/automation/workflows/tasks.py | 10 +- .../automation/workflows/ws/signals.py | 40 ++ .../test_utils/fixtures/automation_history.py | 1 + .../fixtures/automation_workflow_history.py | 42 ++- .../api/workflows/test_workflow_views.py | 159 ++++++++ .../test_data_provider_types.py | 3 + .../history/test_history_handler.py | 3 + .../contrib/automation/history/utils.py | 4 +- .../nodes/test_node_dispatch_async.py | 89 +++++ .../automation/nodes/test_node_service.py | 90 +++++ .../workflows/test_workflow_handler.py | 352 ++++++++++++++++-- .../test_slack_write_message_service_type.py | 1 + .../workflow/sidePanels/HistorySection.vue | 117 ------ .../workflow/sidePanels/HistorySidePanel.vue | 56 ++- .../workflow/sidePanels/NodeHistory.vue | 335 +++++++++++++++++ .../workflow/sidePanels/WorkflowHistory.vue | 218 +++++++++++ .../modules/automation/locales/en.json | 13 +- web-frontend/modules/automation/nodeTypes.js | 2 +- web-frontend/modules/automation/realtime.js | 24 ++ .../core/assets/images/history-disabled.svg | 5 + .../core/assets/images/history-failed.svg | 5 + .../core/assets/images/history-success.svg | 4 + .../components/automation/workflow/all.scss | 3 +- .../automation/workflow/history_section.scss | 27 -- .../workflow/history_side_panel.scss | 37 ++ .../automation/workflow/node_history.scss | 106 ++++++ .../automation/workflow/workflow_history.scss | 41 ++ .../node_explorer/node_explorer_content.scss | 2 + .../services/CoreRouterServiceForm.vue | 2 +- 40 files changed, 2015 insertions(+), 255 deletions(-) create mode 100644 backend/src/baserow/contrib/automation/migrations/0028_automationworkflowhistory_original_workflow_and_more.py delete mode 100644 web-frontend/modules/automation/components/workflow/sidePanels/HistorySection.vue create mode 100644 web-frontend/modules/automation/components/workflow/sidePanels/NodeHistory.vue create mode 100644 web-frontend/modules/automation/components/workflow/sidePanels/WorkflowHistory.vue create mode 100644 web-frontend/modules/core/assets/images/history-disabled.svg create mode 100644 web-frontend/modules/core/assets/images/history-failed.svg create mode 100644 web-frontend/modules/core/assets/images/history-success.svg delete mode 100644 web-frontend/modules/core/assets/scss/components/automation/workflow/history_section.scss create mode 100644 web-frontend/modules/core/assets/scss/components/automation/workflow/node_history.scss create mode 100644 web-frontend/modules/core/assets/scss/components/automation/workflow/workflow_history.scss diff --git a/backend/src/baserow/contrib/automation/api/workflows/serializers.py b/backend/src/baserow/contrib/automation/api/workflows/serializers.py index 4707515688..5f3249a869 100644 --- a/backend/src/baserow/contrib/automation/api/workflows/serializers.py +++ b/backend/src/baserow/contrib/automation/api/workflows/serializers.py @@ -2,7 +2,10 @@ from drf_spectacular.utils import extend_schema_field from rest_framework import serializers +from baserow.api.pagination import PageNumberPagination from baserow.contrib.automation.models import ( + AutomationHistory, + AutomationNodeHistory, AutomationWorkflow, AutomationWorkflowHistory, ) @@ -103,14 +106,88 @@ class OrderAutomationWorkflowsSerializer(serializers.Serializer): ) -class AutomationWorkflowHistorySerializer(serializers.ModelSerializer): +class AutomationHistorySerializer(serializers.ModelSerializer): class Meta: - model = AutomationWorkflowHistory + model = AutomationHistory fields = ( "id", "started_on", "completed_on", - "is_test_run", "message", "status", ) + + +class AutomationNodeHistorySerializer(AutomationHistorySerializer): + parent_node_id = serializers.SerializerMethodField() + iteration = serializers.SerializerMethodField() + result = serializers.SerializerMethodField() + node_type = serializers.SerializerMethodField() + node_label = serializers.SerializerMethodField() + + class Meta: + model = AutomationNodeHistory + fields = AutomationHistorySerializer.Meta.fields + ( + "workflow_history", + "node", + "node_type", + "node_label", + "parent_node_id", + "iteration", + "result", + ) + + def _get_first_node_result(self, obj): + results = obj.node_results.all() + return results[0] if results else None + + @extend_schema_field(OpenApiTypes.STR) + def get_node_type(self, obj): + return obj.node.get_type().type + + @extend_schema_field(OpenApiTypes.STR) + def get_node_label(self, obj): + return obj.node.label + + @extend_schema_field(OpenApiTypes.INT) + def get_parent_node_id(self, obj): + parent_nodes = obj.node.get_parent_nodes() + if not parent_nodes: + return None + return parent_nodes[-1].id + + @extend_schema_field(OpenApiTypes.INT) + def get_iteration(self, obj): + result = self._get_first_node_result(obj) + if result is None: + return None + + if result.iteration_path: + return int(result.iteration_path.rsplit(".", 1)[-1]) + + return 0 + + def get_result(self, obj): + result = self._get_first_node_result(obj) + return result.result if result else {} + + +class AutomationWorkflowHistorySerializer(AutomationHistorySerializer): + node_histories = AutomationNodeHistorySerializer(read_only=True, many=True) + + class Meta: + model = AutomationWorkflowHistory + fields = AutomationHistorySerializer.Meta.fields + ( + "is_test_run", + "event_payload", + "simulate_until_node", + "node_histories", + ) + + +class AutomationWorkflowHistoryPagination(PageNumberPagination): + def get_paginated_response(self, data, *, success_count: int, fail_count: int): + response = super().get_paginated_response(data) + response.data["success_count"] = success_count + response.data["fail_count"] = fail_count + return response diff --git a/backend/src/baserow/contrib/automation/api/workflows/views.py b/backend/src/baserow/contrib/automation/api/workflows/views.py index cc6201526b..696469e624 100644 --- a/backend/src/baserow/contrib/automation/api/workflows/views.py +++ b/backend/src/baserow/contrib/automation/api/workflows/views.py @@ -2,11 +2,13 @@ from django.conf import settings from django.db import transaction +from django.db.models import Count, Q from drf_spectacular.types import OpenApiTypes from drf_spectacular.utils import OpenApiParameter, extend_schema from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response +from rest_framework.serializers import IntegerField from rest_framework.status import HTTP_202_ACCEPTED from rest_framework.views import APIView @@ -14,7 +16,6 @@ from baserow.api.decorators import map_exceptions, validate_body from baserow.api.jobs.errors import ERROR_MAX_JOB_COUNT_EXCEEDED from baserow.api.jobs.serializers import JobSerializer -from baserow.api.pagination import PageNumberPagination from baserow.api.schemas import CLIENT_SESSION_ID_SCHEMA_PARAMETER, get_error_schema from baserow.api.serializers import get_example_pagination_serializer_class from baserow.contrib.automation.api.workflows.errors import ( @@ -23,12 +24,14 @@ ERROR_AUTOMATION_WORKFLOW_NOTIFICATION_RECIPIENTS_INVALID, ) from baserow.contrib.automation.api.workflows.serializers import ( + AutomationWorkflowHistoryPagination, AutomationWorkflowHistorySerializer, AutomationWorkflowSerializer, CreateAutomationWorkflowSerializer, OrderAutomationWorkflowsSerializer, UpdateAutomationWorkflowSerializer, ) +from baserow.contrib.automation.history.constants import HistoryStatusChoices from baserow.contrib.automation.history.service import AutomationHistoryService from baserow.contrib.automation.workflows.actions import ( CreateAutomationWorkflowActionType, @@ -231,7 +234,15 @@ class AutomationWorkflowHistoryView(APIView): description="Retrieve the history for a workflow.", responses={ 200: get_example_pagination_serializer_class( - AutomationWorkflowHistorySerializer + AutomationWorkflowHistorySerializer, + additional_fields={ + "success_count": IntegerField( + help_text="The total number of successful workflow runs." + ), + "fail_count": IntegerField( + help_text="The total number of failed workflow runs." + ), + }, ), 404: get_error_schema( [ @@ -251,16 +262,26 @@ def get(self, request, workflow_id: int): request.user, workflow_id ) - paginator = PageNumberPagination( + counts = queryset.aggregate( + success_count=Count("id", filter=Q(status=HistoryStatusChoices.SUCCESS)), + fail_count=Count("id", filter=Q(status=HistoryStatusChoices.ERROR)), + ) + + paginator = AutomationWorkflowHistoryPagination( limit_page_size=settings.AUTOMATION_HISTORY_PAGE_SIZE_LIMIT ) + page = paginator.paginate_queryset(queryset, request, self) serializer = AutomationWorkflowHistorySerializer( page, many=True, ) - return paginator.get_paginated_response(serializer.data) + return paginator.get_paginated_response( + serializer.data, + success_count=counts["success_count"], + fail_count=counts["fail_count"], + ) class OrderAutomationWorkflowsView(APIView): diff --git a/backend/src/baserow/contrib/automation/history/handler.py b/backend/src/baserow/contrib/automation/history/handler.py index dac591f612..ad59be4cf7 100644 --- a/backend/src/baserow/contrib/automation/history/handler.py +++ b/backend/src/baserow/contrib/automation/history/handler.py @@ -1,7 +1,7 @@ from datetime import datetime from typing import Dict, List, Optional, Union -from django.db.models import QuerySet +from django.db.models import Prefetch, QuerySet from baserow.contrib.automation.history.constants import HistoryStatusChoices from baserow.contrib.automation.history.exceptions import ( @@ -31,9 +31,18 @@ def get_workflow_histories( base_queryset = AutomationWorkflowHistory.objects.all() return base_queryset.filter( - workflow=workflow, + original_workflow=workflow, simulate_until_node__isnull=True, - ).prefetch_related("workflow__automation__workspace") + ).prefetch_related( + Prefetch( + "node_histories", + queryset=AutomationNodeHistory.objects.select_related( + "node", "node__workflow" + ) + .prefetch_related("node_results") + .order_by("started_on"), + ), + ) def get_workflow_history( self, history_id: int, base_queryset: Optional[QuerySet] = None @@ -60,6 +69,7 @@ def get_workflow_history( def create_workflow_history( self, + original_workflow: AutomationWorkflow, workflow: AutomationWorkflow, started_on: datetime, is_test_run: bool, @@ -73,6 +83,7 @@ def create_workflow_history( return AutomationWorkflowHistory.objects.create( workflow=workflow, + original_workflow=original_workflow, started_on=started_on, is_test_run=is_test_run, simulate_until_node=simulate_until_node, diff --git a/backend/src/baserow/contrib/automation/history/models.py b/backend/src/baserow/contrib/automation/history/models.py index ed8d8d65f7..67514ef6ef 100644 --- a/backend/src/baserow/contrib/automation/history/models.py +++ b/backend/src/baserow/contrib/automation/history/models.py @@ -20,10 +20,18 @@ class Meta: class AutomationWorkflowHistory(AutomationHistory): - workflow = models.ForeignKey( + original_workflow = models.ForeignKey( "automation.AutomationWorkflow", on_delete=models.CASCADE, related_name="workflow_histories", + # TODO ZDM: Make non-nullable after next release and add backfill + # migration. See: https://github.com/baserow/baserow/issues/5236 + null=True, + ) + workflow = models.ForeignKey( + "automation.AutomationWorkflow", + on_delete=models.CASCADE, + related_name="cloned_workflow_histories", ) simulate_until_node = models.ForeignKey( "automation.AutomationNode", diff --git a/backend/src/baserow/contrib/automation/migrations/0028_automationworkflowhistory_original_workflow_and_more.py b/backend/src/baserow/contrib/automation/migrations/0028_automationworkflowhistory_original_workflow_and_more.py new file mode 100644 index 0000000000..d6743065eb --- /dev/null +++ b/backend/src/baserow/contrib/automation/migrations/0028_automationworkflowhistory_original_workflow_and_more.py @@ -0,0 +1,49 @@ +import django.db.models.deletion +from django.db import migrations, models + + +def backfill_original_workflow(apps, schema_editor): + AutomationWorkflowHistory = apps.get_model( + "automation", "AutomationWorkflowHistory" + ) + AutomationWorkflowHistory.objects.filter(original_workflow__isnull=True).update( + original_workflow_id=models.F("workflow_id") + ) + + +class Migration(migrations.Migration): + + dependencies = [ + ("automation", "0027_alter_automationnodehistory_options_and_more"), + ] + + operations = [ + migrations.AddField( + model_name="automationworkflowhistory", + name="original_workflow", + field=models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="workflow_histories", + to="automation.automationworkflow", + ), + ), + migrations.RunPython( + backfill_original_workflow, + reverse_code=migrations.RunPython.noop, + ), + migrations.AlterField( + model_name="automationworkflowhistory", + name="workflow", + field=models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="cloned_workflow_histories", + to="automation.automationworkflow", + ), + ), + migrations.AlterField( + model_name='automationworkflow', + name='state', + field=models.CharField(choices=[('draft', 'Draft'), ('live', 'Live'), ('paused', 'Paused'), ('disabled', 'Disabled'), ('test_clone', 'Test Clone')], db_default='draft', default='draft', max_length=20), + ), + ] diff --git a/backend/src/baserow/contrib/automation/models.py b/backend/src/baserow/contrib/automation/models.py index 2c241f89a4..ed94950a4f 100644 --- a/backend/src/baserow/contrib/automation/models.py +++ b/backend/src/baserow/contrib/automation/models.py @@ -1,6 +1,11 @@ from django.db import models -from baserow.contrib.automation.history.models import AutomationWorkflowHistory +from baserow.contrib.automation.history.models import ( + AutomationHistory, + AutomationNodeHistory, + AutomationNodeResult, + AutomationWorkflowHistory, +) from baserow.contrib.automation.workflows.models import ( AutomationWorkflow, DuplicateAutomationWorkflowJob, @@ -11,7 +16,10 @@ "Automation", "AutomationWorkflow", "DuplicateAutomationWorkflowJob", + "AutomationHistory", "AutomationWorkflowHistory", + "AutomationNodeHistory", + "AutomationNodeResult", ] diff --git a/backend/src/baserow/contrib/automation/nodes/handler.py b/backend/src/baserow/contrib/automation/nodes/handler.py index dca96d342f..3dd90b36b8 100644 --- a/backend/src/baserow/contrib/automation/nodes/handler.py +++ b/backend/src/baserow/contrib/automation/nodes/handler.py @@ -367,6 +367,7 @@ def import_node_only( def _handle_workflow_error( self, node_history: AutomationNodeHistory, + iteration_path: str, error: str, ) -> None: now = timezone.now() @@ -380,6 +381,11 @@ def _handle_workflow_error( node_history.status = HistoryStatusChoices.ERROR node_history.save() + AutomationHistoryHandler().create_node_result( + node_history=node_history, + iteration_path=iteration_path, + ) + def _handle_simulation_notify( self, simulate_until_node: AutomationNode | None, node: AutomationNode ) -> bool: @@ -392,9 +398,11 @@ def _handle_simulation_notify( """ if simulate_until_node and simulate_until_node.id == node.id: - node.service.specific.refresh_from_db(fields=["sample_data"]) + service = node.service.specific + service.refresh_from_db(fields=["sample_data"]) automation_node_updated.send(self, user=None, node=node) return True + return False def dispatch_node( @@ -469,14 +477,14 @@ def dispatch_node( simulate_until_node=workflow_history.simulate_until_node, current_iterations=current_iterations, ) - + iteration_path = dispatch_context.get_iteration_path(node) node_type: Type[AutomationNodeActionNodeType] = node.get_type() try: dispatch_result = node_type.dispatch(node, dispatch_context) except ServiceImproperlyConfiguredDispatchException as e: error = f"The node {node.id} is misconfigured and cannot be dispatched. {str(e)}" - self._handle_workflow_error(node_history, error) + self._handle_workflow_error(node_history, iteration_path, error) self._handle_simulation_notify(simulate_until_node, node) return None except UnexpectedDispatchException as e: @@ -485,7 +493,7 @@ def dispatch_node( f"Error while running workflow {original_workflow.id}. Error: {str(e)}" ) logger.warning(error) - self._handle_workflow_error(node_history, error) + self._handle_workflow_error(node_history, iteration_path, error) self._handle_simulation_notify(simulate_until_node, node) return None except Exception as e: @@ -496,7 +504,7 @@ def dispatch_node( f"Error: {str(e)}" ) logger.exception(error) - self._handle_workflow_error(node_history, error) + self._handle_workflow_error(node_history, iteration_path, error) self._handle_simulation_notify(simulate_until_node, node) return None @@ -508,7 +516,7 @@ def dispatch_node( history_handler.create_node_result( node_history=node_history, result=dispatch_result.data, - iteration_path=dispatch_context.get_iteration_path(node), + iteration_path=iteration_path, ) to_chain = [] diff --git a/backend/src/baserow/contrib/automation/nodes/service.py b/backend/src/baserow/contrib/automation/nodes/service.py index 6fa3a5493c..bad1be30cb 100644 --- a/backend/src/baserow/contrib/automation/nodes/service.py +++ b/backend/src/baserow/contrib/automation/nodes/service.py @@ -34,7 +34,9 @@ ReplacedAutomationNode, UpdatedAutomationNode, ) +from baserow.contrib.automation.workflows.constants import WORKFLOW_DIRTY_CACHE_KEY from baserow.contrib.automation.workflows.signals import automation_workflow_updated +from baserow.core.cache import global_cache from baserow.core.handler import CoreHandler from baserow.core.integrations.handler import IntegrationHandler from baserow.core.trash.handler import TrashHandler @@ -196,6 +198,9 @@ def create_node( workflow.get_graph().insert(new_node, reference_node, position, output) + cache_key = WORKFLOW_DIRTY_CACHE_KEY.format(workflow.id) + global_cache.update(cache_key, lambda _: True) + automation_node_created.send( self, node=new_node, @@ -245,6 +250,9 @@ def update_node( # Now export the 'new' node values, since everything has been updated. new_node_values = node_type.export_prepared_values(node) + cache_key = WORKFLOW_DIRTY_CACHE_KEY.format(node.workflow.id) + global_cache.update(cache_key, lambda _: True) + automation_node_updated.send(self, user=user, node=updated_node) return UpdatedAutomationNode( @@ -285,6 +293,9 @@ def delete_node( node, ) + cache_key = WORKFLOW_DIRTY_CACHE_KEY.format(workflow.id) + global_cache.update(cache_key, lambda _: True) + automation_node_deleted.send( self, workflow=workflow, @@ -325,6 +336,9 @@ def duplicate_node( workflow.get_graph().insert(duplicated_node, source_node, "south", "") + cache_key = WORKFLOW_DIRTY_CACHE_KEY.format(workflow.id) + global_cache.update(cache_key, lambda _: True) + automation_node_created.send( self, node=duplicated_node, diff --git a/backend/src/baserow/contrib/automation/workflows/constants.py b/backend/src/baserow/contrib/automation/workflows/constants.py index 8d3ce14714..e05ae9fbc6 100644 --- a/backend/src/baserow/contrib/automation/workflows/constants.py +++ b/backend/src/baserow/contrib/automation/workflows/constants.py @@ -9,3 +9,7 @@ class WorkflowState(models.TextChoices): LIVE = "live" PAUSED = "paused" DISABLED = "disabled" + TEST_CLONE = "test_clone" + + +WORKFLOW_DIRTY_CACHE_KEY = "wa_workflow_dirty_{}" diff --git a/backend/src/baserow/contrib/automation/workflows/handler.py b/backend/src/baserow/contrib/automation/workflows/handler.py index b06383e4f7..c5af5f0b47 100644 --- a/backend/src/baserow/contrib/automation/workflows/handler.py +++ b/backend/src/baserow/contrib/automation/workflows/handler.py @@ -1,6 +1,6 @@ from collections import defaultdict from datetime import timedelta -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple from zipfile import ZipFile from django.conf import settings @@ -22,7 +22,10 @@ ) from baserow.contrib.automation.history.constants import HistoryStatusChoices from baserow.contrib.automation.history.handler import AutomationHistoryHandler -from baserow.contrib.automation.history.models import AutomationWorkflowHistory +from baserow.contrib.automation.history.models import ( + AutomationNodeHistory, + AutomationWorkflowHistory, +) from baserow.contrib.automation.models import Automation from baserow.contrib.automation.nodes.handler import AutomationNodeHandler from baserow.contrib.automation.nodes.models import AutomationNode @@ -32,6 +35,7 @@ from baserow.contrib.automation.types import AutomationWorkflowDict from baserow.contrib.automation.workflows.constants import ( ALLOW_TEST_RUN_MINUTES, + WORKFLOW_DIRTY_CACHE_KEY, WorkflowState, ) from baserow.contrib.automation.workflows.exceptions import ( @@ -42,7 +46,10 @@ AutomationWorkflowTooManyErrors, ) from baserow.contrib.automation.workflows.models import AutomationWorkflow -from baserow.contrib.automation.workflows.signals import automation_workflow_updated +from baserow.contrib.automation.workflows.signals import ( + automation_workflow_dispatch_started, + automation_workflow_updated, +) from baserow.contrib.automation.workflows.tasks import ( handle_workflow_dispatch_done, start_workflow_celery_task, @@ -125,8 +132,13 @@ def get_published_workflow( def _get_published_workflow( workflow: AutomationWorkflow, ) -> Optional[AutomationWorkflow]: - latest_published = workflow.published_to.order_by("-id").first() - return latest_published.workflows.first() if latest_published else None + latest_published = ( + AutomationWorkflow.objects.exclude(state=WorkflowState.TEST_CLONE) + .filter(automation__published_from=workflow) + .order_by("-automation__id") + .first() + ) + return latest_published if with_cache: return local_cache.get( @@ -143,6 +155,7 @@ def _invalidate_workflow_caches(self, workflow: AutomationWorkflow) -> None: global_cache.invalidate( self._get_workflow_history_rate_limit_cache_key(original_workflow) ) + global_cache.invalidate(WORKFLOW_DIRTY_CACHE_KEY.format(original_workflow.id)) def get_workflows( self, automation: Automation, base_queryset: Optional[QuerySet] = None @@ -609,39 +622,37 @@ def clean_up_previously_published_automations( if len(published_automations) > 1: # Delete all but the last published automation - ids_to_delete = [a.id for a in published_automations[:-1]] - Automation.objects.filter(id__in=ids_to_delete).delete() - - # Disable the last published workflow - if published_workflow := published_automations[-1].workflows.first(): - published_workflow.state = WorkflowState.DISABLED - published_workflow.save(update_fields=["state"]) - - def publish( + if ids_to_delete := [ + a.id + for a in published_automations[:-1] + # Exclude any automations that have any history entries + if not AutomationWorkflowHistory.objects.filter( + workflow__automation=a + ).exists() + ]: + Automation.objects.filter(id__in=ids_to_delete).delete() + + # Disable any live workflows + AutomationWorkflow.objects.filter( + automation__published_from=workflow, + state=WorkflowState.LIVE, + ).update(state=WorkflowState.DISABLED) + + def _clone_workflow( self, workflow: AutomationWorkflow, - progress: Optional[Progress] = None, - ) -> AutomationWorkflow: + state: WorkflowState | None = None, + progress: Progress | None = None, + ) -> Tuple[Automation, Dict]: """ - Publishes an Automation and a specific workflow. If the automation was - already published, the previous versions are deleted and a new one - is created. - - When an automation is published, a clone of the current version is - created to avoid further modifications to the original automation - which could affect the published version. + Creates a clone of the workflow's automation via the export/import process. - :param workflow: The workflow to be published. - :param progress: An object to track the publishing progress. - :return: The published workflow. + :param workflow: The workflow to clone. + :param state: An optional WorkflowState to assign; defaults to draft. + :param progress: An optional progress tracker. + :return: The cloned Automation and the id_mapping dict. """ - # Make sure we are the only process to update the automation workflow - # to prevent race conditions. - workflow = self.get_workflow(workflow.id, for_update=True) - - self.clean_up_previously_published_automations(workflow) - import_export_config = ImportExportConfig( include_permission_data=True, reduce_disk_space_usage=False, @@ -659,8 +670,8 @@ def publish( workflows=[workflow], ) - # Manually set the published status for the newly created workflow. - exported_automation["workflows"][0]["state"] = WorkflowState.LIVE + if state: + exported_automation["workflows"][0]["state"] = state progress_builder = None if progress: @@ -669,7 +680,7 @@ def publish( id_mapping = {"import_workspace_id": workflow.automation.workspace.id} - duplicate_automation = application_type.import_serialized( + cloned_automation = application_type.import_serialized( None, exported_automation, import_export_config, @@ -679,12 +690,42 @@ def publish( progress_builder=progress_builder, ) - duplicate_automation.published_from = workflow - duplicate_automation.save(update_fields=["published_from"]) + cloned_automation.published_from = workflow + cloned_automation.save(update_fields=["published_from"]) + + return cloned_automation, id_mapping + + def publish( + self, + workflow: AutomationWorkflow, + progress: Optional[Progress] = None, + ) -> AutomationWorkflow: + """ + Publishes an Automation and a specific workflow. If the automation was + already published, the previous versions are deleted and a new one + is created. + + When an automation is published, a clone of the current version is + created to avoid further modifications to the original automation + which could affect the published version. + + :param workflow: The workflow to be published. + :param progress: An object to track the publishing progress. + :return: The published workflow. + """ + + # Make sure we are the only process to update the automation workflow + # to prevent race conditions. + workflow = self.get_workflow(workflow.id, for_update=True) + self.clean_up_previously_published_automations(workflow) + + cloned_automation, id_mapping = self._clone_workflow( + workflow, WorkflowState.LIVE, progress + ) self._invalidate_workflow_caches(workflow) - return duplicate_automation.workflows.first() + return cloned_automation.workflows.first() def disable_workflow(self, workflow: AutomationWorkflow) -> None: """ @@ -761,7 +802,9 @@ def reset_workflow_temporary_states(self, workflow): automation_workflow_updated.send(self, user=None, workflow=workflow) def toggle_test_run( - self, workflow: AutomationWorkflow, simulate_until_node: bool = None + self, + workflow: AutomationWorkflow, + simulate_until_node: AutomationNode | None, ): """ Trigger a test run if none is in progress or cancel the planned run. If the @@ -785,7 +828,6 @@ def toggle_test_run( # If the service related to the trigger can immediately be tested # we immediately trigger the workflow run self.async_start_workflow(workflow) - else: AutomationWorkflowHandler().set_workflow_temporary_states( workflow, simulate_until_node=simulate_until_node @@ -818,6 +860,9 @@ def _clear_old_history(self, original_workflow: AutomationWorkflow) -> None: It will delete any history entries that are older than MAX_HISTORY_DAYS and only keep the most recent MAX_HISTORY_ENTRIES entries. + + TODO: refactor this once https://github.com/baserow/baserow/pull/5166 + is merged in. """ oldest_history_date = timezone.now() - timedelta( @@ -836,6 +881,28 @@ def _clear_old_history(self, original_workflow: AutomationWorkflow) -> None: status=HistoryStatusChoices.STARTED ).exclude(id__in=history_ids_to_keep).delete() + # Clean up published automations that no longer have any history entries + active_published = self.get_published_workflow( + original_workflow, with_cache=False + ) + empty_published = ( + Automation.objects.filter( + published_from=original_workflow, + ) + .exclude(workflows__cloned_workflow_histories__isnull=False) + # _ensure_published_for_run() is called to potentially create a + # test clone just before calling before_run(). We need to + # ensure the newly created clone isn't deleted prematurely. + .exclude( + workflows__state=WorkflowState.TEST_CLONE, + created_on__gte=timezone.now() - timedelta(seconds=5), + ) + ) + if active_published: + empty_published = empty_published.exclude(id=active_published.automation_id) + + empty_published.delete() + def _mark_failure_for_timed_out_history( self, original_workflow: AutomationWorkflow ) -> None: @@ -844,16 +911,74 @@ def _mark_failure_for_timed_out_history( is marked as failed. """ - max_history_date = timezone.now() - timedelta( + now = timezone.now() + max_history_date = now - timedelta( hours=settings.AUTOMATION_WORKFLOW_TIMEOUT_HOURS ) - original_workflow.workflow_histories.filter( - status=HistoryStatusChoices.STARTED, started_on__lt=max_history_date + + error = "This workflow took too long and was timed out." + + workflow_history_ids = list( + original_workflow.workflow_histories.filter( + status=HistoryStatusChoices.STARTED, + started_on__lt=max_history_date, + ).values_list("id", flat=True) + ) + + if not workflow_history_ids: + return + + AutomationWorkflowHistory.objects.filter( + id__in=workflow_history_ids, + ).update( + status=HistoryStatusChoices.ERROR, + message=error, + completed_on=now, + ) + + AutomationNodeHistory.objects.filter( + workflow_history_id__in=workflow_history_ids, + status=HistoryStatusChoices.STARTED, ).update( status=HistoryStatusChoices.ERROR, - message="This workflow took too long and was timed out.", + message=error, + completed_on=now, ) + def _ensure_published_for_run( + self, workflow: AutomationWorkflow + ) -> AutomationWorkflow: + """ + Ensure an up-to-date cloned automation exists for test runs. + + The cloned workflow is used by the history, which decouples the + history's workflow from the draft or published workflow. + """ + + latest_clone = ( + AutomationWorkflow.objects.filter( + state=WorkflowState.TEST_CLONE, automation__published_from=workflow + ) + .order_by("-automation__id") + .first() + ) + + is_dirty_key = WORKFLOW_DIRTY_CACHE_KEY.format(workflow.id) + is_dirty = global_cache.get(is_dirty_key, default=False) + + # If the clone exists and the workflow hasn't been updated since, + # use that existing clone's workflow. + if not is_dirty and latest_clone: + cloned_workflow = latest_clone + else: + cloned_automation, _ = self._clone_workflow( + workflow, WorkflowState.TEST_CLONE + ) + cloned_workflow = cloned_automation.workflows.first() + global_cache.invalidate(is_dirty_key) + + return cloned_workflow + def _get_workflow_history_rate_limit_cache_key( self, original_workflow: AutomationWorkflow ) -> str: @@ -1009,14 +1134,21 @@ def async_start_workflow( history_status = HistoryStatusChoices.ERROR create_history_entry = True - original_workflow = workflow.get_original() - simulate_until_node = ( workflow.get_graph().get_node(workflow.simulate_until_node_id) if workflow.simulate_until_node_id else None ) + is_test_run = ( + workflow.is_original() or workflow.state == WorkflowState.TEST_CLONE + ) + + if workflow.is_original() and simulate_until_node is None: + workflow = self._ensure_published_for_run(workflow) + + original_workflow = workflow.get_original() + try: self.before_run(workflow) except AutomationWorkflowRateLimited as e: @@ -1056,8 +1188,9 @@ def async_start_workflow( now = timezone.now() AutomationHistoryHandler().create_workflow_history( - original_workflow, - is_test_run=original_workflow == workflow, + original_workflow=original_workflow, + workflow=workflow, + is_test_run=is_test_run, started_on=now, completed_on=now, message=error, @@ -1065,18 +1198,20 @@ def async_start_workflow( ) return - # If the currently running workflow is an unpublished workflow then we are - # testing it. - is_test_run = original_workflow == workflow - history = AutomationHistoryHandler().create_workflow_history( - original_workflow, + original_workflow=original_workflow, + workflow=workflow, started_on=timezone.now(), is_test_run=is_test_run, event_payload=event_payload, simulate_until_node=simulate_until_node, ) + automation_workflow_dispatch_started.send( + sender=None, + workflow_history=history, + ) + transaction.on_commit( lambda: start_workflow_celery_task.delay(workflow.id, history.id) ) diff --git a/backend/src/baserow/contrib/automation/workflows/signals.py b/backend/src/baserow/contrib/automation/workflows/signals.py index b6588ab3fe..232431ba45 100644 --- a/backend/src/baserow/contrib/automation/workflows/signals.py +++ b/backend/src/baserow/contrib/automation/workflows/signals.py @@ -5,3 +5,5 @@ automation_workflow_updated = Signal() automation_workflow_published = Signal() automation_workflows_reordered = Signal() +automation_workflow_dispatch_started = Signal() +automation_workflow_dispatch_done = Signal() diff --git a/backend/src/baserow/contrib/automation/workflows/tasks.py b/backend/src/baserow/contrib/automation/workflows/tasks.py index a3749dae9f..ae4f2c09df 100644 --- a/backend/src/baserow/contrib/automation/workflows/tasks.py +++ b/backend/src/baserow/contrib/automation/workflows/tasks.py @@ -8,6 +8,9 @@ from baserow.contrib.automation.history.constants import HistoryStatusChoices from baserow.contrib.automation.history.handler import AutomationHistoryHandler from baserow.contrib.automation.history.models import AutomationWorkflowHistory +from baserow.contrib.automation.workflows.signals import ( + automation_workflow_dispatch_done, +) from baserow.core.db import atomic_with_retry_on_deadlock @@ -54,7 +57,6 @@ def handle_workflow_dispatch_done( AutomationWorkflowHistory.objects.filter( id=history_id, simulate_until_node_id=simulate_until_node_id ).delete() - else: # Only update the history if it's still started. # If the workflow history was marked as failed by a specific node, we @@ -66,3 +68,9 @@ def handle_workflow_dispatch_done( status=HistoryStatusChoices.SUCCESS, completed_on=timezone.now(), ) + + history = AutomationWorkflowHistory.objects.get(id=history_id) + automation_workflow_dispatch_done.send( + sender=None, + workflow_history=history, + ) diff --git a/backend/src/baserow/contrib/automation/workflows/ws/signals.py b/backend/src/baserow/contrib/automation/workflows/ws/signals.py index 6a0c42f428..867f9c40d2 100644 --- a/backend/src/baserow/contrib/automation/workflows/ws/signals.py +++ b/backend/src/baserow/contrib/automation/workflows/ws/signals.py @@ -21,6 +21,8 @@ from baserow.contrib.automation.workflows.signals import ( automation_workflow_created, automation_workflow_deleted, + automation_workflow_dispatch_done, + automation_workflow_dispatch_started, automation_workflow_published, automation_workflow_updated, automation_workflows_reordered, @@ -126,3 +128,41 @@ def workflow_reordered( getattr(user, "web_socket_id", None), ) ) + + +@receiver(automation_workflow_dispatch_started) +def workflow_dispatch_started(sender, workflow_history, **kwargs): + workflow = workflow_history.original_workflow + transaction.on_commit( + lambda: broadcast_to_permitted_users.delay( + workflow.automation.workspace_id, + ReadAutomationWorkflowOperationType.type, + AutomationWorkflowObjectScopeType.type, + workflow.id, + { + "type": "automation_workflow_dispatch_started", + "workflow_id": workflow.id, + "history_id": workflow_history.id, + }, + None, + ) + ) + + +@receiver(automation_workflow_dispatch_done) +def workflow_dispatch_done(sender, workflow_history, **kwargs): + workflow = workflow_history.original_workflow + transaction.on_commit( + lambda: broadcast_to_permitted_users.delay( + workflow.automation.workspace_id, + ReadAutomationWorkflowOperationType.type, + AutomationWorkflowObjectScopeType.type, + workflow.id, + { + "type": "automation_workflow_dispatch_done", + "workflow_id": workflow.id, + "history_id": workflow_history.id, + }, + None, + ) + ) diff --git a/backend/src/baserow/test_utils/fixtures/automation_history.py b/backend/src/baserow/test_utils/fixtures/automation_history.py index 894bdd83cd..f2930e32e9 100644 --- a/backend/src/baserow/test_utils/fixtures/automation_history.py +++ b/backend/src/baserow/test_utils/fixtures/automation_history.py @@ -42,6 +42,7 @@ def create_workflow_history(self, user=None, **kwargs): ) history = AutomationHistoryHandler().create_workflow_history( + original_workflow=original_workflow, workflow=original_workflow, started_on=started_on, is_test_run=is_test_run, diff --git a/backend/src/baserow/test_utils/fixtures/automation_workflow_history.py b/backend/src/baserow/test_utils/fixtures/automation_workflow_history.py index 16f82ab663..b72142f02a 100644 --- a/backend/src/baserow/test_utils/fixtures/automation_workflow_history.py +++ b/backend/src/baserow/test_utils/fixtures/automation_workflow_history.py @@ -1,11 +1,18 @@ from django.utils import timezone from baserow.contrib.automation.history.constants import HistoryStatusChoices -from baserow.contrib.automation.history.models import AutomationWorkflowHistory +from baserow.contrib.automation.history.models import ( + AutomationNodeHistory, + AutomationNodeResult, + AutomationWorkflowHistory, +) class AutomationWorkflowHistoryFixtures: def create_automation_workflow_history(self, user=None, **kwargs): + if "original_workflow" not in kwargs: + kwargs["original_workflow"] = kwargs["workflow"] + if "workflow" not in kwargs: user = user or self.create_user() kwargs["workflow"] = self.create_automation_workflow(user=user) @@ -20,3 +27,36 @@ def create_automation_workflow_history(self, user=None, **kwargs): kwargs["is_test_run"] = False return AutomationWorkflowHistory.objects.create(**kwargs) + + def create_automation_node_history(self, user=None, **kwargs): + user = user or self.create_user() + + if "workflow_history" not in kwargs: + kwargs["workflow_history"] = self.create_automation_workflow_history( + user=user, **kwargs + ) + + if "node" not in kwargs: + kwargs["node"] = self.create_automation_node( + user=user, + workflow=kwargs["workflow_history"].workflow, + **kwargs, + ) + + if "started_on" not in kwargs: + kwargs["started_on"] = timezone.now() + + if "status" not in kwargs: + kwargs["status"] = HistoryStatusChoices.STARTED + + return AutomationNodeHistory.objects.create(**kwargs) + + def create_automation_node_result(self, user=None, **kwargs): + user = user or self.create_user() + + if "node_history" not in kwargs: + kwargs["node_history"] = self.create_automation_node_history( + user=user, **kwargs + ) + + return AutomationNodeResult.objects.create(**kwargs) diff --git a/backend/tests/baserow/contrib/automation/api/workflows/test_workflow_views.py b/backend/tests/baserow/contrib/automation/api/workflows/test_workflow_views.py index 2f5ef19d68..e71cbac17d 100644 --- a/backend/tests/baserow/contrib/automation/api/workflows/test_workflow_views.py +++ b/backend/tests/baserow/contrib/automation/api/workflows/test_workflow_views.py @@ -1,5 +1,6 @@ import datetime +from django.db.models import Count, Q from django.urls import reverse from django.utils import timezone @@ -14,8 +15,14 @@ HTTP_404_NOT_FOUND, ) +from baserow.contrib.automation.api.workflows.serializers import ( + AutomationWorkflowHistorySerializer, +) +from baserow.contrib.automation.history.constants import HistoryStatusChoices +from baserow.contrib.automation.history.handler import AutomationHistoryHandler from baserow.contrib.automation.workflows.constants import ALLOW_TEST_RUN_MINUTES from baserow.contrib.database.rows.handler import RowHandler +from baserow.core.cache import local_cache from baserow.test_utils.helpers import AnyInt, AnyStr from tests.baserow.contrib.automation.api.utils import get_api_kwargs @@ -635,6 +642,8 @@ def test_get_workflow_histories(api_client, data_fixture): assert response.status_code == HTTP_200_OK assert response.json() == { "count": 1, + "fail_count": 0, + "success_count": 1, "next": None, "previous": None, "results": [ @@ -645,6 +654,9 @@ def test_get_workflow_histories(api_client, data_fixture): "is_test_run": False, "message": "", "status": "success", + "event_payload": None, + "node_histories": [], + "simulate_until_node": None, }, ], } @@ -706,3 +718,150 @@ def test_rename_workflow_using_existing_workflow_name(api_client, data_fixture): assert workflow_1.name == "test1" workflow_2.refresh_from_db() assert workflow_2.name == "test1" + + +@pytest.mark.django_db +def test_get_workflow_histories_query_count(data_fixture, django_assert_num_queries): + user = data_fixture.create_user() + workflow = data_fixture.create_automation_workflow(user=user) + trigger = workflow.get_trigger() + + handler = AutomationHistoryHandler() + + def _create_histories(count): + for _ in range(count): + workflow_history = handler.create_workflow_history( + original_workflow=workflow, + workflow=workflow, + started_on=timezone.now(), + is_test_run=False, + ) + node_history = handler.create_node_history( + workflow_history=workflow_history, + node=trigger, + started_on=timezone.now(), + ) + handler.create_node_result( + node_history=node_history, + result={"foo": "bar"}, + ) + + _create_histories(3) + local_cache.clear() + + expected_queries = 10 + with django_assert_num_queries(expected_queries): + queryset = handler.get_workflow_histories(workflow) + queryset.aggregate( + success_count=Count("id", filter=Q(status=HistoryStatusChoices.SUCCESS)), + fail_count=Count("id", filter=Q(status=HistoryStatusChoices.ERROR)), + ) + AutomationWorkflowHistorySerializer(list(queryset), many=True).data + + _create_histories(3) + local_cache.clear() + + with django_assert_num_queries(expected_queries): + queryset = handler.get_workflow_histories(workflow) + queryset.aggregate( + success_count=Count("id", filter=Q(status=HistoryStatusChoices.SUCCESS)), + fail_count=Count("id", filter=Q(status=HistoryStatusChoices.ERROR)), + ) + AutomationWorkflowHistorySerializer(list(queryset), many=True).data + + +@pytest.mark.django_db +def test_get_workflow_histories_with_node_histories(api_client, data_fixture): + user, token = data_fixture.create_user_and_token() + workflow = data_fixture.create_automation_workflow(user=user) + trigger = workflow.get_trigger() + action_node = data_fixture.create_local_baserow_create_row_action_node( + user=user, workflow=workflow, label="My Action" + ) + + now = timezone.now() + workflow_history = data_fixture.create_automation_workflow_history( + user=user, + workflow=workflow, + status=HistoryStatusChoices.SUCCESS, + completed_on=now, + ) + node_history_1 = data_fixture.create_automation_node_history( + user=user, + workflow_history=workflow_history, + node=trigger, + completed_on=now, + ) + node_result_1 = data_fixture.create_automation_node_result( + user=user, + node_history=node_history_1, + result={"rows": [1, 2]}, + ) + node_history_2 = data_fixture.create_automation_node_history( + user=user, + workflow_history=workflow_history, + node=action_node, + completed_on=now, + ) + node_result_2 = data_fixture.create_automation_node_result( + user=user, + node_history=node_history_2, + result={"created_row_id": 99}, + iteration_path="0.2.1", + ) + + url = reverse(API_URL_WORKFLOW_HISTORY, kwargs={"workflow_id": workflow.id}) + response = api_client.get(url, **get_api_kwargs(token)) + + assert response.status_code == HTTP_200_OK + data = response.json() + + assert data["count"] == 1 + assert data["success_count"] == 1 + assert data["fail_count"] == 0 + + w_history = data["results"][0] + assert w_history["id"] == workflow_history.id + assert w_history["status"] == "success" + assert len(w_history["node_histories"]) == 2 + + n_history_1 = w_history["node_histories"][0] + assert n_history_1["node"] == trigger.id + assert n_history_1["workflow_history"] == workflow_history.id + assert n_history_1["node_type"] == trigger.get_type().type + assert n_history_1["node_label"] == trigger.label + assert n_history_1["parent_node_id"] is None + assert n_history_1["iteration"] == 0 + assert n_history_1["result"] == {"rows": [1, 2]} + + n_history_2 = w_history["node_histories"][1] + assert n_history_2["node"] == action_node.id + assert n_history_2["workflow_history"] == workflow_history.id + assert n_history_2["node_type"] == action_node.get_type().type + assert n_history_2["node_label"] == "My Action" + assert n_history_2["iteration"] == 1 + assert n_history_2["result"] == {"created_row_id": 99} + + +@pytest.mark.django_db +def test_get_workflow_histories_has_success_and_fail_counts(api_client, data_fixture): + user, token = data_fixture.create_user_and_token() + workflow = data_fixture.create_automation_workflow(user=user) + data_fixture.create_automation_workflow_history( + workflow=workflow, status=HistoryStatusChoices.SUCCESS + ) + data_fixture.create_automation_workflow_history( + workflow=workflow, status=HistoryStatusChoices.SUCCESS + ) + data_fixture.create_automation_workflow_history( + workflow=workflow, status=HistoryStatusChoices.ERROR + ) + + url = reverse(API_URL_WORKFLOW_HISTORY, kwargs={"workflow_id": workflow.id}) + response = api_client.get(url, **get_api_kwargs(token)) + + assert response.status_code == HTTP_200_OK + data = response.json() + assert data["count"] == 3 + assert data["success_count"] == 2 + assert data["fail_count"] == 1 diff --git a/backend/tests/baserow/contrib/automation/data_providers/test_data_provider_types.py b/backend/tests/baserow/contrib/automation/data_providers/test_data_provider_types.py index ef7bfed141..e2c5329d16 100644 --- a/backend/tests/baserow/contrib/automation/data_providers/test_data_provider_types.py +++ b/backend/tests/baserow/contrib/automation/data_providers/test_data_provider_types.py @@ -17,6 +17,7 @@ def test_previous_node_data_provider_get_data_chunk(data_fixture): workflow = data_fixture.create_automation_workflow() workflow_history = AutomationHistoryHandler().create_workflow_history( + workflow, workflow, timezone.now(), False, @@ -78,6 +79,7 @@ def test_previous_node_data_provider_get_data_chunk(data_fixture): assert exc.value.args[0] == "The previous node doesn't exist" workflow_history_2 = AutomationHistoryHandler().create_workflow_history( + workflow, workflow, timezone.now(), False, @@ -132,6 +134,7 @@ def test_previous_node_data_provider_import_path(data_fixture): def test_current_iteration_data_provider_get_data_chunk(data_fixture): workflow = data_fixture.create_automation_workflow() workflow_history = AutomationHistoryHandler().create_workflow_history( + workflow, workflow, timezone.now(), False, diff --git a/backend/tests/baserow/contrib/automation/history/test_history_handler.py b/backend/tests/baserow/contrib/automation/history/test_history_handler.py index 1238f2645a..20bff66ca7 100644 --- a/backend/tests/baserow/contrib/automation/history/test_history_handler.py +++ b/backend/tests/baserow/contrib/automation/history/test_history_handler.py @@ -59,6 +59,7 @@ def test_create_workflow_history(data_fixture): now = timezone.now() history = AutomationHistoryHandler().create_workflow_history( + original_workflow, original_workflow, now, False, @@ -93,6 +94,7 @@ def test_get_workflow_histories_excludes_simulation_histories(data_fixture): def test_get_workflow_history(data_fixture): workflow = data_fixture.create_automation_workflow() history = AutomationHistoryHandler().create_workflow_history( + workflow, workflow, timezone.now(), False, @@ -115,6 +117,7 @@ def test_get_workflow_history_does_not_exist(data_fixture): def test_get_workflow_history_respects_base_queryset(data_fixture): workflow = data_fixture.create_automation_workflow() history = AutomationHistoryHandler().create_workflow_history( + workflow, workflow, timezone.now(), False, diff --git a/backend/tests/baserow/contrib/automation/history/utils.py b/backend/tests/baserow/contrib/automation/history/utils.py index 804da2780f..0e2f113da7 100644 --- a/backend/tests/baserow/contrib/automation/history/utils.py +++ b/backend/tests/baserow/contrib/automation/history/utils.py @@ -7,13 +7,13 @@ def assert_history( """Helper to test AutomationWorkflowHistory objects.""" histories = list( - AutomationWorkflowHistory.objects.filter(workflow=workflow).order_by( + AutomationWorkflowHistory.objects.filter(original_workflow=workflow).order_by( "started_on", "id" ) ) assert len(histories) == expected_count if expected_count > 0: history = histories[history_index] - assert history.workflow == workflow + assert history.original_workflow == workflow assert history.status == expected_status assert history.message == expected_msg diff --git a/backend/tests/baserow/contrib/automation/nodes/test_node_dispatch_async.py b/backend/tests/baserow/contrib/automation/nodes/test_node_dispatch_async.py index 5f5806f3ee..373f01c3be 100644 --- a/backend/tests/baserow/contrib/automation/nodes/test_node_dispatch_async.py +++ b/backend/tests/baserow/contrib/automation/nodes/test_node_dispatch_async.py @@ -19,6 +19,7 @@ "baserow.contrib.automation.nodes.node_types.LocalBaserowRowsCreatedNodeTriggerType" ) NODE_HANDLER_PATH = "baserow.contrib.automation.nodes.handler" +TASKS_PATH = "baserow.contrib.automation.workflows.tasks" def assert_dispatches_next_node(result, *expected_tasks): @@ -205,6 +206,10 @@ def test_dispatch_node_service_error(data_fixture): assert error in node_history.message assert node_history.status == HistoryStatusChoices.ERROR + node_result = AutomationNodeResult.objects.get(node_history=node_history) + assert node_result.result == {} + assert node_result.iteration_path == "" + @pytest.mark.django_db @patch(f"{TRIGGER_NODE_TYPE_PATH}.dispatch") @@ -234,6 +239,10 @@ def test_dispatch_node_unexpected_error(mock_logger, mock_dispatch, data_fixture assert error in node_history.message assert node_history.status == HistoryStatusChoices.ERROR + node_result = AutomationNodeResult.objects.get(node_history=node_history) + assert node_result.result == {} + assert node_result.iteration_path == "" + @pytest.mark.django_db @patch(f"{TRIGGER_NODE_TYPE_PATH}.dispatch") @@ -269,6 +278,10 @@ def test_dispatch_node_expected_error(mock_logger, mock_dispatch, data_fixture): assert error in node_history.message assert node_history.status == HistoryStatusChoices.ERROR + node_result = AutomationNodeResult.objects.get(node_history=node_history) + assert node_result.result == {} + assert node_result.iteration_path == "" + @pytest.mark.django_db def test_dispatch_node_dispatches_trigger(data_fixture): @@ -283,6 +296,7 @@ def test_dispatch_node_dispatches_trigger(data_fixture): ) handle_workflow_dispatch_done(history_id=workflow_history.id) + workflow_history.refresh_from_db() assert workflow_history.message == "" assert workflow_history.status == HistoryStatusChoices.SUCCESS @@ -1489,3 +1503,78 @@ def test_dispatch_node_with_deleted_node(mock_logger, data_fixture): "deleted before the task was executed." ) mock_logger.warning.assert_called_once_with(expected_error) + + +@pytest.mark.django_db +@patch(f"{NODE_HANDLER_PATH}.automation_node_updated") +def test_dispatch_node_simulation_copies_sample_data_to_draft_node( + mock_automation_node_updated, + data_fixture, +): + data = create_workflow(data_fixture) + workflow = data["workflow"] + trigger_node = data["trigger_node"] + + workflow_history = data_fixture.create_automation_workflow_history( + original_workflow=workflow, + workflow=workflow, + simulate_until_node=trigger_node, + is_test_run=True, + ) + + assert trigger_node.service.specific.sample_data is None + AutomationNodeHandler().dispatch_node( + trigger_node.id, + history_id=workflow_history.id, + ) + + # Ensure the sample_data is saved to the draft node's service + trigger_node.service.specific.refresh_from_db() + assert trigger_node.service.specific.sample_data is not None + + # Ensure a signal is sent for the draft node + mock_automation_node_updated.send.assert_called_once_with( + ANY, user=None, node=trigger_node + ) + + +@pytest.mark.django_db +@patch(f"{TASKS_PATH}.automation_workflow_dispatch_done") +def test_handle_workflow_dispatch_done_sends_signal_on_success( + mock_dispatch_done_signal, data_fixture +): + data = create_workflow(data_fixture) + workflow_history = data["workflow_history"] + assert workflow_history.status == HistoryStatusChoices.STARTED + + handle_workflow_dispatch_done(history_id=workflow_history.id) + + workflow_history.refresh_from_db() + assert workflow_history.status == HistoryStatusChoices.SUCCESS + mock_dispatch_done_signal.send.assert_called_once() + assert ( + mock_dispatch_done_signal.send.call_args.kwargs["workflow_history"].id + == workflow_history.id + ) + + +@pytest.mark.django_db +@patch(f"{TASKS_PATH}.automation_workflow_dispatch_done") +def test_handle_workflow_dispatch_done_sends_signal_on_error( + mock_dispatch_done_signal, data_fixture +): + data = create_workflow(data_fixture) + workflow_history = data["workflow_history"] + workflow_history.status = HistoryStatusChoices.ERROR + workflow_history.message = "Node failed" + workflow_history.save() + + handle_workflow_dispatch_done(history_id=workflow_history.id) + + workflow_history.refresh_from_db() + assert workflow_history.status == HistoryStatusChoices.ERROR + mock_dispatch_done_signal.send.assert_called_once() + assert ( + mock_dispatch_done_signal.send.call_args.kwargs["workflow_history"].id + == workflow_history.id + ) diff --git a/backend/tests/baserow/contrib/automation/nodes/test_node_service.py b/backend/tests/baserow/contrib/automation/nodes/test_node_service.py index 23fb8ada51..52f01b6046 100644 --- a/backend/tests/baserow/contrib/automation/nodes/test_node_service.py +++ b/backend/tests/baserow/contrib/automation/nodes/test_node_service.py @@ -11,6 +11,8 @@ from baserow.contrib.automation.nodes.registries import automation_node_type_registry from baserow.contrib.automation.nodes.service import AutomationNodeService from baserow.contrib.automation.nodes.trash_types import AutomationNodeTrashableItemType +from baserow.contrib.automation.workflows.constants import WORKFLOW_DIRTY_CACHE_KEY +from baserow.core.cache import global_cache from baserow.core.exceptions import UserNotInWorkspace from baserow.core.trash.handler import TrashHandler from baserow.test_utils.fixtures import Fixtures @@ -755,3 +757,91 @@ def test_move_node_invalid_reference_node(data_fixture: Fixtures): ) assert exc.value.args[0] == f"The reference node {action2.id} can't have child" + + +@pytest.mark.django_db +def test_update_node_updates_workflow_dirty_cache(data_fixture): + """ + When a node is updated, the workflow's dirty cache flag should be set + so that the test clone knows to create a new clone instead of + reusing the last one. + """ + + user = data_fixture.create_user() + node = data_fixture.create_automation_node(user=user) + + cache_key = WORKFLOW_DIRTY_CACHE_KEY.format(node.workflow.id) + + assert global_cache.get(cache_key, default=False) is False + + AutomationNodeService().update_node(user, node.id, label="foo label") + + assert global_cache.get(cache_key, default=False) is True + + +@pytest.mark.django_db +def test_create_node_updates_workflow_dirty_cache(data_fixture): + """ + When a node is created, the workflow's dirty cache flag should be set + so that the test clone knows to create a new clone instead of + reusing the last one. + """ + + user = data_fixture.create_user() + node = data_fixture.create_automation_node(user=user) + workflow = node.workflow + + cache_key = WORKFLOW_DIRTY_CACHE_KEY.format(workflow.id) + + assert global_cache.get(cache_key, default=False) is False + + AutomationNodeService().create_node( + user, + node_type=automation_node_type_registry.get("local_baserow_create_row"), + workflow=workflow, + reference_node_id=node.id, + position="south", + output="", + ) + + assert global_cache.get(cache_key, default=False) is True + + +@pytest.mark.django_db +def test_duplicate_node_updates_workflow_dirty_cache(data_fixture): + """ + When a node is duplicated, the workflow's dirty cache flag should be set + so that the test clone knows to create a new clone instead of + reusing the last one. + """ + + user = data_fixture.create_user() + node = data_fixture.create_local_baserow_create_row_action_node(user=user) + + cache_key = WORKFLOW_DIRTY_CACHE_KEY.format(node.workflow.id) + + assert global_cache.get(cache_key, default=False) is False + + AutomationNodeService().duplicate_node(user, node.id) + + assert global_cache.get(cache_key, default=False) is True + + +@pytest.mark.django_db +def test_delete_node_updates_workflow_dirty_cache(data_fixture): + """ + When a node is deleted, the workflow's dirty cache flag should be set + so that the test clone knows to create a new clone instead of + reusing the last one. + """ + + user = data_fixture.create_user() + node = data_fixture.create_automation_node(user=user) + + cache_key = WORKFLOW_DIRTY_CACHE_KEY.format(node.workflow.id) + + assert global_cache.get(cache_key, default=False) is False + + AutomationNodeService().delete_node(user, node.id) + + assert global_cache.get(cache_key, default=False) is True diff --git a/backend/tests/baserow/contrib/automation/workflows/test_workflow_handler.py b/backend/tests/baserow/contrib/automation/workflows/test_workflow_handler.py index 5c9756bb5e..7d1aac515a 100644 --- a/backend/tests/baserow/contrib/automation/workflows/test_workflow_handler.py +++ b/backend/tests/baserow/contrib/automation/workflows/test_workflow_handler.py @@ -10,14 +10,18 @@ from freezegun import freeze_time from baserow.contrib.automation.history.constants import HistoryStatusChoices -from baserow.contrib.automation.history.models import AutomationWorkflowHistory -from baserow.contrib.automation.models import AutomationWorkflow +from baserow.contrib.automation.history.models import ( + AutomationNodeHistory, + AutomationWorkflowHistory, +) +from baserow.contrib.automation.models import Automation, AutomationWorkflow from baserow.contrib.automation.nodes.node_types import ( CorePeriodicTriggerNodeType, LocalBaserowRowsCreatedNodeTriggerType, ) from baserow.contrib.automation.workflows.constants import ( ALLOW_TEST_RUN_MINUTES, + WORKFLOW_DIRTY_CACHE_KEY, WorkflowState, ) from baserow.contrib.automation.workflows.exceptions import ( @@ -28,7 +32,7 @@ AutomationWorkflowTooManyErrors, ) from baserow.contrib.automation.workflows.handler import AutomationWorkflowHandler -from baserow.core.cache import local_cache +from baserow.core.cache import global_cache, local_cache from baserow.core.notifications.models import Notification, NotificationRecipient from baserow.core.registries import ImportExportConfig from baserow.core.trash.handler import TrashHandler @@ -429,13 +433,24 @@ def test_publish_returns_published_workflow(data_fixture): @pytest.mark.django_db def test_publish_cleans_up_old_workflows(data_fixture): workflow = data_fixture.create_automation_workflow() + handler = AutomationWorkflowHandler() + + test_clone_automation, _ = handler._clone_workflow( + workflow, WorkflowState.TEST_CLONE + ) + test_clone_workflow = test_clone_automation.workflows.first() + data_fixture.create_automation_workflow_history( + original_workflow=workflow, + workflow=test_clone_workflow, + status=HistoryStatusChoices.SUCCESS, + ) - published_1 = AutomationWorkflowHandler().publish(workflow) - published_2 = AutomationWorkflowHandler().publish(workflow) - published_3 = AutomationWorkflowHandler().publish(workflow) - published_4 = AutomationWorkflowHandler().publish(workflow) + published_1 = handler.publish(workflow) + published_2 = handler.publish(workflow) + published_3 = handler.publish(workflow) + published_4 = handler.publish(workflow) - # The first two workflows should no longer exist + # The first two published workflows should no longer exist assert AutomationWorkflow.objects_and_trash.filter(id=published_1.id).count() == 0 assert AutomationWorkflow.objects_and_trash.filter(id=published_2.id).count() == 0 @@ -446,6 +461,53 @@ def test_publish_cleans_up_old_workflows(data_fixture): # The latest published workflow should be active assert published_4.is_published is True + # The test clone should still exist and the state should still be correct + test_clone_workflow.refresh_from_db() + assert AutomationWorkflow.objects_and_trash.filter( + id=test_clone_workflow.id + ).exists() + assert test_clone_workflow.state == WorkflowState.TEST_CLONE + + +@pytest.mark.django_db +def test_publish_disables_live_workflow(data_fixture): + workflow = data_fixture.create_automation_workflow() + handler = AutomationWorkflowHandler() + + # Publish the workflow + published = handler.publish(workflow) + data_fixture.create_automation_workflow_history( + original_workflow=workflow, + workflow=published, + status=HistoryStatusChoices.SUCCESS, + ) + assert published.state == WorkflowState.LIVE + + # Edit the draft to force _ensure_published_for_run to create a new clone + workflow.refresh_from_db() + workflow.save() + + # Create a test clone to simulate a test run + test_clone_automation, _ = handler._clone_workflow( + workflow, WorkflowState.TEST_CLONE + ) + test_clone_workflow = test_clone_automation.workflows.first() + + # The test clone should be the newest automation + assert test_clone_automation.id > published.automation.id + + # The previously published workflow should now be disabled + new_published = handler.publish(workflow) + published.refresh_from_db() + assert published.state == WorkflowState.DISABLED + + # Make sure the newly published workflow is live + assert new_published.state == WorkflowState.LIVE + + # The test clone should be unaffected + test_clone_workflow.refresh_from_db() + assert test_clone_workflow.state == WorkflowState.TEST_CLONE + @pytest.mark.django_db def test_publish_only_exports_specific_workflow(data_fixture): @@ -512,6 +574,27 @@ def test_get_published_workflow_returns_workflow(data_fixture): assert result == published_workflow +@pytest.mark.django_db +def test_get_published_workflow_ignores_newer_test_clone(data_fixture): + workflow = data_fixture.create_automation_workflow() + handler = AutomationWorkflowHandler() + + published_workflow = handler.publish(workflow) + test_clone_automation, _ = handler._clone_workflow( + workflow, WorkflowState.TEST_CLONE + ) + test_clone_workflow = test_clone_automation.workflows.first() + + # Sanity check that the newer clone would win without the TEST_CLONE exclusion. + assert test_clone_automation.id > published_workflow.automation.id + assert test_clone_workflow.state == WorkflowState.TEST_CLONE + + result = handler.get_published_workflow(workflow, with_cache=False) + + assert result == published_workflow + assert result != test_clone_workflow + + @pytest.mark.django_db def test_update_workflow_correctly_pauses_published_workflow(data_fixture): user = data_fixture.create_user() @@ -833,7 +916,9 @@ def test_async_start_workflow_creates_rate_limited_history_once_until_cache_rese handler.async_start_workflow(published_workflow) handler.async_start_workflow(published_workflow) - histories = AutomationWorkflowHistory.objects.filter(workflow=original_workflow) + histories = AutomationWorkflowHistory.objects.filter( + original_workflow=original_workflow + ) assert histories.count() == 1 assert_history( original_workflow, @@ -847,7 +932,9 @@ def test_async_start_workflow_creates_rate_limited_history_once_until_cache_rese with freeze_time("2026-01-26 13:00:06"): handler.async_start_workflow(published_workflow) - histories = AutomationWorkflowHistory.objects.filter(workflow=original_workflow) + histories = AutomationWorkflowHistory.objects.filter( + original_workflow=original_workflow + ) assert histories.count() == 2 @@ -1185,7 +1272,7 @@ def test_toggle_test_mode_on( frozen_time = "2025-06-04 11:00" with freeze_time(frozen_time): - AutomationWorkflowHandler().toggle_test_run(workflow) + AutomationWorkflowHandler().toggle_test_run(workflow, None) workflow.refresh_from_db() @@ -1210,7 +1297,7 @@ def test_toggle_test_mode_on_immediate( frozen_time = "2025-06-04 11:00" with freeze_time(frozen_time): - AutomationWorkflowHandler().toggle_test_run(workflow) + AutomationWorkflowHandler().toggle_test_run(workflow, None) workflow.refresh_from_db() @@ -1234,7 +1321,7 @@ def test_toggle_test_mode_off( trigger_type=LocalBaserowRowsCreatedNodeTriggerType.type, ) - AutomationWorkflowHandler().toggle_test_run(workflow) + AutomationWorkflowHandler().toggle_test_run(workflow, None) workflow.refresh_from_db() @@ -1436,12 +1523,13 @@ def test_async_start_workflow_with_simulate_until_node( workflow.refresh_from_db() history = workflow.workflow_histories.get() + cloned_trigger = history.workflow.get_trigger() + assert history.simulate_until_node_id == cloned_trigger.id assert workflow.simulate_until_node is None assert history.is_test_run is True - assert history.simulate_until_node_id == trigger.id mock_start_workflow_celery_task.delay.assert_called_once_with( - workflow.id, history.id + history.workflow_id, history.id ) @@ -1510,9 +1598,9 @@ def test_async_start_workflow_rate_limited_runs_eventually_disable_workflow( AutomationWorkflowHandler().async_start_workflow(published_workflow) histories = list( - AutomationWorkflowHistory.objects.filter(workflow=original_workflow).order_by( - "started_on", "id" - ) + AutomationWorkflowHistory.objects.filter( + original_workflow=original_workflow + ).order_by("started_on", "id") ) assert len(histories) == 6 @@ -1553,8 +1641,12 @@ def test_async_start_workflow_rate_limited_runs_eventually_disable_workflow( @pytest.mark.django_db @patch(f"{WORKFLOWS_MODULE}.handler.transaction.on_commit") @patch(f"{WORKFLOWS_MODULE}.handler.start_workflow_celery_task") +@patch(f"{WORKFLOWS_MODULE}.handler.automation_workflow_dispatch_started") def test_async_start_workflow_queues_celery_task_on_commit( - mock_start_workflow_celery_task, mock_on_commit, data_fixture + mock_automation_workflow_dispatch_started, + mock_start_workflow_celery_task, + mock_on_commit, + data_fixture, ): workflow = data_fixture.create_automation_workflow() @@ -1563,13 +1655,19 @@ def test_async_start_workflow_queues_celery_task_on_commit( AutomationWorkflowHandler().async_start_workflow(workflow) history = workflow.workflow_histories.get() + # Ensure the workflow_id is the cloned workflow, not the draft + assert history.workflow_id != workflow.id mock_on_commit.assert_called_once() mock_start_workflow_celery_task.delay.assert_not_called() mock_on_commit.call_args.args[0]() mock_start_workflow_celery_task.delay.assert_called_once_with( - workflow.id, history.id + history.workflow_id, history.id + ) + mock_automation_workflow_dispatch_started.send.assert_called_once_with( + sender=None, + workflow_history=history, ) @@ -1670,14 +1768,27 @@ def test_before_run_marks_timed_out_started_history_as_failed(data_fixture): workflow=original_workflow, status=HistoryStatusChoices.STARTED, ) + node_history = AutomationNodeHistory.objects.create( + workflow_history=timed_out_history, + node=original_workflow.get_trigger(), + started_on=timed_out_history.started_on, + status=HistoryStatusChoices.STARTED, + ) with freeze_time("2026-03-10 12:00:00"): AutomationWorkflowHandler().before_run(published_workflow) - timed_out_history.refresh_from_db() + error_message = "This workflow took too long and was timed out." + timed_out_history.refresh_from_db() assert timed_out_history.status == HistoryStatusChoices.ERROR - assert timed_out_history.message == "This workflow took too long and was timed out." + assert timed_out_history.message == error_message + assert timed_out_history.completed_on is not None + + node_history.refresh_from_db() + assert node_history.status == HistoryStatusChoices.ERROR + assert node_history.message == error_message + assert node_history.completed_on == timed_out_history.completed_on @pytest.mark.django_db @@ -1758,3 +1869,200 @@ def test_clear_old_history_excludes_started_workflows_max_days(data_fixture): assert workflow.workflow_histories.filter(id=history_1.id).exists() is True assert workflow.workflow_histories.filter(id=history_2.id).exists() is False + + +@pytest.mark.django_db +def test_ensure_published_for_run_creates_new_clone(data_fixture): + workflow = data_fixture.create_automation_workflow() + + cloned_workflow = AutomationWorkflowHandler()._ensure_published_for_run(workflow) + + # Ensure that the cloned workflow is a new workflow + assert cloned_workflow.id != workflow.id + assert cloned_workflow.automation.published_from == workflow + assert cloned_workflow.state == WorkflowState.TEST_CLONE + + +@pytest.mark.django_db +def test_ensure_published_for_run_creates_new_after_workflow_update(data_fixture): + workflow = data_fixture.create_automation_workflow() + + handler = AutomationWorkflowHandler() + + cloned_workflow_1 = handler._ensure_published_for_run(workflow) + + # Set the dirty flag to trigger a new clone + cache_key = WORKFLOW_DIRTY_CACHE_KEY.format(workflow.id) + global_cache.update(cache_key, lambda _: True) + + cloned_workflow_2 = handler._ensure_published_for_run(workflow) + + # Because the workflow was updated, a new clone should be created + assert cloned_workflow_1.id != cloned_workflow_2.id + + +@pytest.mark.django_db +def test_ensure_published_for_run_reuse_automation(data_fixture): + """ + If a cloned automation exists and is still fresh (no edits to draft + since publish), we should reuse it rather than creating a new clone. + """ + + workflow = data_fixture.create_automation_workflow() + + handler = AutomationWorkflowHandler() + + cloned_workflow = handler._ensure_published_for_run(workflow) + + second_cloned_workflow = handler._ensure_published_for_run(workflow) + + assert cloned_workflow.automation_id == second_cloned_workflow.automation_id + + +@pytest.mark.django_db +def test_ensure_published_for_run_ignores_published_workflow_states(data_fixture): + workflow = data_fixture.create_automation_workflow() + handler = AutomationWorkflowHandler() + + published_workflow = handler.publish(workflow) + clone = handler._ensure_published_for_run(workflow) + second_clone = handler._ensure_published_for_run(workflow) + + assert published_workflow.state == WorkflowState.LIVE + assert clone.id != published_workflow.id + assert clone.state == WorkflowState.TEST_CLONE + assert clone.automation.published_from == workflow + assert second_clone.id == clone.id + + handler.update_workflow(workflow, state=WorkflowState.PAUSED) + published_workflow.refresh_from_db() + + clone_after = handler._ensure_published_for_run(workflow) + second_clone_after = handler._ensure_published_for_run(workflow) + + assert published_workflow.state == WorkflowState.PAUSED + assert clone_after.id == clone.id + assert clone_after.id != published_workflow.id + assert clone_after.state == WorkflowState.TEST_CLONE + assert clone_after.automation.published_from == workflow + assert second_clone_after.id == clone_after.id + + +@pytest.mark.django_db +def test_publish_preserves_old_live_automation_with_history(data_fixture): + """ + When re-publishing, old live automations that have history entries + pointing at them should not be deleted. + """ + + workflow = data_fixture.create_automation_workflow() + handler = AutomationWorkflowHandler() + + published = handler.publish(workflow) + published_automation = published.automation + + # Create a history entry pointing to the first published automation + data_fixture.create_automation_workflow_history( + original_workflow=workflow, + workflow=published, + status=HistoryStatusChoices.SUCCESS, + ) + + # Publish twice. Normally, this would deleted the oldest published entry. + handler.publish(workflow) + handler.publish(workflow) + + # The first automation shouldn't be deleted because it has history entries + assert Automation.objects.filter(id=published_automation.id).exists() + + +@override_settings( + AUTOMATION_WORKFLOW_HISTORY_MAX_ENTRIES=100, + AUTOMATION_WORKFLOW_HISTORY_MAX_DAYS=1, +) +@pytest.mark.django_db +def test_clear_old_history_deletes_orphaned_automations(data_fixture): + """ + When history entries are cleaned up, any published automations + that no longer have history entries pointing at them should be deleted. + """ + + workflow = data_fixture.create_automation_workflow() + handler = AutomationWorkflowHandler() + + with freeze_time("2026-04-20 11:00:00"): + cloned_workflow = handler._ensure_published_for_run(workflow) + + clone_automation_id = cloned_workflow.automation_id + + handler.publish(workflow) + + with freeze_time("2026-04-20 12:00:00"): + data_fixture.create_automation_workflow_history( + original_workflow=workflow, + workflow=cloned_workflow, + status=HistoryStatusChoices.SUCCESS, + ) + + # 12 hours later but within 1 day, so history survives + with freeze_time("2026-04-21 00:00:00"): + handler._clear_old_history(workflow) + + assert Automation.objects.filter(id=clone_automation_id).exists() + + # 2 days later, so history should have been deleted, and the cloned + # automation should be pruned as well. + with freeze_time("2026-04-22 12:00:00"): + handler._clear_old_history(workflow) + + assert not Automation.objects.filter(id=clone_automation_id).exists() + + +@pytest.mark.django_db +def test_clear_old_history_keeps_live_published_automation_when_newer_test_clone_exists( + data_fixture, +): + workflow = data_fixture.create_automation_workflow() + handler = AutomationWorkflowHandler() + + with freeze_time("2026-04-27 12:00:00"): + published_workflow = handler.publish(workflow) + test_clone_workflow = handler._ensure_published_for_run(workflow) + + assert test_clone_workflow.automation_id != published_workflow.automation_id + assert test_clone_workflow.automation_id > published_workflow.automation_id + + with freeze_time("2026-04-27 12:01:00"): + handler._clear_old_history(workflow) + + assert Automation.objects.filter(id=published_workflow.automation_id).exists() + assert not Automation.objects.filter(id=test_clone_workflow.automation_id).exists() + + +@pytest.mark.django_db +@patch(f"{WORKFLOWS_MODULE}.handler.start_workflow_celery_task") +def test_async_start_workflow_test_run_creates_test_clone( + mock_start_workflow_celery_task, data_fixture, django_capture_on_commit_callbacks +): + """ + When async_start_workflow is called, it should call the celery task + using a history that is based on a cloned workflow, not the draft. + """ + + workflow = data_fixture.create_automation_workflow() + + with django_capture_on_commit_callbacks(execute=True): + AutomationWorkflowHandler().async_start_workflow(workflow) + + history = workflow.workflow_histories.get() + + # History's workflow should be a clone, not the draft + assert history.original_workflow == workflow + assert history.is_test_run is True + assert history.workflow_id != workflow.id + assert history.workflow.automation.published_from == workflow + assert history.workflow.state == WorkflowState.TEST_CLONE + + mock_start_workflow_celery_task.delay.assert_called_once_with( + history.workflow_id, history.id + ) diff --git a/backend/tests/baserow/contrib/integrations/slack/test_slack_write_message_service_type.py b/backend/tests/baserow/contrib/integrations/slack/test_slack_write_message_service_type.py index 22041deb6b..0f8c414e20 100644 --- a/backend/tests/baserow/contrib/integrations/slack/test_slack_write_message_service_type.py +++ b/backend/tests/baserow/contrib/integrations/slack/test_slack_write_message_service_type.py @@ -142,6 +142,7 @@ def test_dispatch_slack_write_message_with_formulas(data_fixture): application = data_fixture.create_automation_application(user=user) workflow = data_fixture.create_automation_workflow(automation=application) workflow_history = AutomationHistoryHandler().create_workflow_history( + workflow, workflow, timezone.now(), False, diff --git a/web-frontend/modules/automation/components/workflow/sidePanels/HistorySection.vue b/web-frontend/modules/automation/components/workflow/sidePanels/HistorySection.vue deleted file mode 100644 index 113b645880..0000000000 --- a/web-frontend/modules/automation/components/workflow/sidePanels/HistorySection.vue +++ /dev/null @@ -1,117 +0,0 @@ - - - diff --git a/web-frontend/modules/automation/components/workflow/sidePanels/HistorySidePanel.vue b/web-frontend/modules/automation/components/workflow/sidePanels/HistorySidePanel.vue index 5fad2ac00f..e369a782b5 100644 --- a/web-frontend/modules/automation/components/workflow/sidePanels/HistorySidePanel.vue +++ b/web-frontend/modules/automation/components/workflow/sidePanels/HistorySidePanel.vue @@ -1,5 +1,11 @@