From 6e7fe6e57b03561a6417c6800287acea8730f62c Mon Sep 17 00:00:00 2001 From: zhangzhanwei Date: Tue, 27 Jan 2026 15:24:07 +0800 Subject: [PATCH] feat: Trigger task status --- apps/trigger/serializers/trigger.py | 240 ++++++++++++++++------------ 1 file changed, 142 insertions(+), 98 deletions(-) diff --git a/apps/trigger/serializers/trigger.py b/apps/trigger/serializers/trigger.py index 6dc63cc644d..17bf7605bef 100644 --- a/apps/trigger/serializers/trigger.py +++ b/apps/trigger/serializers/trigger.py @@ -101,84 +101,23 @@ def validate_input_field_list(self, value): return value -class TriggerTaskCreateRequest(serializers.Serializer): - source_type = serializers.ChoiceField(required=True, choices=TriggerTaskTypeChoices) - source_id = serializers.CharField(required=True, label=_('source_id')) - is_active = serializers.BooleanField(required=False, label=_('Is active')) - meta = serializers.DictField(default=dict, required=False) - parameter = serializers.DictField(default=dict, required=False) +class TriggerValidationMixin: def validate(self, attrs): - source_type = attrs.get('source_type') - parameter = attrs.get('parameter') - if source_type == TriggerTaskTypeChoices.APPLICATION: - serializer = ApplicationTaskParameterSerializer(data=parameter) - serializer.is_valid(raise_exception=True) - attrs['parameter'] = serializer.validated_data - if source_type == TriggerTaskTypeChoices.TOOL: - serializer = ToolTaskParameterSerializer(data=parameter) - serializer.is_valid(raise_exception=True) - attrs['parameter'] = serializer.validated_data - - return attrs - - -class TriggerTaskEditRequest(serializers.Serializer): - source_type = serializers.ChoiceField(required=False, choices=TriggerTaskTypeChoices) - source_id = serializers.CharField(required=False, label=_('source_id')) - is_active = serializers.BooleanField(required=False, label=_('Is active')) - meta = serializers.DictField(default=dict, required=False) - parameter = serializers.DictField(default=dict, required=False) - - def validate(self, attrs): - source_type = attrs.get('source_type') - parameter = attrs.get('parameter') - if source_type == TriggerTaskTypeChoices.APPLICATION: - serializer = ApplicationTaskParameterSerializer(data=parameter) - serializer.is_valid(raise_exception=True) - attrs['parameter'] = serializer.validated_data - if source_type == TriggerTaskTypeChoices.TOOL: - serializer = ToolTaskParameterSerializer(data=parameter) - serializer.is_valid(raise_exception=True) - attrs['parameter'] = serializer.validated_data + # trigger_setting 校验 + trigger_type = attrs.get('trigger_type') + trigger_setting = attrs.get('trigger_setting') + + if trigger_type and trigger_setting: + if trigger_type == TriggerTypeChoices.SCHEDULED: + self._validate_scheduled_setting(trigger_setting) + elif trigger_type == TriggerTypeChoices.EVENT: + self._validate_event_setting(trigger_setting) + else: + raise AppApiException(500, _('Error trigger type')) return attrs - -class TriggerEditRequest(serializers.Serializer): - name = serializers.CharField(required=False, label=_('trigger name')) - desc = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_('trigger description')) - trigger_type = serializers.ChoiceField(required=False, choices=TriggerTypeChoices) - trigger_setting = serializers.DictField(required=False, label=_("trigger setting")) - meta = serializers.DictField(default=dict, required=False) - trigger_task = TriggerTaskEditRequest(many=True, required=False) - - -class TriggerCreateRequest(serializers.Serializer): - id = serializers.UUIDField(required=True, label=_("Trigger ID")) - name = serializers.CharField(required=True, label=_('trigger name')) - desc = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_('trigger description')) - trigger_type = serializers.ChoiceField(required=True, choices=TriggerTypeChoices) - trigger_setting = serializers.DictField(required=True, label=_("trigger setting")) - meta = serializers.DictField(default=dict, required=False) - is_active = serializers.BooleanField(required=False, label=_('Is active')) - trigger_task = TriggerTaskCreateRequest(many=True) - - def is_valid(self, *, raise_exception=False): - super().is_valid(raise_exception=True) - trigger_type = self.data.get('trigger_type') - trigger_setting = self.data.get('trigger_setting', {}) - - if trigger_type == TriggerTypeChoices.SCHEDULED: - self._validate_scheduled_setting(trigger_setting) - - elif trigger_type == TriggerTypeChoices.EVENT: - self._validate_event_setting(trigger_setting) - else: - raise AppApiException(500, _('Error trigger type')) - - return True - @staticmethod def _validate_required_field(setting, field_name, trigger_type): if field_name not in setting: @@ -288,6 +227,70 @@ def _validate_event_setting(setting): }) +class TriggerTaskCreateRequest(serializers.Serializer): + source_type = serializers.ChoiceField(required=True, choices=TriggerTaskTypeChoices) + source_id = serializers.CharField(required=True, label=_('source_id')) + is_active = serializers.BooleanField(required=False, label=_('Is active')) + meta = serializers.DictField(default=dict, required=False) + parameter = serializers.DictField(default=dict, required=False) + + def validate(self, attrs): + source_type = attrs.get('source_type') + parameter = attrs.get('parameter') + if source_type == TriggerTaskTypeChoices.APPLICATION: + serializer = ApplicationTaskParameterSerializer(data=parameter) + serializer.is_valid(raise_exception=True) + attrs['parameter'] = serializer.validated_data + if source_type == TriggerTaskTypeChoices.TOOL: + serializer = ToolTaskParameterSerializer(data=parameter) + serializer.is_valid(raise_exception=True) + attrs['parameter'] = serializer.validated_data + + return attrs + + +class TriggerTaskEditRequest(serializers.Serializer): + source_type = serializers.ChoiceField(required=False, choices=TriggerTaskTypeChoices) + source_id = serializers.CharField(required=False, label=_('source_id')) + is_active = serializers.BooleanField(required=False, label=_('Is active')) + meta = serializers.DictField(default=dict, required=False) + parameter = serializers.DictField(default=dict, required=False) + + def validate(self, attrs): + source_type = attrs.get('source_type') + parameter = attrs.get('parameter') + if source_type == TriggerTaskTypeChoices.APPLICATION: + serializer = ApplicationTaskParameterSerializer(data=parameter) + serializer.is_valid(raise_exception=True) + attrs['parameter'] = serializer.validated_data + if source_type == TriggerTaskTypeChoices.TOOL: + serializer = ToolTaskParameterSerializer(data=parameter) + serializer.is_valid(raise_exception=True) + attrs['parameter'] = serializer.validated_data + + return attrs + + +class TriggerEditRequest(TriggerValidationMixin, serializers.Serializer): + name = serializers.CharField(required=False, label=_('trigger name')) + desc = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_('trigger description')) + trigger_type = serializers.ChoiceField(required=False, choices=TriggerTypeChoices) + trigger_setting = serializers.DictField(required=False, label=_("trigger setting")) + meta = serializers.DictField(default=dict, required=False) + trigger_task = TriggerTaskEditRequest(many=True, required=False) + + +class TriggerCreateRequest(TriggerValidationMixin, serializers.Serializer): + id = serializers.UUIDField(required=True, label=_("Trigger ID")) + name = serializers.CharField(required=True, label=_('trigger name')) + desc = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_('trigger description')) + trigger_type = serializers.ChoiceField(required=True, choices=TriggerTypeChoices) + trigger_setting = serializers.DictField(required=True, label=_("trigger setting")) + meta = serializers.DictField(default=dict, required=False) + is_active = serializers.BooleanField(required=False, label=_('Is active')) + trigger_task = TriggerTaskCreateRequest(many=True) + + class TriggerModelSerializer(serializers.ModelSerializer): class Meta: model = Trigger @@ -347,29 +350,60 @@ def insert(self, instance, with_valid=True): trigger_tasks = valid_data.get('trigger_task') if trigger_tasks: + + is_active_map = self.batch_get_source_active_status(trigger_tasks) + trigger_task_models = [ - self.to_trigger_task_model(trigger_id, task) for task in - trigger_tasks + TriggerTask( + id=uuid.uuid7(), + trigger_id=trigger_id, + source_type=task_data.get('source_type'), + source_id=task_data.get('source_id'), + is_active=is_active_map.get((task_data.get('source_type'), task_data.get('source_id'))) or False, + parameter=task_data.get('parameter', {}), + meta=task_data.get('meta', {}) + ) + for task_data in trigger_tasks ] + TriggerTask.objects.bulk_create(trigger_task_models) else: raise AppApiException(500, _('Trigger task can not be empty')) return TriggerResponse(trigger_model).data - def to_trigger_task_model(self, trigger_id: str, task_data: dict): - source_type = task_data.get('source_type') - source_id = task_data.get('source_id') - is_active = self.is_active_source(source_type, source_id) - return TriggerTask( - id=uuid.uuid7(), - trigger_id=trigger_id, - source_type=source_type, - source_id=source_id, - is_active=is_active, - parameter=task_data.get('parameter', {}), - meta=task_data.get('meta', {}) - ) + @staticmethod + def batch_get_source_active_status(trigger_tasks: list) -> Dict[tuple, bool]: + """ + 批量查询所有 source 的 is_active 状态 + 返回: {(source_type, source_id): is_active} + """ + config = { + TriggerTaskTypeChoices.APPLICATION: (Application, 'is_publish'), + TriggerTaskTypeChoices.TOOL: (Tool, 'is_active'), + } + source_ids_by_type = {} + + for task_data in trigger_tasks: + source_type = task_data.get('source_type') + source_id = task_data.get('source_id') + + if source_type not in config: + raise AppApiException(500, _('Error source type')) + + if source_type not in source_ids_by_type: + source_ids_by_type[source_type] = [] + source_ids_by_type[source_type].append(source_id) + + is_active_map = {} + for source_type, source_ids in source_ids_by_type.items(): + source_model, field = config[source_type] + source_query_set = QuerySet(source_model).filter(id__in=source_ids).values('id', field) + + for source in source_query_set: + is_active_map[(source_type, str(source['id']))] = source[field] + + return is_active_map @staticmethod def is_active_source(source_type: str, source_id: str): @@ -459,36 +493,46 @@ def edit(self, instance: Dict, with_valid=True): self.is_valid() TriggerEditRequest(data=instance).is_valid(raise_exception=True) trigger_id = self.data.get('trigger_id') - trigger = Trigger.objects.filter(workspace_id=self.data.get('workspace_id'), id=trigger_id).first() + workspace_id = self.data.get('workspace_id') + trigger = Trigger.objects.filter(workspace_id=workspace_id, id=trigger_id).first() if not trigger: raise serializers.ValidationError(_('Trigger not found')) - trigger_edit_field_list = ['name', 'desc', 'trigger_type', 'trigger_setting', 'meta', 'is_active'] + trigger_direct_edit_field_list = ['name', 'desc', 'trigger_type', 'trigger_setting', 'meta', 'is_active'] + trigger_deploy_edit_field_list = ['trigger_type', 'trigger_setting', 'is_active'] + # is need to redeploy + need_redeploy = any(field in instance for field in trigger_deploy_edit_field_list) - for field in trigger_edit_field_list: + for field in trigger_direct_edit_field_list: if field in instance: trigger.__setattr__(field, instance.get(field)) trigger.save() # 处理trigger task trigger_tasks = instance.get('trigger_task') - if trigger_tasks: - TriggerTask.objects.filter(trigger_id=trigger_id).delete() + + if trigger_tasks is not None: + is_active_map = TriggerSerializer.batch_get_source_active_status(trigger_tasks) + trigger_task_model_list = [TriggerTask( id=task_data.get('id') or uuid.uuid7(), trigger_id=trigger_id, source_type=task_data.get('source_type'), source_id=task_data.get('source_id'), - is_active=task_data.get('is_active', False), + is_active=is_active_map.get((task_data.get('source_type'), task_data.get('source_id'))) or False, parameter=task_data.get('parameter', []), meta=task_data.get('meta', {}) ) for task_data in trigger_tasks] + + TriggerTask.objects.filter(trigger_id=trigger_id).delete() + TriggerTask.objects.bulk_create(trigger_task_model_list) # 重新部署触发器任务 - if trigger.is_active: - deploy(TriggerModelSerializer(trigger).data, **{}) - else: - undeploy(TriggerModelSerializer(trigger).data, **{}) + if need_redeploy: + if trigger.is_active: + deploy(TriggerModelSerializer(trigger).data, **{}) + else: + undeploy(TriggerModelSerializer(trigger).data, **{}) return self.one(with_valid=False)