Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions backend/src/baserow/config/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,10 @@ def __setitem__(self, key, value):
os.getenv("BATCH_ROWS_SIZE_LIMIT", 200)
) # How many rows can be modified at once.

SEARCH_UPDATE_BATCH_SIZE = int(
os.getenv("BASEROW_SEARCH_UPDATE_BATCH_SIZE", 2000)
) # How many rows to process per batch in search index updates.

# Maximum count of records considered as a 'small table' during field rule operations.
FIELD_RULE_ROWS_LIMIT = int(os.getenv("FIELD_RULE_ROWS_LIMIT", BATCH_ROWS_SIZE_LIMIT))

Expand Down
75 changes: 59 additions & 16 deletions backend/src/baserow/contrib/database/data_sync/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from baserow.contrib.database.models import Database
from baserow.contrib.database.operations import CreateTableDatabaseTableOperationType
from baserow.contrib.database.rows.handler import RowHandler
from baserow.contrib.database.rows.types import CreatedRowsData
from baserow.contrib.database.rows.types import CreatedRowsData, UpdatedRowsData
from baserow.contrib.database.search.handler import SearchHandler
from baserow.contrib.database.table.models import Table
from baserow.contrib.database.table.operations import UpdateDatabaseTableOperationType
Expand Down Expand Up @@ -43,8 +43,56 @@
from .operations import SyncTableOperationType
from .registries import data_sync_type_registry, two_way_sync_strategy_type_registry

LARGE_DATA_SYNC_SEARCH_UPDATE_MIN_CHANGED_ROWS = 10_000
LARGE_DATA_SYNC_SEARCH_UPDATE_THRESHOLD = 0.5


class DataSyncHandler:
def _schedule_search_updates_after_sync(
    self,
    data_sync: DataSync,
    enabled_properties: QuerySet[DataSyncSyncedProperty],
    existing_row_count: int,
    created_rows: CreatedRowsData,
    updated_rows: UpdatedRowsData,
    row_ids_to_delete: list[int],
) -> None:
    """
    Schedules the search data update that follows a data sync run.

    Nothing is scheduled when the sync did not create, update or delete any
    row. Otherwise the update is scheduled either for the synced fields as a
    whole (no row ids passed) or only for the specific rows that changed.

    :param data_sync: The data sync whose table needs its search data updated.
    :param enabled_properties: The synced properties; the `field` of each one
        is passed along as the fields to update.
    :param existing_row_count: The row count the number of changed rows is
        compared against for the relative threshold.
    :param created_rows: The result of creating the new rows.
    :param updated_rows: The result of updating the existing rows.
    :param row_ids_to_delete: The ids of the rows removed by the sync.
    """

    created = created_rows.created_rows
    updated = updated_rows.updated_rows
    total_changed = len(created) + len(updated) + len(row_ids_to_delete)
    if total_changed == 0:
        return

    fields_to_index = [synced_property.field for synced_property in enabled_properties]

    # A full-field update is used when any row was deleted, when the absolute
    # number of changes is above the minimum, or when the changes reach the
    # configured fraction of the existing rows.
    if (
        len(row_ids_to_delete) > 0
        or total_changed > LARGE_DATA_SYNC_SEARCH_UPDATE_MIN_CHANGED_ROWS
        or total_changed
        >= existing_row_count * LARGE_DATA_SYNC_SEARCH_UPDATE_THRESHOLD
    ):
        SearchHandler.schedule_update_search_data(
            data_sync.table, fields=fields_to_index
        )
        return

    # Targeted update: the created and updated rows plus any rows reported by
    # their cascade updates, de-duplicated through a set.
    pending_ids = set()
    pending_ids.update(row.id for row in created)
    pending_ids.update(row.id for row in updated)
    for rows_result in (created_rows, updated_rows):
        if rows_result.cascade_update:
            pending_ids.update(rows_result.cascade_update.row_ids)

    SearchHandler.schedule_update_search_data(
        data_sync.table,
        fields=fields_to_index,
        row_ids=sorted(pending_ids),
    )

def get_data_sync(
self, data_sync_id: int, base_queryset: Optional[QuerySet] = None
) -> DataSync:
Expand Down Expand Up @@ -473,8 +521,9 @@ def _do_sync_table(self, user, data_sync, progress_builder):
)
progress.increment(by=10) # makes the total `80`

