Skip to content

Commit eaed88b

Browse files
committed
Merge branch 'wa-fix-periodic-trigger-on-test' into 'develop'
Fix periodic trigger in simulation See merge request baserow/baserow!3760
2 parents 5e8d7d1 + 37afdf5 commit eaed88b

40 files changed

+1010
-852
lines changed

backend/src/baserow/contrib/automation/api/nodes/views.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from drf_spectacular.utils import OpenApiParameter, extend_schema
77
from rest_framework.permissions import AllowAny, IsAuthenticated
88
from rest_framework.response import Response
9+
from rest_framework.status import HTTP_202_ACCEPTED
910
from rest_framework.views import APIView
1011

1112
from baserow.api.decorators import (
@@ -434,15 +435,10 @@ class SimulateDispatchAutomationNodeView(APIView):
434435
}
435436
)
436437
def post(self, request, node_id: int):
437-
updated_node = AutomationNodeService().simulate_dispatch_node(
438-
request.user, node_id
438+
AutomationWorkflowService().toggle_test_run(
439+
request.user, simulate_until_node_id=node_id
439440
)
440-
441-
serializer = automation_node_type_registry.get_serializer(
442-
updated_node, AutomationNodeSerializer
443-
)
444-
445-
return Response(serializer.data)
441+
return Response(status=HTTP_202_ACCEPTED)
446442

447443

448444
class MoveAutomationNodeView(APIView):

backend/src/baserow/contrib/automation/api/workflows/serializers.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ class Meta:
2525
"order",
2626
"automation_id",
2727
"allow_test_run_until",
28+
"simulate_until_node_id",
2829
"published_on",
2930
"state",
3031
)

