Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 142 additions & 98 deletions apps/trigger/serializers/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,84 +101,23 @@ def validate_input_field_list(self, value):
return value


class TriggerTaskCreateRequest(serializers.Serializer):
source_type = serializers.ChoiceField(required=True, choices=TriggerTaskTypeChoices)
source_id = serializers.CharField(required=True, label=_('source_id'))
is_active = serializers.BooleanField(required=False, label=_('Is active'))
meta = serializers.DictField(default=dict, required=False)
parameter = serializers.DictField(default=dict, required=False)
class TriggerValidationMixin:

def validate(self, attrs):
source_type = attrs.get('source_type')
parameter = attrs.get('parameter')
if source_type == TriggerTaskTypeChoices.APPLICATION:
serializer = ApplicationTaskParameterSerializer(data=parameter)
serializer.is_valid(raise_exception=True)
attrs['parameter'] = serializer.validated_data
if source_type == TriggerTaskTypeChoices.TOOL:
serializer = ToolTaskParameterSerializer(data=parameter)
serializer.is_valid(raise_exception=True)
attrs['parameter'] = serializer.validated_data

return attrs


class TriggerTaskEditRequest(serializers.Serializer):
source_type = serializers.ChoiceField(required=False, choices=TriggerTaskTypeChoices)
source_id = serializers.CharField(required=False, label=_('source_id'))
is_active = serializers.BooleanField(required=False, label=_('Is active'))
meta = serializers.DictField(default=dict, required=False)
parameter = serializers.DictField(default=dict, required=False)

def validate(self, attrs):
source_type = attrs.get('source_type')
parameter = attrs.get('parameter')
if source_type == TriggerTaskTypeChoices.APPLICATION:
serializer = ApplicationTaskParameterSerializer(data=parameter)
serializer.is_valid(raise_exception=True)
attrs['parameter'] = serializer.validated_data
if source_type == TriggerTaskTypeChoices.TOOL:
serializer = ToolTaskParameterSerializer(data=parameter)
serializer.is_valid(raise_exception=True)
attrs['parameter'] = serializer.validated_data
# trigger_setting 校验
trigger_type = attrs.get('trigger_type')
trigger_setting = attrs.get('trigger_setting')

if trigger_type and trigger_setting:
if trigger_type == TriggerTypeChoices.SCHEDULED:
self._validate_scheduled_setting(trigger_setting)
elif trigger_type == TriggerTypeChoices.EVENT:
self._validate_event_setting(trigger_setting)
else:
raise AppApiException(500, _('Error trigger type'))

return attrs


class TriggerEditRequest(serializers.Serializer):
name = serializers.CharField(required=False, label=_('trigger name'))
desc = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_('trigger description'))
trigger_type = serializers.ChoiceField(required=False, choices=TriggerTypeChoices)
trigger_setting = serializers.DictField(required=False, label=_("trigger setting"))
meta = serializers.DictField(default=dict, required=False)
trigger_task = TriggerTaskEditRequest(many=True, required=False)


class TriggerCreateRequest(serializers.Serializer):
id = serializers.UUIDField(required=True, label=_("Trigger ID"))
name = serializers.CharField(required=True, label=_('trigger name'))
desc = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_('trigger description'))
trigger_type = serializers.ChoiceField(required=True, choices=TriggerTypeChoices)
trigger_setting = serializers.DictField(required=True, label=_("trigger setting"))
meta = serializers.DictField(default=dict, required=False)
is_active = serializers.BooleanField(required=False, label=_('Is active'))
trigger_task = TriggerTaskCreateRequest(many=True)

def is_valid(self, *, raise_exception=False):
super().is_valid(raise_exception=True)
trigger_type = self.data.get('trigger_type')
trigger_setting = self.data.get('trigger_setting', {})

if trigger_type == TriggerTypeChoices.SCHEDULED:
self._validate_scheduled_setting(trigger_setting)

elif trigger_type == TriggerTypeChoices.EVENT:
self._validate_event_setting(trigger_setting)
else:
raise AppApiException(500, _('Error trigger type'))

return True