updated_rows = UpdatedRowsData([], [], {}, {}, None, [], None)
if len(rows_to_update) > 0:
RowHandler().update_rows(
updated_rows = RowHandler().update_rows(
user=user,
table=data_sync.table,
rows_values=rows_to_update,
Expand All @@ -500,20 +549,14 @@ def _do_sync_table(self, user, data_sync, progress_builder):
)
progress.increment(by=10) # makes the total `100`

if (
len(rows_to_create) > 0
or len(rows_to_update) > 0
or len(row_ids_to_delete) > 0
):
# No need to include this in the progress as it triggers a celery task
row_ids = [r["id"] for r in rows_to_update] + [
r.id for r in created_rows.created_rows
]
SearchHandler.schedule_update_search_data(
data_sync.table,
fields=[p.field for p in enabled_properties],
row_ids=row_ids,
)
self._schedule_search_updates_after_sync(
data_sync=data_sync,
enabled_properties=enabled_properties,
existing_row_count=len(existing_rows_queryset),
created_rows=created_rows,
updated_rows=updated_rows,
row_ids_to_delete=row_ids_to_delete,
)

def set_data_sync_synced_properties(
self,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Generated by Django 5.2.13 on 2026-04-21 20:08

from django.db import migrations


class Migration(migrations.Migration):
# Drops an index and the `table` field state of `pendingsearchvalueupdate`, then
# tunes autovacuum and statistics settings of its database table via raw SQL.

dependencies = [
('database', '0208_gridview_frozen_column_count'),
]

operations = [
# Remove the index named 'pendingsearchvaluedeletion_frd' from the model.
migrations.RemoveIndex(
model_name='pendingsearchvalueupdate',
name='pendingsearchvaluedeletion_frd',
),
# Column and FK kept one release for rollback safety; only the FK index is dropped.
# The model state forgets the `table` field, while the database operation only
# drops (and on reverse re-creates) the index on `table_id`.
migrations.SeparateDatabaseAndState(
state_operations=[
migrations.RemoveField(
model_name='pendingsearchvalueupdate',
name='table',
),
],
database_operations=[
migrations.RunSQL(
sql=(
"DROP INDEX IF EXISTS "
"database_pendingsearchvalueupdate_table_id_813adfd1;"
),
reverse_sql=(
"CREATE INDEX IF NOT EXISTS "
"database_pendingsearchvalueupdate_table_id_813adfd1 "
"ON database_pendingsearchvalueupdate (table_id);"
),
),
],
),
# Set per-table autovacuum analyze/vacuum thresholds and scale factors; the
# reverse SQL resets the same six storage parameters.
migrations.RunSQL(
sql="""
ALTER TABLE database_pendingsearchvalueupdate SET (
autovacuum_analyze_threshold = 2000,
autovacuum_analyze_scale_factor = 0.002,
autovacuum_vacuum_threshold = 5000,
autovacuum_vacuum_scale_factor = 0.01,
autovacuum_vacuum_insert_threshold = 5000,
autovacuum_vacuum_insert_scale_factor = 0.01
);
""",
reverse_sql="""
ALTER TABLE database_pendingsearchvalueupdate RESET (
autovacuum_analyze_threshold,
autovacuum_vacuum_scale_factor,
autovacuum_analyze_scale_factor,
autovacuum_vacuum_threshold,
autovacuum_vacuum_insert_threshold,
autovacuum_vacuum_insert_scale_factor
);
""",
),
# Set the statistics target of the `field_id` column to 2000; the reverse SQL
# sets it to -1 (NOTE(review): per the PostgreSQL docs -1 reverts to the
# system default statistics target — confirm for the deployed version).
migrations.RunSQL(
sql="""
ALTER TABLE database_pendingsearchvalueupdate
ALTER COLUMN field_id SET STATISTICS 2000;
""",
reverse_sql="""
ALTER TABLE database_pendingsearchvalueupdate
ALTER COLUMN field_id SET STATISTICS -1;
""",
),
]
41 changes: 34 additions & 7 deletions backend/src/baserow/contrib/database/rows/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@
tracer = trace.get_tracer(__name__)

BATCH_SIZE = 1024
LARGE_IMPORT_SEARCH_UPDATE_MIN_CHANGED_ROWS = 10_000
LARGE_IMPORT_SEARCH_UPDATE_THRESHOLD = 0.5

meter = metrics.get_meter(__name__)
rows_created_counter = meter.create_counter(
Expand Down Expand Up @@ -236,6 +238,19 @@ def get_deleted_link_row_rels_for_update_collector(


class RowHandler(metaclass=baserow_trace_methods(tracer)):
def _should_use_full_field_search_update_for_import(
    self,
    changed_rows: int,
    model: GeneratedTableModel,
) -> bool:
    """
    Decides whether an import should trigger one full search update instead of
    per-row search updates.

    :param changed_rows: The number of rows the import will create or update.
    :param model: The generated table model; its row count is only queried
        when the absolute threshold is not already exceeded.
    :return: True when `changed_rows` is above the absolute minimum, or when
        it reaches the configured fraction of the rows counted via `model`.
    """

    if not changed_rows > LARGE_IMPORT_SEARCH_UPDATE_MIN_CHANGED_ROWS:
        # Only count the rows when the cheap absolute check did not settle it.
        relative_limit = (
            model.objects.count() * LARGE_IMPORT_SEARCH_UPDATE_THRESHOLD
        )
        return changed_rows >= relative_limit

    return True

def prepare_values(self, fields, values):
"""
Prepares a set of values so that they can be created or updated in the database.
Expand Down Expand Up @@ -1767,6 +1782,7 @@ def force_create_rows_by_batch(
progress: Optional[Progress] = None,
model: Optional[Type[GeneratedTableModel]] = None,
signal_params: Optional[Dict] = None,
skip_search_update: bool = True,
) -> Tuple[List[GeneratedTableModel], Dict[str, Dict[str, Any]]]:
"""
Creates rows by batch and generates an error report instead of failing on first
Expand All @@ -1777,6 +1793,9 @@ def force_create_rows_by_batch(
:param rows_values: List of rows values for rows that need to be created.
:param progress: Give a progress instance to track the progress of the import.
:param model: Optional model to prevent recomputing table model.
:param signal_params: Additional parameters that are added to the signal.
:param skip_search_update: When True, skip search updates. The caller is
responsible for managing search updates.
:return: The created rows and the error report.
"""

Expand Down Expand Up @@ -1806,9 +1825,7 @@ def force_create_rows_by_batch(
generate_error_report=True,
send_realtime_update=False,
send_webhook_events=False,
# Don't trigger loads of search updates for every batch of rows we
# create but instead a single one for this entire table at the end.
skip_search_update=True,
skip_search_update=skip_search_update,
signal_params=signal_params,
)

Expand All @@ -1822,10 +1839,6 @@ def force_create_rows_by_batch(

all_created_rows += created_rows

SearchHandler.schedule_update_search_data(
table, row_ids=[r.id for r in all_created_rows]
)

return all_created_rows, report

def force_update_rows_by_batch(
Expand All @@ -1836,6 +1849,7 @@ def force_update_rows_by_batch(
progress: Progress,
model: Optional[Type[GeneratedTableModel]] = None,
signal_params: Optional[Dict] = None,
skip_search_update: bool = True,
) -> Tuple[List[Dict[str, Any] | None], Dict[str, Dict[str, Any]]]:
"""
Updates rows by batch and generates an error report instead of failing on first
Expand All @@ -1847,6 +1861,8 @@ def force_update_rows_by_batch(
:param progress: Give a progress instance to track the progress of the import.
:param model: Optional model to prevent recomputing table model.
:param signal_params: Additional parameters that are added to the signal.
:param skip_search_update: When True, skip search updates. The caller is
responsible for managing search updates.
:return: The updated rows and the error report.
"""

Expand Down Expand Up @@ -1876,6 +1892,7 @@ def force_update_rows_by_batch(
send_realtime_update=False,
send_webhook_events=False,
generate_error_report=True,
skip_search_update=skip_search_update,
signal_params=signal_params,
)
report.update(result.errors)
Expand Down Expand Up @@ -2059,12 +2076,18 @@ def import_rows(
else:
rows_values_to_create = valid_rows

changed_rows = len(rows_values_to_create) + len(rows_values_to_update)
full_field_search_update = self._should_use_full_field_search_update_for_import(
changed_rows, model
)

created_rows, creation_report = self.force_create_rows_by_batch(
user,
table,
rows_values_to_create,
progress=creation_sub_progress,
model=model,
skip_search_update=full_field_search_update,
)

if rows_values_to_update:
Expand All @@ -2074,6 +2097,7 @@ def import_rows(
rows_values_to_update,
progress=creation_sub_progress,
model=model,
skip_search_update=full_field_search_update,
)

# Add errors to global report
Expand All @@ -2095,6 +2119,9 @@ def import_rows(
# of rows_created because we might import a lot of rows.
table_updated.send(self, table=table, user=user, force_table_refresh=True)

if full_field_search_update and changed_rows > 0:
SearchHandler.schedule_update_search_data(table)

return created_rows, error_report.to_dict()

def get_fields_metadata_for_row_history(
Expand Down
Loading
Loading