backend/src/baserow/contrib/automation/api/workflows/urls.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from baserow.contrib.automation.api.workflows.views import (
44
AsyncAutomationDuplicateWorkflowView,
55
AsyncPublishAutomationWorkflowView,
6+
AutomationTestWorkflowView,
67
AutomationWorkflowHistoryView,
78
AutomationWorkflowsView,
89
AutomationWorkflowView,
@@ -36,6 +37,11 @@
3637
AsyncPublishAutomationWorkflowView.as_view(),
3738
name="async_publish",
3839
),
40+
re_path(
41+
r"(?P<workflow_id>[0-9]+)/test/$",
42+
AutomationTestWorkflowView.as_view(),
43+
name="test",
44+
),
3945
re_path(
4046
r"(?P<workflow_id>[0-9]+)/history/$",
4147
AutomationWorkflowHistoryView.as_view(),

backend/src/baserow/contrib/automation/api/workflows/views.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,3 +406,45 @@ def post(self, request, workflow_id: int):
406406
serializer = job_type_registry.get_serializer(job, JobSerializer)
407407

408408
return Response(serializer.data, status=HTTP_202_ACCEPTED)
409+
410+
411+
class AutomationTestWorkflowView(APIView):
412+
permission_classes = (IsAuthenticated,)
413+
414+
@extend_schema(
415+
parameters=[
416+
OpenApiParameter(
417+
name="workflow_id",
418+
location=OpenApiParameter.PATH,
419+
type=OpenApiTypes.INT,
420+
description="The workflow id the user wants to test.",
421+
),
422+
CLIENT_SESSION_ID_SCHEMA_PARAMETER,
423+
],
424+
tags=[AUTOMATION_WORKFLOWS_TAG],
425+
operation_id="test_automation_workflow",
426+
description=(
427+
"This endpoint plan the execution of a test run for this workflow."
428+
),
429+
request=None,
430+
responses={
431+
202: HTTP_202_ACCEPTED,
432+
404: get_error_schema(["ERROR_AUTOMATION_WORKFLOW_DOES_NOT_EXIST"]),
433+
},
434+
)
435+
@transaction.atomic
436+
@map_exceptions(
437+
{
438+
AutomationWorkflowDoesNotExist: ERROR_AUTOMATION_WORKFLOW_DOES_NOT_EXIST,
439+
}
440+
)
441+
def post(self, request, workflow_id: int):
442+
"""
443+
Start the workflow asynchronously in test mode.
444+
"""
445+
446+
AutomationWorkflowService().toggle_test_run(
447+
request.user, workflow_id=workflow_id
448+
)
449+
450+
return Response(status=HTTP_202_ACCEPTED)

backend/src/baserow/contrib/automation/automation_dispatch_context.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ def __init__(
3535
self.workflow = workflow
3636
self.previous_nodes_results: Dict[int, Any] = {}
3737
self.dispatch_history: List[int] = []
38-
self.event_payload = event_payload
3938
self.simulate_until_node = simulate_until_node
4039

4140
services = (
@@ -53,6 +52,7 @@ def __init__(
5352
update_sample_data_for=services,
5453
use_sample_data=bool(self.simulate_until_node),
5554
force_outputs=force_outputs,
55+
event_payload=event_payload,
5656
)
5757

5858
def clone(self, **kwargs):

backend/src/baserow/contrib/automation/nodes/handler.py

Lines changed: 39 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from collections import defaultdict
2-
from typing import Any, Dict, Iterable, List, Optional, Union
2+
from typing import Any, Dict, Iterable, List, Optional, Type, Union
33

44
from django.core.files.storage import Storage
55
from django.db.models import QuerySet
@@ -10,29 +10,34 @@
1010
from baserow.contrib.automation.models import AutomationWorkflow
1111
from baserow.contrib.automation.nodes.exceptions import (
1212
AutomationNodeDoesNotExist,
13-
AutomationNodeError,
13+
AutomationNodeMisconfiguredService,
1414
AutomationNodeNotInWorkflow,
15-
AutomationNodeSimulateDispatchError,
1615
)
1716
from baserow.contrib.automation.nodes.models import AutomationActionNode, AutomationNode
18-
from baserow.contrib.automation.nodes.node_types import AutomationNodeType
17+
from baserow.contrib.automation.nodes.node_types import (
18+
AutomationNodeActionNodeType,
19+
AutomationNodeType,
20+
)
1921
from baserow.contrib.automation.nodes.registries import automation_node_type_registry
2022
from baserow.contrib.automation.nodes.types import (
2123
AutomationNodeDict,
2224
AutomationNodeDuplication,
2325
AutomationNodeMove,
2426
NextAutomationNodeValues,
2527
)
26-
from baserow.contrib.automation.workflows.runner import AutomationWorkflowRunner
2728
from baserow.core.cache import local_cache
2829
from baserow.core.db import specific_iterator
2930
from baserow.core.exceptions import IdDoesNotExist
30-
from baserow.core.services.exceptions import UnexpectedDispatchException
31+
from baserow.core.services.exceptions import (
32+
ServiceImproperlyConfiguredDispatchException,
33+
)
3134
from baserow.core.services.handler import ServiceHandler
3235
from baserow.core.services.models import Service
3336
from baserow.core.storage import ExportZipFile
3437
from baserow.core.utils import MirrorDict, extract_allowed
3538

39+
from .signals import automation_node_updated
40+
3641

3742
class AutomationNodeHandler:
3843
allowed_fields = ["label", "service", "previous_node_id", "previous_node_output"]
@@ -612,33 +617,36 @@ def import_node_only(
612617

613618
return node_instance
614619

615-
def simulate_dispatch_node(self, node: AutomationNode) -> AutomationNode:
620+
def dispatch_node(
621+
self, node: "AutomationNode", dispatch_context: AutomationDispatchContext
622+
):
616623
"""
617-
Simulates a dispatch of the provided node. This will cause the node's
618-
`service.sample_data` to be populated.
624+
Dispatch one node and recursively dispatch the next nodes.
619625
620-
:param node: The node to simulate the dispatch for.
621-
:return: The updated node.
626+
:param node: The node to start with.
627+
:param dispatch_context: The context in which the workflow is being dispatched,
628+
which contains the event payload and other relevant data.
622629
"""
623630

624-
if node.get_type().is_workflow_trigger:
625-
node.workflow.simulate_until_node = node
626-
node.workflow.save()
627-
return node
628-
629-
dispatch_context = AutomationDispatchContext(
630-
node.workflow,
631-
simulate_until_node=node.specific,
632-
)
633-
631+
node_type: Type[AutomationNodeActionNodeType] = node.get_type()
634632
try:
635-
AutomationWorkflowRunner().run(node.workflow, dispatch_context)
636-
except (
637-
AutomationNodeError,
638-
UnexpectedDispatchException,
639-
) as e:
640-
raise AutomationNodeSimulateDispatchError(str(e))
641-
642-
node.refresh_from_db()
643-
644-
return node
633+
dispatch_result = node_type.dispatch(node, dispatch_context)
634+
dispatch_context.after_dispatch(node, dispatch_result)
635+
636+
# Return early if this is a simulated dispatch
637+
if until_node := dispatch_context.simulate_until_node:
638+
if until_node.id == node.id:
639+
# sample_data was updated as it's a simulation we should tell to
640+
# the frontend
641+
node.service.refresh_from_db(fields=["sample_data"])
642+
automation_node_updated.send(self, user=None, node=node)
643+
return
644+
645+
next_nodes = node.get_next_nodes(dispatch_result.output_uid)
646+
647+
for next_node in next_nodes:
648+
self.dispatch_node(next_node, dispatch_context)
649+
except ServiceImproperlyConfiguredDispatchException as e:
650+
raise AutomationNodeMisconfiguredService(
651+
f"The node {node.id} has a misconfigured service."
652+
) from e

backend/src/baserow/contrib/automation/nodes/node_types.py

Lines changed: 5 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,6 @@
77
from django.utils import timezone
88
from django.utils.translation import gettext as _
99

10-
from baserow.contrib.automation.automation_dispatch_context import (
11-
AutomationDispatchContext,
12-
)
1310
from baserow.contrib.automation.nodes.exceptions import (
1411
AutomationNodeMisconfiguredService,
1512
AutomationNodeNotDeletable,
@@ -53,32 +50,11 @@
5350
)
5451
from baserow.core.db import specific_iterator
5552
from baserow.core.registry import Instance
56-
from baserow.core.services.handler import ServiceHandler
5753
from baserow.core.services.models import Service
5854
from baserow.core.services.registries import service_type_registry
59-
from baserow.core.services.types import DispatchResult
60-
61-
62-
class ServiceDispatchableAutomationNodeTypeMixin:
63-
"""
64-
A mixin for automation node types that can be dispatched using the service.
65-
This mixin provides a default implementation of the `dispatch` method that uses
66-
the `ServiceHandler` to dispatch the service associated with the node.
67-
"""
68-
69-
def dispatch(
70-
self,
71-
automation_node: AutomationNode,
72-
dispatch_context: AutomationDispatchContext,
73-
) -> DispatchResult:
74-
return ServiceHandler().dispatch_service(
75-
automation_node.service.specific, dispatch_context
76-
)
7755

7856

79-
class AutomationNodeActionNodeType(
80-
ServiceDispatchableAutomationNodeTypeMixin, AutomationNodeType
81-
):
57+
class AutomationNodeActionNodeType(AutomationNodeType):
8258
is_workflow_action = True
8359

8460

@@ -238,19 +214,6 @@ class AutomationNodeTriggerType(AutomationNodeType):
238214

239215
is_workflow_trigger = True
240216

241-
def dispatch(
242-
self,
243-
node: AutomationNode,
244-
dispatch_context: AutomationDispatchContext,
245-
) -> DispatchResult:
246-
if dispatch_context.use_sample_data:
247-
if sample_data := node.service.get_type().get_sample_data(
248-
node.service.specific
249-
):
250-
return DispatchResult(**sample_data)
251-
252-
return DispatchResult(data=dispatch_context.event_payload)
253-
254217
def after_register(self):
255218
service_type_registry.get(self.service_type).start_listening(self.on_event)
256219
return super().after_register()
@@ -298,31 +261,14 @@ def on_event(
298261

299262
for trigger in triggers:
300263
workflow = trigger.workflow
301-
simulate_until_node_id = (
302-
trigger.workflow.simulate_until_node.id
303-
if trigger.workflow.simulate_until_node
304-
else None
305-
)
306-
AutomationWorkflowHandler().run_workflow(
264+
265+
AutomationWorkflowHandler().async_start_workflow(
307266
workflow,
308267
event_payload,
309-
simulate_until_node_id,
310268
)
311269

312-
save_sample_data = False
313-
if workflow.allow_test_run_until:
314-
workflow.allow_test_run_until = None
315-
workflow.save(update_fields=["allow_test_run_until"])
316-
save_sample_data = True
317-
318-
if workflow.simulate_until_node and not workflow.is_published:
319-
workflow.simulate_until_node = None
320-
workflow.save(update_fields=["simulate_until_node"])
321-
save_sample_data = True
322-
323-
if save_sample_data:
324-
trigger.service.sample_data = {"data": event_payload}
325-
trigger.service.save()
270+
# We don't want subsequent events to trigger a new test run
271+
AutomationWorkflowHandler().reset_workflow_temporary_states(workflow)
326272

327273

328274
class LocalBaserowRowsCreatedNodeTriggerType(AutomationNodeTriggerType):
@@ -343,32 +289,7 @@ class LocalBaserowRowsDeletedNodeTriggerType(AutomationNodeTriggerType):
343289
service_type = LocalBaserowRowsDeletedServiceType.type
344290

345291

346-
class AutomationNodeImmediateTriggerTypeMixin:
347-
# On a workflow test run, this node type will be immediately dispatched,
348-
# it does not wait for an initial internal or external event to occur.
349-
immediate_dispatch = True
350-
351-
def dispatch(
352-
self,
353-
automation_node: AutomationNode,
354-
dispatch_context: AutomationDispatchContext,
355-
) -> DispatchResult:
356-
"""
357-
Immediate nodes are generally able to generate their own payload so unless
358-
the context has an event payload, we directly try to generate one from
359-
the service.
360-
"""
361-
362-
if dispatch_context.event_payload is not None:
363-
return super().dispatch(automation_node, dispatch_context)
364-
365-
return ServiceHandler().dispatch_service(
366-
automation_node.service.specific, dispatch_context
367-
)
368-
369-
370292
class CorePeriodicTriggerNodeType(
371-
AutomationNodeImmediateTriggerTypeMixin,
372293
AutomationNodeTriggerType,
373294
):
374295
type = "periodic"

backend/src/baserow/contrib/automation/nodes/receivers.py

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,10 @@
1-
from django.db import transaction
21
from django.db.models.signals import post_delete
3-
from django.dispatch import receiver
42

53
from baserow.contrib.automation.nodes.models import AutomationNode
6-
from baserow.contrib.automation.workflows.handler import AutomationWorkflowHandler
7-
from baserow.contrib.automation.workflows.models import AutomationWorkflow
8-
from baserow.contrib.automation.workflows.signals import automation_workflow_updated
94
from baserow.core.services.handler import ServiceHandler
105
from baserow.core.services.models import Service
116

127

13-
@receiver(automation_workflow_updated)
14-
def on_workflow_updated_test_run_dispatch_immediate_triggers(
15-
sender, workflow: AutomationWorkflow, **kwargs
16-
):
17-
def run_workflow():
18-
if workflow.allow_test_run_until:
19-
trigger = workflow.get_trigger()
20-
# A subset of triggers support immediate test run dispatching, if this
21-
# `node_type` supports it, we'll immediately run the workflow with
22-
# the pre-defined data.
23-
if trigger.get_type().immediate_dispatch:
24-
AutomationWorkflowHandler().run_workflow(workflow, None)
25-
26-
# We need to wait the commit otherwise the celery task workflow instance doesn't
27-
# have the allow_test_run_until property saved
28-
transaction.on_commit(run_workflow)
29-
30-
318
def after_permanently_deleted(sender, instance, **kwargs):
329
"""
3310
Delete the service related to the node.

0 commit comments

Comments
 (0)