@staticmethod
def _validate_required_field(setting, field_name, trigger_type):
if field_name not in setting:
Expand Down Expand Up @@ -288,6 +227,70 @@ def _validate_event_setting(setting):
})


class TriggerTaskCreateRequest(serializers.Serializer):
source_type = serializers.ChoiceField(required=True, choices=TriggerTaskTypeChoices)
source_id = serializers.CharField(required=True, label=_('source_id'))
is_active = serializers.BooleanField(required=False, label=_('Is active'))
meta = serializers.DictField(default=dict, required=False)
parameter = serializers.DictField(default=dict, required=False)

def validate(self, attrs):
source_type = attrs.get('source_type')
parameter = attrs.get('parameter')
if source_type == TriggerTaskTypeChoices.APPLICATION:
serializer = ApplicationTaskParameterSerializer(data=parameter)
serializer.is_valid(raise_exception=True)
attrs['parameter'] = serializer.validated_data
if source_type == TriggerTaskTypeChoices.TOOL:
serializer = ToolTaskParameterSerializer(data=parameter)
serializer.is_valid(raise_exception=True)
attrs['parameter'] = serializer.validated_data

return attrs


class TriggerTaskEditRequest(serializers.Serializer):
source_type = serializers.ChoiceField(required=False, choices=TriggerTaskTypeChoices)
source_id = serializers.CharField(required=False, label=_('source_id'))
is_active = serializers.BooleanField(required=False, label=_('Is active'))
meta = serializers.DictField(default=dict, required=False)
parameter = serializers.DictField(default=dict, required=False)

def validate(self, attrs):
source_type = attrs.get('source_type')
parameter = attrs.get('parameter')
if source_type == TriggerTaskTypeChoices.APPLICATION:
serializer = ApplicationTaskParameterSerializer(data=parameter)
serializer.is_valid(raise_exception=True)
attrs['parameter'] = serializer.validated_data
if source_type == TriggerTaskTypeChoices.TOOL:
serializer = ToolTaskParameterSerializer(data=parameter)
serializer.is_valid(raise_exception=True)
attrs['parameter'] = serializer.validated_data

return attrs


class TriggerEditRequest(TriggerValidationMixin, serializers.Serializer):
name = serializers.CharField(required=False, label=_('trigger name'))
desc = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_('trigger description'))
trigger_type = serializers.ChoiceField(required=False, choices=TriggerTypeChoices)
trigger_setting = serializers.DictField(required=False, label=_("trigger setting"))
meta = serializers.DictField(default=dict, required=False)
trigger_task = TriggerTaskEditRequest(many=True, required=False)


class TriggerCreateRequest(TriggerValidationMixin, serializers.Serializer):
id = serializers.UUIDField(required=True, label=_("Trigger ID"))
name = serializers.CharField(required=True, label=_('trigger name'))
desc = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_('trigger description'))
trigger_type = serializers.ChoiceField(required=True, choices=TriggerTypeChoices)
trigger_setting = serializers.DictField(required=True, label=_("trigger setting"))
meta = serializers.DictField(default=dict, required=False)
is_active = serializers.BooleanField(required=False, label=_('Is active'))
trigger_task = TriggerTaskCreateRequest(many=True)


class TriggerModelSerializer(serializers.ModelSerializer):
class Meta:
model = Trigger
Expand Down Expand Up @@ -347,29 +350,60 @@ def insert(self, instance, with_valid=True):

trigger_tasks = valid_data.get('trigger_task')
if trigger_tasks:

is_active_map = self.batch_get_source_active_status(trigger_tasks)

trigger_task_models = [
self.to_trigger_task_model(trigger_id, task) for task in
trigger_tasks
TriggerTask(
id=uuid.uuid7(),
trigger_id=trigger_id,
source_type=task_data.get('source_type'),
source_id=task_data.get('source_id'),
is_active=is_active_map.get((task_data.get('source_type'), task_data.get('source_id'))) or False,
parameter=task_data.get('parameter', {}),
meta=task_data.get('meta', {})
)
for task_data in trigger_tasks
]

TriggerTask.objects.bulk_create(trigger_task_models)
else:
raise AppApiException(500, _('Trigger task can not be empty'))

