From bd072d12929f4663d71da5ea3227915d0eba3314 Mon Sep 17 00:00:00 2001
From: Cezary
Date: Wed, 3 Dec 2025 21:18:21 +0100
Subject: [PATCH 1/2] Run AI values generation in parallel #4227 (#4263)

run AI field values generation in parallel
---
 .env.example                                       |   1 +
 ...7_run_ai_field_generation_in_parallel.json     |   9 +
 docker-compose.yml                                 |   1 +
 docs/installation/configuration.md                 |   1 +
 .../src/baserow_premium/api/fields/views.py        |   1 -
 .../config/settings/settings.py                    |   7 +
 .../src/baserow_premium/fields/job_types.py        | 474 ++++++++++++++----
 .../src/baserow_premium/fields/models.py           |   9 +
 .../baserow_premium/generative_ai/managers.py      |   2 +-
 .../test_generate_ai_values_job_type.py            |   5 +-
 .../test_ai_parallel_execution.py                  | 124 +++++
 .../generative_ai/test_managers.py                 |  12 +-
 12 files changed, 538 insertions(+), 108 deletions(-)
 create mode 100644 changelog/entries/unreleased/feature/4227_run_ai_field_generation_in_parallel.json
 create mode 100644 premium/backend/tests/baserow_premium_tests/generative_ai/test_ai_parallel_execution.py

diff --git a/.env.example b/.env.example
index 76f11bcd45..ec9813293c 100644
--- a/.env.example
+++ b/.env.example
@@ -60,6 +60,7 @@ DATABASE_NAME=baserow
 # BASEROW_OPENAI_UPLOADED_FILE_SIZE_LIMIT_MB=
 # BASEROW_MAX_IMPORT_FILE_SIZE_MB=
 # BASEROW_UNIQUE_ROW_VALUES_SIZE_LIMIT=
+# BASEROW_AI_FIELD_MAX_CONCURRENT_GENERATIONS=
 # BASEROW_AUTOMATION_HISTORY_PAGE_SIZE_LIMIT=
 # BASEROW_AUTOMATION_WORKFLOW_RATE_LIMIT_MAX_RUNS=
diff --git a/changelog/entries/unreleased/feature/4227_run_ai_field_generation_in_parallel.json b/changelog/entries/unreleased/feature/4227_run_ai_field_generation_in_parallel.json
new file mode 100644
index 0000000000..c86f98a153
--- /dev/null
+++ b/changelog/entries/unreleased/feature/4227_run_ai_field_generation_in_parallel.json
@@ -0,0 +1,9 @@
+{
+    "type": "feature",
+    "message": "Run AI field generation in parallel",
+    "issue_origin": "github",
+    "issue_number": 4227,
+    "domain": "database",
+    "bullet_points": [],
+    "created_at": "2025-11-20"
+}
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
index 4ca33e3bc6..6aab6ad511 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -211,6 +211,7 @@ x-backend-variables:
   BASEROW_MISTRAL_MODELS:
   BASEROW_OLLAMA_HOST:
   BASEROW_OLLAMA_MODELS:
+  BASEROW_AI_FIELD_MAX_CONCURRENT_GENERATIONS:
   BASEROW_SERVE_FILES_THROUGH_BACKEND:
   BASEROW_SERVE_FILES_THROUGH_BACKEND_PERMISSION:
   BASEROW_SERVE_FILES_THROUGH_BACKEND_EXPIRE_SECONDS:
diff --git a/docs/installation/configuration.md b/docs/installation/configuration.md
index 76f0f07cc5..d52e8ed27f 100644
--- a/docs/installation/configuration.md
+++ b/docs/installation/configuration.md
@@ -148,6 +148,7 @@ The installation methods referred to in the variable descriptions are:
 | BASEROW\_MISTRAL\_MODELS | Provide a comma separated list of Mistral models (https://docs.mistral.ai/getting-started/models/models_overview/) that you would like to enable in the instance (e.g. `mistral-large-latest,mistral-small-latest`). Note that this only works if a Mistral API key is set. If this variable is not provided, the user won't be able to choose a model. | |
 | BASEROW\_OLLAMA\_HOST | Provide an OLLAMA host to allow using OLLAMA for generative AI features like the AI field. | |
 | BASEROW\_OLLAMA\_MODELS | Provide a comma separated list of Ollama models (https://ollama.com/library) that you would like to enable in the instance (e.g. `llama2`). Note that this only works if an Ollama host is set. If this variable is not provided, the user won't be able to choose a model. | |
+| BASEROW\_AI\_FIELD\_MAX\_CONCURRENT\_GENERATIONS | When AI field values are recalculated in bulk (e.g. for the whole table, for all empty rows, or for a selection of rows), this controls the maximum number of concurrent requests issued to the AI model to generate values. | 5 |
 
 ### Backend Misc Configuration
 | Name | Description | Defaults |
diff --git a/premium/backend/src/baserow_premium/api/fields/views.py b/premium/backend/src/baserow_premium/api/fields/views.py
index e47d2113e7..9465ae40f7 100644
--- a/premium/backend/src/baserow_premium/api/fields/views.py
+++ b/premium/backend/src/baserow_premium/api/fields/views.py
@@ -129,7 +129,6 @@ def post(self, request: Request, field_id: int, data) -> Response:
             context=ai_field.table,
         )
 
-        GenerateAIValuesJobType().get_valid_generative_ai_model_type_or_raise(ai_field)
         job = JobHandler().create_and_start_job(
             request.user,
             GenerateAIValuesJobType.type,
diff --git a/premium/backend/src/baserow_premium/config/settings/settings.py b/premium/backend/src/baserow_premium/config/settings/settings.py
index 104e7505cd..8066f325b2 100644
--- a/premium/backend/src/baserow_premium/config/settings/settings.py
+++ b/premium/backend/src/baserow_premium/config/settings/settings.py
@@ -2,6 +2,8 @@
 
 from django.core.exceptions import ImproperlyConfigured
 
+from baserow.config.settings.utils import try_int
+
 
 def setup(settings):
     """
@@ -33,3 +35,8 @@ def setup(settings):
     settings.BASEROW_PREMIUM_GROUPED_AGGREGATE_SERVICE_MAX_AGG_BUCKETS = (
         BASEROW_PREMIUM_GROUPED_AGGREGATE_SERVICE_MAX_AGG_BUCKETS
    )
+
+    # Used to limit the thread pool size for running AI field generation in parallel
+    settings.BASEROW_AI_FIELD_MAX_CONCURRENT_GENERATIONS = try_int(
+        os.getenv("BASEROW_AI_FIELD_MAX_CONCURRENT_GENERATIONS"), 5
+    )
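Note on the settings change above: `try_int` resolves the environment variable to an integer with a fallback of 5. A minimal standalone sketch of the assumed semantics (the real helper lives in `baserow.config.settings.utils`; this illustrative copy only mirrors the behaviour implied by its call site):

```python
def try_int(value, default):
    # Assumed behaviour of baserow.config.settings.utils.try_int, inferred from
    # the call site above: fall back to the default when the value is missing
    # or not a valid integer.
    try:
        return int(value)
    except (TypeError, ValueError):
        return default


assert try_int(None, 5) == 5  # BASEROW_AI_FIELD_MAX_CONCURRENT_GENERATIONS unset
assert try_int("abc", 5) == 5  # malformed value falls back to the default
assert try_int("10", 5) == 10  # explicit operator override
```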
diff --git a/premium/backend/src/baserow_premium/fields/job_types.py b/premium/backend/src/baserow_premium/fields/job_types.py
index 4bd96f1e87..6eb056d7ba 100644
--- a/premium/backend/src/baserow_premium/fields/job_types.py
+++ b/premium/backend/src/baserow_premium/fields/job_types.py
@@ -1,8 +1,13 @@
-from typing import Type
+from collections.abc import Iterator
+from concurrent.futures import Executor, ThreadPoolExecutor
+from queue import Empty, Queue
+from typing import Any, Type
 
+from django.contrib.auth.models import AbstractUser
 from django.db.models import QuerySet
 
 from baserow_premium.generative_ai.managers import AIFileManager
+from loguru import logger
 from rest_framework import serializers
 
 from baserow.api.errors import ERROR_GROUP_DOES_NOT_EXIST, ERROR_USER_NOT_IN_GROUP
@@ -23,7 +28,10 @@ from baserow.core.exceptions import UserNotInWorkspace, WorkspaceDoesNotExist
 from baserow.core.formula import resolve_formula
 from baserow.core.formula.registries import formula_runtime_function_registry
-from baserow.core.generative_ai.exceptions import ModelDoesNotBelongToType
+from baserow.core.generative_ai.exceptions import (
+    GenerativeAIPromptError,
+    ModelDoesNotBelongToType,
+)
 from baserow.core.generative_ai.registries import (
     GenerativeAIWithFilesModelType,
     generative_ai_model_type_registry,
@@ -32,7 +40,7 @@ from baserow.core.job_types import _empty_transaction_context
 from baserow.core.jobs.exceptions import MaxJobCountExceeded
 from baserow.core.jobs.registries import JobType
-from baserow.core.utils import ChildProgressBuilder
+from baserow.core.utils import ChildProgressBuilder, Progress
 
 from .models import AIField, GenerateAIValuesJob
 from .registries import ai_field_output_registry
@@ -50,6 +58,27 @@ class GenerateAIValuesJobFiltersSerializer(serializers.Serializer):
     )
 
 
+def get_valid_generative_ai_model_type_or_raise(ai_field: AIField):
+    """
+    Returns the generative AI model type for the given AI field if the model belongs
+    to the type. Otherwise, raises a ModelDoesNotBelongToType exception.
+
+    :param ai_field: The AI field to check.
+    :return: The generative AI model type.
+    :raises ModelDoesNotBelongToType: If the model does not belong to the type.
+    """
+
+    generative_ai_model_type = generative_ai_model_type_registry.get(
+        ai_field.ai_generative_ai_type
+    )
+    workspace = ai_field.table.database.workspace
+    ai_models = generative_ai_model_type.get_enabled_models(workspace=workspace)
+
+    if ai_field.ai_generative_ai_model not in ai_models:
+        raise ModelDoesNotBelongToType(model_name=ai_field.ai_generative_ai_model)
+    return generative_ai_model_type
+
+
 class GenerateAIValuesJobType(JobType):
     type = "generate_ai_values"
     model_class = GenerateAIValuesJob
@@ -163,26 +192,6 @@ def _filter_empty_values(
             **{f"{ai_field.db_column}__isnull": True}
         ) | queryset.filter(**{ai_field.db_column: ""})
 
-    def get_valid_generative_ai_model_type_or_raise(self, ai_field: AIField):
-        """
-        Returns the generative AI model type for the given AI field if the model belongs
-        to the type. Otherwise, raises a ModelDoesNotBelongToType exception.
-
-        :param ai_field: The AI field to check.
-        :return: The generative AI model type.
-        :raises ModelDoesNotBelongToType: If the model does not belong to the type.
-        """
-
-        generative_ai_model_type = generative_ai_model_type_registry.get(
-            ai_field.ai_generative_ai_type
-        )
-        workspace = ai_field.table.database.workspace
-        ai_models = generative_ai_model_type.get_enabled_models(workspace=workspace)
-
-        if ai_field.ai_generative_ai_model not in ai_models:
-            raise ModelDoesNotBelongToType(model_name=ai_field.ai_generative_ai_model)
-        return generative_ai_model_type
-
     def _get_field(self, field_id: int) -> AIField:
         """
         Returns the AI field with the given ID.
@@ -209,6 +218,8 @@ def prepare_values(self, values, user):
         # Create the job instance without saving it yet, so we can use its mode property
         unsaved_job = GenerateAIValuesJob(**values)
 
+        get_valid_generative_ai_model_type_or_raise(ai_field)
+
         if unsaved_job.mode == GenerateAIValuesJob.MODES.ROWS:
             found_rows_ids = (
                 RowHandler().get_rows(model, req_row_ids).values_list("id", flat=True)
             )
@@ -259,9 +270,80 @@ def run(self, job: GenerateAIValuesJob, progress):
         if job.only_empty:
             rows = self._filter_empty_values(rows, ai_field)
 
+        progress_builder = progress.create_child_builder(
+            represents_progress=progress.total
+        )
+
+        rows_progress = ChildProgressBuilder.build(progress_builder, rows.count())
+        generator = AIValueGenerator(user, ai_field, self, rows_progress)
+        generator.process(rows.order_by("id"))
+
+
+class AIValueGenerator:
+    """
+    AIValueGenerator encapsulates the AI field value generation process. It needs the
+    user and field context to work, and it also uses the Job object as the sender for
+    the generation error signal and a Progress object to track processing progress.
+
+    Internally, this schedules the processing of each row on a separate thread using a
+    thread pool controlled by a `concurrent.futures.ThreadPoolExecutor`. Because we
+    use threads, the general rule is to run all code that needs a database connection
+    in one and the same (main) thread. The code that issues HTTP requests to the
+    model should run in a separate thread.
+
+    After processing completes, the result is sent back to the main thread through
+    a queue and handled there.
+    """
+
+    def __init__(
+        self,
+        user: AbstractUser,
+        ai_field: AIField,
+        signal_sender: GenerateAIValuesJob | Any | None = None,
+        progress: Progress | None = None,
+    ):
+        self.user = user
+
+        self.ai_field = ai_field
+        self.table = table = ai_field.table
+        self.model = table.get_model()
+        self.signal_sender = signal_sender
+        self.workspace = table.database.workspace
+        self.max_concurrency = self.ai_field.ai_max_concurrent_generations
+
+        # A counter of processed rows. This doesn't include rows that are still
+        # being processed.
+        self.finished = 0
+
+        # Keeps track of the ids of rows that are currently being processed.
+        self.in_process = set()
+
+        # A marker to know if we should schedule more rows. This can be set to `False`
+        # in two cases: when the rows iterator finishes, and when there's an error and
+        # we don't want to continue.
+        self.generate_more_rows = True
+
+        # A queue of results.
+        self.results_queue = Queue(self.max_concurrency)
+
+        # Marker to keep track of whether any errors occurred during the process.
+        self.has_errors = False
+
+        self.row_handler = RowHandler()
+        self.progress = progress
+
+        self.prepare()
+
+    def prepare(self):
+        """
+        Prepares runtime values from the attached AI field.
+
+        This method prepares common values to be used during processing and should be
+        called once, before processing is started.
+        """
+
         try:
-            generative_ai_model_type = self.get_valid_generative_ai_model_type_or_raise(
-                ai_field
+            self.generative_ai_model_type = get_valid_generative_ai_model_type_or_raise(
+                self.ai_field
             )
         except ModelDoesNotBelongToType as exc:
             # If the workspace AI settings have been removed before the task starts,
             # the job would fail. We therefore want to handle the error gracefully.
             # Note: rows might be a generator, so we can't pass it directly
             rows_ai_values_generation_error.send(
-                self,
-                user=user,
+                self.signal_sender,
+                user=self.user,
                 rows=[],
-                field=ai_field,
-                table=ai_field.table,
+                field=self.ai_field,
+                table=self.ai_field.table,
                 error_message=str(exc),
             )
             raise exc
 
-        ai_output_type = ai_field_output_registry.get(ai_field.ai_output_type)
+        self.ai_output_type = ai_field_output_registry.get(self.ai_field.ai_output_type)
 
-        progress_builder = progress.create_child_builder(
-            represents_progress=progress.total
+        self.use_file_fields = (
+            self.ai_field.ai_file_field_id is not None
+            and isinstance(
+                self.generative_ai_model_type, GenerativeAIWithFilesModelType
+            )
         )
 
-        rows_progress = ChildProgressBuilder.build(progress_builder, rows.count())
-        for row in rows.iterator(chunk_size=200):
-            context = HumanReadableRowContext(row, exclude_field_ids=[ai_field.id])
-            message = str(
-                resolve_formula(
-                    ai_field.ai_prompt, formula_runtime_function_registry, context
-                )
+        # FIXME: manually set the websocket_id to None for now because the frontend
+        # needs to receive the update to stop the loading state
+        self.user.web_socket_id = None
+
+    def generate_value_for(self, row: GeneratedTableModel):
+        """
+        Runs value generation for a single row using the AI model.
+
+        The contents of the method prepare and run a prompt on a model for the
+        row. This method doesn't return any value. Instead, the result, or any error
+        that happens during the processing, is put on the results queue.
+
+        :param row: A row to generate value for.
+        """
+
+        try:
+            result = self._generate_value_for(row)
+
+            self.results_queue.put(
+                (
+                    row,
+                    result,
+                ),
+                block=True,
+            )
+        except Exception as e:
+            logger.opt(exception=e).error(f"Value generation for row {row} failed: {e}")
+
+            self.results_queue.put(
+                (
+                    row,
+                    e,
+                ),
+                block=True,
             )
-            # The AI output type should be able to format the prompt because it can add
-            # additional instructions to it. The choice output type for example adds
-            # additional prompt trying to force the out, for example.
-            message = ai_output_type.format_prompt(message, ai_field)
+    def _generate_value_for(self, row: GeneratedTableModel) -> Any:
+        """
+        Prepares the message (and optionally files), sends it to the AI model and
+        returns the result.
+
+        :param row: A row to generate value for.
+        :return: A result from the AI model.
+        """
+
+        ai_field = self.ai_field
+        ai_output_type = self.ai_output_type
+        generative_ai_model_type = self.generative_ai_model_type
+        workspace = self.workspace
+
+        context = HumanReadableRowContext(row, exclude_field_ids=[ai_field.id])
+        message = str(
+            resolve_formula(
+                ai_field.ai_prompt, formula_runtime_function_registry, context
+            )
+        )
+
+        # The AI output type should be able to format the prompt because it can add
+        # additional instructions to it. The choice output type, for example, adds an
+        # additional prompt trying to force the output format.
+        message = ai_output_type.format_prompt(message, ai_field)
+
+        if self.use_file_fields:
             try:
-                if ai_field.ai_file_field_id is not None and isinstance(
-                    generative_ai_model_type, GenerativeAIWithFilesModelType
-                ):
-                    file_ids = AIFileManager.upload_files_from_file_field(
-                        ai_field, row, generative_ai_model_type, workspace=workspace
-                    )
-                    try:
-                        value = generative_ai_model_type.prompt_with_files(
-                            ai_field.ai_generative_ai_model,
-                            message,
-                            file_ids=file_ids,
-                            workspace=workspace,
-                            temperature=ai_field.ai_temperature,
-                        )
-                    except Exception as exc:
-                        raise exc
-                    finally:
-                        generative_ai_model_type.delete_files(
-                            file_ids, workspace=workspace
-                        )
-                else:
-                    value = generative_ai_model_type.prompt(
-                        ai_field.ai_generative_ai_model,
-                        message,
-                        workspace=workspace,
-                        temperature=ai_field.ai_temperature,
-                    )
-
-                # Because the AI output type can change the prompt to try to force the
-                # output a certain way, then it should give the opportunity to parse the
-                # output when it's given. With the choice output type, it will try to
-                # match it to a `SelectOption`, for example.
-                value = ai_output_type.parse_output(value, ai_field)
-            except Exception as exc:
-                # If the prompt fails once, we should not continue with the other rows.
-                # Note: rows might be a generator, so we can't slice it
-                rows_ai_values_generation_error.send(
-                    self,
-                    user=user,
-                    rows=[],
-                    field=ai_field,
-                    table=table,
-                    error_message=str(exc),
+                file_ids = AIFileManager.upload_files_from_file_field(
+                    ai_field, row, generative_ai_model_type
+                )
+                value = generative_ai_model_type.prompt_with_files(
+                    ai_field.ai_generative_ai_model,
+                    message,
+                    file_ids=file_ids,
+                    workspace=workspace,
+                    temperature=ai_field.ai_temperature,
                 )
-                raise exc
-
-            # FIXME: manually set the websocket_id to None for now because the frontend
-            # needs to receive the update to stop the loading state
-            user.web_socket_id = None
-            RowHandler().update_row_by_id(
-                user,
-                table,
-                row.id,
-                {ai_field.db_column: value},
-                model=model,
-                values_already_prepared=True,
+            finally:
+                generative_ai_model_type.delete_files(file_ids, workspace=workspace)
+        else:
+            value = generative_ai_model_type.prompt(
+                ai_field.ai_generative_ai_model,
+                message,
+                workspace=workspace,
+                temperature=ai_field.ai_temperature,
+            )
+
+        # Because the AI output type can change the prompt to try to force the
+        # output a certain way, it should also get the opportunity to parse the
+        # output when it's given. With the choice output type, it will try to
+        # match it to a `SelectOption`, for example.
+        value = ai_output_type.parse_output(value, ai_field)
+        return value
+
+    def handle_error(self, row: GeneratedTableModel, exc: Exception):
+        """
+        Error handling routine for when getting the AI model response for a row
+        failed.
+
+        If an error occurs, this will stop the processing of any pending rows and
+        will notify the frontend on the first occurrence of an error.
+
+        :param row: The row on which the error occurred.
+        :param exc: The exception that occurred.
+        :return:
+        """
+
+        self.stop_scheduling_rows()
+
+        if not self.has_errors:
+            rows_ai_values_generation_error.send(
+                self,
+                user=self.user,
+                rows=[row],
+                field=self.ai_field,
+                table=self.table,
+                error_message=str(exc),
             )
-            rows_progress.increment()
+
+        self.has_errors = True
+
+    def update_value(self, row: GeneratedTableModel, value: Any):
+        """
+        Updates the AI field value for the row with the value returned from the
+        AI model.
+        """
+
+        self.row_handler.update_row_by_id(
+            self.user,
+            self.table,
+            row.id,
+            {self.ai_field.db_column: value},
+            model=self.model,
+            values_already_prepared=True,
+        )
+
+    def raise_if_error(self):
+        """
+        Checks if there was any error during the processing of rows with the AI model,
+        and raises a GenerativeAIPromptError exception.
+
+        This should be called at the end of processing, to inform the caller that
+        there was an error.
+        """
+
+        if self.has_errors:
+            raise GenerativeAIPromptError("AI model responded with errors.")
+
+    def process(self, rows: QuerySet[GeneratedTableModel]):
+        """
+        Generates AI model values for the selected rows in parallel.
+
+        This will call the AI model generator for several rows at once. Each row is
+        processed in a separate thread, and the number of worker threads is fixed,
+        controlled by the AIField.ai_max_concurrent_generations value.
+
+        When an error occurs during processing in a worker thread, it won't be
+        propagated immediately. Instead, it's pushed to the queue and handled as the
+        result for that specific row, and an internal flag that records the error is
+        set. No new rows are scheduled after an error is received, but
+        the loop will wait for already scheduled threads to finish.
Because the flag + is set, we can raise an appropriate exception at the end to inform the caller + about the error. + + :param rows: An iterable of rows to generate values for. + :return: + :raise GenerativeAIPromptError: Raised at the end of processing, when at least + one row failed. + """ + + rows_iter = iter(rows.iterator(chunk_size=200)) + + with ThreadPoolExecutor(self.max_concurrency) as executor: + while True: + try: + # Allow to schedule only a limited number of futures, so we don't + # populate executor's backlog with an excessive amount of rows + # to process. We add new rows to process only if there's a + # 'free slot' in the executor. + while self.can_schedule_next(): + self.schedule_next_row(rows_iter, executor) + + except StopIteration: + self.stop_scheduling_rows() + + try: + processed = self.results_queue.get(block=True, timeout=0.1) + row, result = processed + self.handle_result(row, result) + + # Queue is empty, no processed results available yet; continue polling. + except Empty: + pass + except Exception as e: + logger.opt(exception=e).error(f"Error when handing result: {e}") + self.stop_scheduling_rows() + + if self.is_finished(): + break + + self.raise_if_error() + + def stop_scheduling_rows(self): + """ + Sets internal flag to stop producing and scheduling new rows to process. + """ + + self.generate_more_rows = False + + def can_schedule_next(self) -> bool: + """ + Returns True, if there's a free slot to process. + """ + + return self.generate_more_rows and len(self.in_process) < self.max_concurrency + + def is_finished(self) -> bool: + """ + Returns true, if there's no rows left to process. + """ + + return not len(self.in_process) and not self.generate_more_rows + + def handle_result(self, row: GeneratedTableModel, result: Exception | Any): + """ + An entry point to handle the result value for a row. The result may be an + error or a correct result, so, depending on its type, it will be handled + differently. + + A correct value will be stored for the row. + + The error will be stored and a signal may be emitted, so the frontend will + know about the error. This will also stop processing new rows. + + In any case, we want to update internal progress state. + + :param row: The row for which result arrived. + :param result: The result from the AI model. + :return: + """ + + try: + if isinstance(result, Exception): + self.handle_error(row, result) + else: + self.update_value(row, result) + finally: + self.update_progress(row) + + def update_progress(self, row: GeneratedTableModel): + """ + Update internal progress state. + """ + + self.finished += 1 + self.in_process.remove(row.id) + if self.progress: + self.progress.increment() + + def schedule_next_row(self, rows_iter: Iterator, executor: Executor): + """ + Prepares and adds the next row to the work queue. 
diff --git a/premium/backend/src/baserow_premium/fields/models.py b/premium/backend/src/baserow_premium/fields/models.py
index 5e733cff39..3ecb7f899f 100644
--- a/premium/backend/src/baserow_premium/fields/models.py
+++ b/premium/backend/src/baserow_premium/fields/models.py
@@ -1,5 +1,6 @@
 from enum import StrEnum
 
+from django.conf import settings
 from django.contrib.auth import get_user_model
 from django.contrib.postgres.fields import ArrayField
 from django.db import models
@@ -58,6 +59,14 @@ def __getattr__(self, name):
         except Exception:
             super().__getattr__(name)
 
+    @property
+    def ai_max_concurrent_generations(self) -> int:
+        """
+        Returns the maximum number of concurrent workers to use with the model.
+        """
+
+        return settings.BASEROW_AI_FIELD_MAX_CONCURRENT_GENERATIONS
+
 
 class GenerateAIValuesJob(
     JobWithUserIpAddress, JobWithWebsocketId, JobWithUndoRedoIds, Job
diff --git a/premium/backend/src/baserow_premium/generative_ai/managers.py b/premium/backend/src/baserow_premium/generative_ai/managers.py
index 7f845ccd85..b5e1c1b163 100644
--- a/premium/backend/src/baserow_premium/generative_ai/managers.py
+++ b/premium/backend/src/baserow_premium/generative_ai/managers.py
@@ -21,7 +21,7 @@ def upload_files_from_file_field(
 
         storage = get_default_storage()
 
-        all_cell_files = getattr(row, f"field_{ai_field.ai_file_field.id}")
+        all_cell_files = getattr(row, f"field_{ai_field.ai_file_field_id}")
         if not isinstance(all_cell_files, list):
             # just a single file
             all_cell_files = [all_cell_files] if all_cell_files else []
diff --git a/premium/backend/tests/baserow_premium_tests/fields/test_generate_ai_values_job_type.py b/premium/backend/tests/baserow_premium_tests/fields/test_generate_ai_values_job_type.py
index 5a88daec52..cab76d9f19 100644
--- a/premium/backend/tests/baserow_premium_tests/fields/test_generate_ai_values_job_type.py
+++ b/premium/backend/tests/baserow_premium_tests/fields/test_generate_ai_values_job_type.py
@@ -653,9 +653,8 @@ def test_generate_ai_field_value_view_generative_ai_invalid_prompt(
     assert patched_rows_updated.call_count == 0
     assert patched_rows_ai_values_generation_error.call_count == 1
     call_args_rows = patched_rows_ai_values_generation_error.call_args[1]["rows"]
-    assert (
-        len(call_args_rows) == 0
-    )  # Changed because rows is passed as empty list in job_types.py:220
+    assert len(call_args_rows) == 1
+    assert [r.id for r in call_args_rows] == [rows[0].id]
     assert patched_rows_ai_values_generation_error.call_args[1]["field"] == field
     assert (
         patched_rows_ai_values_generation_error.call_args[1]["error_message"]
diff --git a/premium/backend/tests/baserow_premium_tests/generative_ai/test_ai_parallel_execution.py b/premium/backend/tests/baserow_premium_tests/generative_ai/test_ai_parallel_execution.py
new file mode 100644
index 0000000000..490ff3ddb4
--- /dev/null
+++ b/premium/backend/tests/baserow_premium_tests/generative_ai/test_ai_parallel_execution.py
@@ -0,0 +1,124 @@
+from io import BytesIO
+
+import pytest
+from baserow_premium.fields.job_types import AIValueGenerator
+
+from baserow.contrib.database.rows.handler import RowHandler
+from baserow.core.generative_ai.exceptions import GenerativeAIPromptError
+from baserow.core.storage import get_default_storage
+from baserow.core.user_files.handler import UserFileHandler
+from baserow.core.utils import Progress
+
+
+@pytest.mark.django_db
+def test_ai_parallel_execution(premium_data_fixture):
+    storage = get_default_storage()
+
+    user = premium_data_fixture.create_user()
+    premium_data_fixture.create_premium_license_user(user=user)
+    workspace = premium_data_fixture.create_workspace(user=user)
+    database = premium_data_fixture.create_database_application(
+        user=user, workspace=workspace
+    )
+    table = premium_data_fixture.create_database_table(database=database)
+    file_field = premium_data_fixture.create_file_field(
+        table=table, order=0, name="File"
+    )
+    ai_field = premium_data_fixture.create_ai_field(
+        table=table, order=1, name="AI prompt", ai_file_field=file_field
+    )
+    user_file_1 = UserFileHandler().upload_user_file(
+        user, "aifile.txt", BytesIO(b"Hello"), storage=storage
+    )
+    table_model = table.get_model()
+
+    values = [
+        {f"field_{file_field.id}": [{"name": user_file_1.name}]},
+        {f"field_{file_field.id}": [{"name": user_file_1.name}]},
+        {f"field_{file_field.id}": [{"name": user_file_1.name}]},
+    ] * 10
+
+    RowHandler().force_create_rows(
+        user,
+        table,
+        values,
+        model=table_model,
+        send_realtime_update=False,
+        send_webhook_events=False,
+    )
+
+    rows = table_model.objects.all()
+
+    progress = Progress(len(rows))
+    gen = AIValueGenerator(
+        user=user,
+        ai_field=ai_field,
+        progress=progress,
+    )
+    gen.process(rows.order_by("id"))
+
+    assert len(rows) == 30
+    assert gen.finished == len(rows)
+    assert not gen.has_errors
+    assert progress.progress == 30
+
+
+@pytest.mark.django_db
+def test_ai_parallel_execution_with_error(premium_data_fixture):
+    storage = get_default_storage()
+    premium_data_fixture.register_fake_generate_ai_type()
+    user = premium_data_fixture.create_user()
+    premium_data_fixture.create_premium_license_user(user=user)
+
+    workspace = premium_data_fixture.create_workspace(user=user)
+    database = premium_data_fixture.create_database_application(
+        user=user, workspace=workspace
+    )
+    table = premium_data_fixture.create_database_table(database=database)
+    file_field = premium_data_fixture.create_file_field(
+        table=table, order=0, name="File"
+    )
+    ai_field = premium_data_fixture.create_ai_field(
+        table=table,
+        order=1,
+        name="AI prompt",
+        ai_file_field=file_field,
+        ai_generative_ai_type="test_generative_ai_prompt_error",
+        ai_generative_ai_model="test_1",
+    )
+    user_file_1 = UserFileHandler().upload_user_file(
+        user, "aifile.txt", BytesIO(b"Hello"), storage=storage
+    )
+    table_model = table.get_model()
+
+    values = [
+        {f"field_{file_field.id}": [{"name": user_file_1.name}]},
+        {f"field_{file_field.id}": [{"name": user_file_1.name}]},
+        {f"field_{file_field.id}": [{"name": user_file_1.name}]},
+    ] * 10
+
+    RowHandler().force_create_rows(
+        user,
+        table,
+        values,
+        model=table_model,
+        send_realtime_update=False,
+        send_webhook_events=False,
+    )
+
+    rows = table_model.objects.all()
+
+    progress = Progress(len(rows))
+    gen = AIValueGenerator(
+        user=user,
+        ai_field=ai_field,
+        progress=progress,
+    )
+
+    with pytest.raises(GenerativeAIPromptError):
+        gen.process(rows.order_by("id"))
+
+    assert len(rows) == 30
+    assert gen.finished == 5
+    assert gen.has_errors
+    assert progress.progress == 5
diff --git a/premium/backend/tests/baserow_premium_tests/generative_ai/test_managers.py b/premium/backend/tests/baserow_premium_tests/generative_ai/test_managers.py
index 99b68070a8..c252ea443d 100644
--- a/premium/backend/tests/baserow_premium_tests/generative_ai/test_managers.py
+++ b/premium/backend/tests/baserow_premium_tests/generative_ai/test_managers.py
@@ -8,10 +8,11 @@
 from baserow.core.storage import get_default_storage
 from baserow.core.user_files.handler import UserFileHandler
 from baserow.test_utils.fixtures.generative_ai import TestGenerativeAIWithFilesModelType
+from baserow.test_utils.helpers import AnyStr
 
 
 @pytest.mark.django_db
-def test_upload_files_from_file_field(premium_data_fixture):
+def test_upload_files_from_file_field(premium_data_fixture, django_assert_num_queries):
     storage = get_default_storage()
 
     user = premium_data_fixture.create_user()
@@ -37,9 +38,12 @@
         table_model,
     )
 
-    file_ids = AIFileManager.upload_files_from_file_field(
-        ai_field, row, generative_ai_model_type
-    )
+    ai_field.refresh_from_db()
+    with django_assert_num_queries(0):
+        file_ids = AIFileManager.upload_files_from_file_field(
+            ai_field, row, generative_ai_model_type
+        )
+    assert file_ids == [AnyStr()]
 
     assert len(generative_ai_model_type._files) == 1
     assert generative_ai_model_type._files[file_ids[0]]["file_name"].endswith(

From 5ab9c9df1b9299159164f0e0f43b4d5d72ada927 Mon Sep 17 00:00:00 2001
From: Bram
Date: Wed, 3 Dec 2025 21:29:44 +0100
Subject: [PATCH 2/2] Fix backup_baserow management command by correctly using
 pg3 dbname (#4355)

---
 .../src/baserow/core/management/backup/backup_runner.py | 2 +-
 .../unreleased/bug/fix_backup_baserow_pg3_dbname.json   | 9 +++++++++
 2 files changed, 10 insertions(+), 1 deletion(-)
 create mode 100644 changelog/entries/unreleased/bug/fix_backup_baserow_pg3_dbname.json

diff --git a/backend/src/baserow/core/management/backup/backup_runner.py b/backend/src/baserow/core/management/backup/backup_runner.py
index 8b61ea652c..c6eceb5178 100644
--- a/backend/src/baserow/core/management/backup/backup_runner.py
+++ b/backend/src/baserow/core/management/backup/backup_runner.py
@@ -158,7 +158,7 @@ def _build_connection(self):
         return psycopg.connect(
             host=self.host,
             port=self.port,
-            database=self.database,
+            dbname=self.database,
             user=self.username,
         )
 
diff --git a/changelog/entries/unreleased/bug/fix_backup_baserow_pg3_dbname.json b/changelog/entries/unreleased/bug/fix_backup_baserow_pg3_dbname.json
new file mode 100644
index 0000000000..4f98921941
--- /dev/null
+++ b/changelog/entries/unreleased/bug/fix_backup_baserow_pg3_dbname.json
@@ -0,0 +1,9 @@
+{
+    "type": "bug",
+    "message": "Fix backup_baserow management command by using correct pg3 dbname.",
+    "issue_origin": "github",
+    "issue_number": 4308,
+    "domain": "core",
+    "bullet_points": [],
+    "created_at": "2025-11-28"
+}
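For context on the second patch: psycopg 2 accepted `database=` as an alias for the libpq `dbname` connection parameter, but psycopg 3 only accepts the libpq keyword itself, so passing `database=` makes `psycopg.connect()` fail with an invalid-connection-option error. A minimal sketch of the corrected call (host, port, and credentials are placeholders, and a running Postgres is needed to actually connect):

```python
import psycopg  # psycopg 3

# psycopg 3 builds a libpq conninfo string from the keyword arguments, so only
# libpq parameter names are valid: "dbname" works, "database" does not.
conn = psycopg.connect(
    host="localhost",  # placeholder connection details
    port=5432,
    dbname="baserow",
    user="baserow",
)
with conn.cursor() as cursor:
    cursor.execute("SELECT 1")
    print(cursor.fetchone())
conn.close()
```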