118 changes: 118 additions & 0 deletions backend/src/baserow/api/jobs/serializers.py
@@ -1,5 +1,7 @@
from django.utils.functional import lazy

from drf_spectacular.extensions import OpenApiSerializerExtension
from drf_spectacular.plumbing import force_instance
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers
@@ -42,6 +44,8 @@ class Meta:
"progress_percentage",
"state",
"human_readable_error",
"created_on",
"updated_on",
)
extra_kwargs = {
"id": {"read_only": True},
@@ -63,9 +67,31 @@ class Meta:
fields = ("user_id", "type")


class JobTypeFiltersSerializer(serializers.Serializer):
"""
Base serializer for job type-specific filters. This serves as the base class
for all job type filter serializers and uses 'type' as a discriminator field.
"""

type = serializers.ChoiceField(
choices=lazy(job_type_registry.get_types, list)(),
required=True,
help_text="The type of job to filter for. Determines which additional filter fields are available.",
)


class ListJobQuerySerializer(serializers.Serializer):
states = serializers.CharField(required=False)
job_ids = serializers.CharField(required=False)
type = serializers.ChoiceField(
choices=lazy(job_type_registry.get_types, list)(),
required=False,
help_text="The type of job to filter for. Determines which additional filter fields are available.",
)
offset = serializers.IntegerField(required=False, min_value=0)
limit = serializers.IntegerField(
required=False, min_value=1, max_value=100, default=20
)

def validate_states(self, value):
if not value:
@@ -95,3 +121,95 @@ def validate_job_ids(self, value):
f"Job id {job_id} is not a valid integer."
)
return validated_job_ids

def validate(self, attrs):
job_type_name = attrs.get("type")

# Collect type-specific filters in a separate dict
type_filters = {}

if job_type_name:
job_type = job_type_registry.get(job_type_name)
filters_serializer_class = job_type.get_filters_serializer()

if filters_serializer_class:
filters_data = {}

# Add any type-specific fields from initial_data
filters_serializer = filters_serializer_class()

for field_name in filters_serializer.fields.keys():
if field_name in self.initial_data:
filters_data[field_name] = self.initial_data[field_name]

# Validate using the type-specific serializer
filters_serializer = filters_serializer_class(data=filters_data)
if filters_serializer.is_valid():
for field_name, value in filters_serializer.validated_data.items():
    # The "type" discriminator is not a model field, so it must not
    # end up in the ORM filters.
    if field_name == "type":
        continue
    # If the field name is prefixed with the job type name (used to
    # disambiguate the flat query parameters), strip the prefix.
    field_key = field_name
    if field_name.startswith(f"{job_type.type}_"):
        field_key = field_name[len(job_type.type) + 1 :]
    type_filters[field_key] = value
else:
raise serializers.ValidationError(filters_serializer.errors)

# Add type_filters dict to attrs for easy access in the view
attrs["type_filters"] = type_filters
attrs["job_type_name"] = job_type_name

return attrs


class ListJobQuerySerializerExtension(OpenApiSerializerExtension):
"""
Custom OpenAPI serializer extension that dynamically adds type-specific filter
fields to the ListJobQuerySerializer based on the job registry. This creates a flat
parameter list where type-specific fields appear when the corresponding type is
selected, since it's not possible to use a discriminator in query parameters.
"""

target_class = "baserow.api.jobs.serializers.ListJobQuerySerializer"

def map_serializer(self, auto_schema, direction):
"""
Generate the schema by adding all type-specific fields from job filters
serializers to the base ListJobQuerySerializer properties.
"""

schema = auto_schema._map_serializer(
self.target, direction, bypass_extensions=True
)

properties = schema.get("properties", {})
base_field_names = set(ListJobQuerySerializer().fields.keys())

# Collect all type-specific fields from job registry
for job_type in job_type_registry.get_all():
filters_serializer_class = job_type.get_filters_serializer()
if (
not filters_serializer_class
or filters_serializer_class == JobTypeFiltersSerializer
):
continue

serializer = force_instance(filters_serializer_class)

for field_name, field in serializer.fields.items():
# Skip base fields and the type field
if field_name in base_field_names or field_name == "type":
continue

field_schema = auto_schema._map_serializer_field(field, direction)

help_text = field_schema.get("description", "")
field_schema[
"description"
] = f"**[Only for type='{job_type.type}']** {help_text}"

if field_name not in properties:
properties[field_name] = field_schema

schema["properties"] = properties
return schema
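
To see how the pieces above fit together, here is a minimal sketch of a type-specific filters serializer following the prefix convention. The `audit_log_export` type name and `workspace_id` field are hypothetical, and the sketch assumes the job type is registered in `job_type_registry` and returns this serializer from `get_filters_serializer()`:

class AuditLogExportJobFiltersSerializer(JobTypeFiltersSerializer):
    # Prefixed with the job type name so the flat query parameter cannot
    # collide with another job type; ListJobQuerySerializer.validate()
    # strips the "audit_log_export_" prefix before building type_filters.
    audit_log_export_workspace_id = serializers.IntegerField(
        min_value=1,
        required=False,
        help_text="Filter export jobs by workspace id.",
    )

serializer = ListJobQuerySerializer(
    data={"type": "audit_log_export", "audit_log_export_workspace_id": "42"}
)
serializer.is_valid(raise_exception=True)
assert serializer.validated_data["type_filters"] == {"workspace_id": 42}
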
53 changes: 25 additions & 28 deletions backend/src/baserow/api/jobs/views.py
Original file line number Diff line number Diff line change
@@ -13,6 +13,7 @@
)
from baserow.api.schemas import get_error_schema
from baserow.api.utils import DiscriminatorCustomFieldsMappingSerializer
from baserow.core.db import specific_iterator
from baserow.core.jobs.exceptions import (
JobDoesNotExist,
JobNotCancellable,
@@ -33,30 +34,15 @@ class JobsView(APIView):
permission_classes = (IsAuthenticated,)

@extend_schema(
parameters=[
OpenApiParameter(
name="states",
location=OpenApiParameter.QUERY,
type=OpenApiTypes.STR,
description="A comma separated list of jobs state to look for. "
"The only possible values are: `pending`, `finished`, `failed` and `cancelled`. "
"It's possible to exclude a state by prefixing it with a `!`. ",
),
OpenApiParameter(
name="job_ids",
location=OpenApiParameter.QUERY,
type=OpenApiTypes.STR,
description="A comma separated list of job ids in the desired order."
"The jobs will be returned in the same order as the ids."
"If a job id is not found it will be ignored.",
),
],
parameters=[ListJobQuerySerializer],
tags=["Jobs"],
operation_id="list_job",
description=(
"List all existing jobs. Jobs are task executed asynchronously in the "
"background. You can use the `get_job` endpoint to read the current"
"progress of a the job."
"background. You can use the `get_job` endpoint to read the current "
"progress of the job. The available query parameters depend on the job type "
"selected via the `type` parameter. Each job type may support additional "
"type-specific filter parameters."
),
responses={
200: DiscriminatorCustomFieldsMappingSerializer(
@@ -68,22 +54,33 @@ class JobsView(APIView):
def get(self, request, query_params):
states = query_params.get("states", None)
job_ids = query_params.get("job_ids", None)
offset = query_params.get("offset", 0)
limit = query_params.get("limit", 20)

# Get job type and filters from the validated data
job_type_name = query_params.get("job_type_name", None)
type_filters = query_params.get("type_filters", {})

base_model = None
if job_type_name:
job_type = job_type_registry.get(job_type_name)
base_model = job_type.model_class

jobs = JobHandler.get_jobs_for_user(
request.user, filter_states=states, filter_ids=job_ids
)
request.user,
filter_states=states,
filter_ids=job_ids,
base_model=base_model,
type_filters=type_filters if type_filters else None,
)[offset : offset + limit]

# FIXME: job.specific makes a query for each job to get the specific instance.
# As long as we have max_count=1 for each job type, there's not much we can do,
# but this should be optimized in the future if we allow multiple jobs of the
# same type.
serialized_jobs = [
job_type_registry.get_serializer(
job.specific,
job,
JobSerializer,
context={"request": request},
).data
for job in jobs
for job in specific_iterator(jobs)
]
return Response({"jobs": serialized_jobs})

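To make the request flow concrete, here is a hedged sketch using the `tmp_job_type_1` fixture from this PR; the `/api/jobs/` path is an assumption, since the URL configuration is not part of this diff:

# GET /api/jobs/?type=tmp_job_type_1&tmp_job_type_1_progress_percentage=50
#
# ListJobQuerySerializer validates the flat query string, strips the job
# type prefix, and the view ends up calling (names from this diff):
jobs = JobHandler.get_jobs_for_user(
    request.user,
    filter_states=None,
    filter_ids=None,
    base_model=job_type_registry.get("tmp_job_type_1").model_class,
    type_filters={"progress_percentage": 50},
)[0:20]
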
6 changes: 4 additions & 2 deletions backend/src/baserow/contrib/database/table/handler.py
@@ -279,8 +279,10 @@ def list_workspace_tables(

table_qs = base_queryset if base_queryset else Table.objects.all()

table_qs = table_qs.filter(database__workspace=workspace).select_related(
"database__workspace", "data_sync"
table_qs = (
table_qs.filter(database__workspace=workspace)
.select_related("database__workspace", "data_sync")
.order_by("database_id", "order", "id")
)

if not include_trashed:
15 changes: 13 additions & 2 deletions backend/src/baserow/core/jobs/handler.py
@@ -1,5 +1,5 @@
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Type
from typing import Any, Dict, List, Optional, Type

from django.conf import settings
from django.contrib.auth.models import AbstractUser
@@ -112,6 +112,8 @@ def get_jobs_for_user(
user: AbstractUser,
filter_states: Optional[List[str]],
filter_ids: Optional[List[int]],
base_model: Optional[Type[AnyJob]] = None,
type_filters: Optional[Dict[str, Any]] = None,
) -> QuerySet:
"""
Returns all jobs belonging to the specified user.
@@ -120,9 +122,15 @@
:param filter_states: A list of states that the jobs should have, or not
have if prefixed with a !.
:param filter_ids: A list of specific job ids to return.
:param base_model: An optional Job model subclass to query instead of the
    generic Job model, so that type-specific fields can be filtered on.
:param type_filters: Optional type-specific filters (e.g., field_id for
GenerateAIValuesJob).
:return: A QuerySet with the filtered jobs for the user.
"""

if base_model is None:
base_model = Job

def get_job_states_filter(states):
states_q = Q()
for state in states:
@@ -132,14 +140,17 @@ def get_job_states_filter(states):
states_q |= Q(state=state)
return states_q

queryset = Job.objects.filter(user=user).order_by("-updated_on")
queryset = base_model.objects.filter(user=user).order_by("-id")

if filter_states:
queryset = queryset.filter(get_job_states_filter(filter_states))

if filter_ids:
queryset = queryset.filter(id__in=filter_ids)

if type_filters:
queryset = queryset.filter(**type_filters)

return queryset.select_related("content_type")

@classmethod
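As a worked example of the states filter above (grounded in `get_job_states_filter`), inclusion and exclusion compose onto a single Q object:

from django.db.models import Q

# ["pending", "finished"]   ->  Q(state="pending") | Q(state="finished")
# ["!failed", "!cancelled"] ->  ~Q(state="failed") & ~Q(state="cancelled")
#
# So filter_states=["!failed"] keeps every job whose state is not
# "failed", while mixing both kinds combines them in input order.
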
16 changes: 15 additions & 1 deletion backend/src/baserow/core/jobs/registries.py
@@ -1,4 +1,4 @@
from typing import Any, Dict
from typing import Any, Dict, Type

from django.contrib.auth.models import AbstractUser

@@ -164,6 +164,20 @@ def response_serializer_class(self):
meta_ref_name=f"{self.__class__.__name__}ResponseSerializer",
)

def get_filters_serializer(self) -> Type[serializers.Serializer] | None:
"""
This method enables job types to define custom filters for job listing
operations. Since query parameters cannot use a discriminator field and
must be flattened, all filter field names should be prefixed with the job
type name followed by an underscore to prevent naming conflicts between
job types.

:return: A serializer class extending JobTypeFiltersSerializer, or None if no
type-specific filters are needed.
"""

return None


class JobTypeRegistry(
CustomFieldsRegistryMixin,
17 changes: 17 additions & 0 deletions backend/src/baserow/test_utils/fixtures/job.py
@@ -1,3 +1,5 @@
from typing import Type

from rest_framework import serializers
from rest_framework.status import HTTP_404_NOT_FOUND

@@ -16,6 +18,16 @@ class TestException(Exception):
...


class TmpJobType1FiltersSerializer(serializers.Serializer):
"""Just for testing: expose a filter on progress_percentage"""

tmp_job_type_1_progress_percentage = serializers.IntegerField(
min_value=0,
required=False,
help_text="Filter by the progress percentage.",
)


class TmpJobType1(JobType):
type = "tmp_job_type_1"

@@ -51,6 +63,11 @@ def prepare_values(self, values, user):
def run(self, job, progress):
pass

def get_filters_serializer(self) -> Type[serializers.Serializer] | None:
"""Returns the filters serializer for this job type."""

return TmpJobType1FiltersSerializer


class TmpJobType2(JobType):
type = "tmp_job_type_2"
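A hedged sketch of how a test might exercise this fixture end to end; the `api:jobs:list` URL name and the `api_client`/`data_fixture` fixtures are assumptions based on Baserow's test conventions and are not shown in this diff:

import pytest
from django.urls import reverse
from rest_framework.status import HTTP_200_OK

@pytest.mark.django_db
def test_list_jobs_filtered_by_progress(api_client, data_fixture):
    user, token = data_fixture.create_user_and_token()
    response = api_client.get(
        reverse("api:jobs:list"),  # assumed URL name
        {
            "type": "tmp_job_type_1",
            "tmp_job_type_1_progress_percentage": 50,
        },
        HTTP_AUTHORIZATION=f"JWT {token}",
    )
    assert response.status_code == HTTP_200_OK
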
@@ -121,6 +121,7 @@ def test_exporting_empty_workspace(
job_id = response_json["id"]
assert response_json == {
"created_on": run_time,
"updated_on": run_time,
"exported_file_name": None,
"human_readable_error": "",
"id": job_id,
@@ -200,6 +201,7 @@ def test_exporting_workspace_with_single_empty_database(
job_id = response_json["id"]
assert response_json == {
"created_on": run_time,
"updated_on": run_time,
"exported_file_name": None,
"human_readable_error": "",
"id": job_id,