return TriggerResponse(trigger_model).data

def to_trigger_task_model(self, trigger_id: str, task_data: dict):
source_type = task_data.get('source_type')
source_id = task_data.get('source_id')
is_active = self.is_active_source(source_type, source_id)
return TriggerTask(
id=uuid.uuid7(),
trigger_id=trigger_id,
source_type=source_type,
source_id=source_id,
is_active=is_active,
parameter=task_data.get('parameter', {}),
meta=task_data.get('meta', {})
)
@staticmethod
def batch_get_source_active_status(trigger_tasks: list) -> Dict[tuple, bool]:
"""
批量查询所有 source 的 is_active 状态
返回: {(source_type, source_id): is_active}
"""
config = {
TriggerTaskTypeChoices.APPLICATION: (Application, 'is_publish'),
TriggerTaskTypeChoices.TOOL: (Tool, 'is_active'),
}
source_ids_by_type = {}

for task_data in trigger_tasks:
source_type = task_data.get('source_type')
source_id = task_data.get('source_id')

if source_type not in config:
raise AppApiException(500, _('Error source type'))

if source_type not in source_ids_by_type:
source_ids_by_type[source_type] = []
source_ids_by_type[source_type].append(source_id)

is_active_map = {}
for source_type, source_ids in source_ids_by_type.items():
source_model, field = config[source_type]
source_query_set = QuerySet(source_model).filter(id__in=source_ids).values('id', field)

for source in source_query_set:
is_active_map[(source_type, str(source['id']))] = source[field]

return is_active_map

@staticmethod
def is_active_source(source_type: str, source_id: str):
Expand Down Expand Up @@ -459,36 +493,46 @@ def edit(self, instance: Dict, with_valid=True):
self.is_valid()
TriggerEditRequest(data=instance).is_valid(raise_exception=True)
trigger_id = self.data.get('trigger_id')
trigger = Trigger.objects.filter(workspace_id=self.data.get('workspace_id'), id=trigger_id).first()
workspace_id = self.data.get('workspace_id')
trigger = Trigger.objects.filter(workspace_id=workspace_id, id=trigger_id).first()
if not trigger:
raise serializers.ValidationError(_('Trigger not found'))

trigger_edit_field_list = ['name', 'desc', 'trigger_type', 'trigger_setting', 'meta', 'is_active']
trigger_direct_edit_field_list = ['name', 'desc', 'trigger_type', 'trigger_setting', 'meta', 'is_active']
trigger_deploy_edit_field_list = ['trigger_type', 'trigger_setting', 'is_active']
# is need to redeploy
need_redeploy = any(field in instance for field in trigger_deploy_edit_field_list)

for field in trigger_edit_field_list:
for field in trigger_direct_edit_field_list:
if field in instance:
trigger.__setattr__(field, instance.get(field))
trigger.save()
# 处理trigger task
trigger_tasks = instance.get('trigger_task')
if trigger_tasks:
TriggerTask.objects.filter(trigger_id=trigger_id).delete()

if trigger_tasks is not None:
is_active_map = TriggerSerializer.batch_get_source_active_status(trigger_tasks)

trigger_task_model_list = [TriggerTask(
id=task_data.get('id') or uuid.uuid7(),
trigger_id=trigger_id,
source_type=task_data.get('source_type'),
source_id=task_data.get('source_id'),
is_active=task_data.get('is_active', False),
is_active=is_active_map.get((task_data.get('source_type'), task_data.get('source_id'))) or False,
parameter=task_data.get('parameter', []),
meta=task_data.get('meta', {})
) for task_data in trigger_tasks]

TriggerTask.objects.filter(trigger_id=trigger_id).delete()

TriggerTask.objects.bulk_create(trigger_task_model_list)

# 重新部署触发器任务
if trigger.is_active:
deploy(TriggerModelSerializer(trigger).data, **{})
else:
undeploy(TriggerModelSerializer(trigger).data, **{})
if need_redeploy:
if trigger.is_active:
deploy(TriggerModelSerializer(trigger).data, **{})
else:
undeploy(TriggerModelSerializer(trigger).data, **{})

return self.one(with_valid=False)

Expand Down