Skip to content

Commit deef31d

Browse files
authored
perf: optimize pending search update scheduling for large imports and syncs (baserow#5250)
* optimize pending search update scheduling for large imports and syncs
* address copilot feedback
* address feedback
1 parent d2a84ac commit deef31d

11 files changed

Lines changed: 446 additions & 75 deletions

File tree

backend/src/baserow/config/settings/base.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -850,6 +850,10 @@ def __setitem__(self, key, value):
850850
os.getenv("BATCH_ROWS_SIZE_LIMIT", 200)
851851
) # How many rows can be modified at once.
852852

853+
SEARCH_UPDATE_BATCH_SIZE = int(
854+
os.getenv("BASEROW_SEARCH_UPDATE_BATCH_SIZE", 2000)
855+
) # How many rows to process per batch in search index updates.
856+
853857
# Maximum count of records considered as a 'small table' during field rule operations.
854858
FIELD_RULE_ROWS_LIMIT = int(os.getenv("FIELD_RULE_ROWS_LIMIT", BATCH_ROWS_SIZE_LIMIT))
855859

backend/src/baserow/contrib/database/data_sync/handler.py

Lines changed: 59 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from baserow.contrib.database.models import Database
1616
from baserow.contrib.database.operations import CreateTableDatabaseTableOperationType
1717
from baserow.contrib.database.rows.handler import RowHandler
18-
from baserow.contrib.database.rows.types import CreatedRowsData
18+
from baserow.contrib.database.rows.types import CreatedRowsData, UpdatedRowsData
1919
from baserow.contrib.database.search.handler import SearchHandler
2020
from baserow.contrib.database.table.models import Table
2121
from baserow.contrib.database.table.operations import UpdateDatabaseTableOperationType
@@ -43,8 +43,56 @@
4343
from .operations import SyncTableOperationType
4444
from .registries import data_sync_type_registry, two_way_sync_strategy_type_registry
4545

46+
LARGE_DATA_SYNC_SEARCH_UPDATE_MIN_CHANGED_ROWS = 10_000
47+
LARGE_DATA_SYNC_SEARCH_UPDATE_THRESHOLD = 0.5
48+
4649

4750
class DataSyncHandler:
51+
def _schedule_search_updates_after_sync(
    self,
    data_sync: DataSync,
    enabled_properties: QuerySet[DataSyncSyncedProperty],
    existing_row_count: int,
    created_rows: CreatedRowsData,
    updated_rows: UpdatedRowsData,
    row_ids_to_delete: list[int],
) -> None:
    """
    Schedules a single search data update for the table of the given data sync,
    based on the rows that were created, updated and deleted during the sync.

    Nothing is scheduled when no rows changed. The update is scheduled for the
    synced fields without any row ids when at least one row was deleted, when
    more than `LARGE_DATA_SYNC_SEARCH_UPDATE_MIN_CHANGED_ROWS` rows changed, or
    when the number of changed rows is at least
    `LARGE_DATA_SYNC_SEARCH_UPDATE_THRESHOLD` times `existing_row_count`.
    Otherwise, it's scheduled only for the ids of the created and updated rows,
    plus the row ids of their cascade updates if there are any.

    :param data_sync: The data sync whose table must get the search update.
    :param enabled_properties: The synced properties whose fields are passed to
        the search update.
    :param existing_row_count: The row count the changed rows are compared to.
    :param created_rows: The result of creating the new rows.
    :param updated_rows: The result of updating the existing rows.
    :param row_ids_to_delete: The ids of the rows that were deleted.
    """

    created = created_rows.created_rows
    updated = updated_rows.updated_rows
    total_changed = len(created) + len(updated) + len(row_ids_to_delete)
    if total_changed == 0:
        return

    fields_to_index = [prop.field for prop in enabled_properties]

    needs_full_refresh = (
        len(row_ids_to_delete) > 0
        or total_changed > LARGE_DATA_SYNC_SEARCH_UPDATE_MIN_CHANGED_ROWS
        or total_changed
        >= existing_row_count * LARGE_DATA_SYNC_SEARCH_UPDATE_THRESHOLD
    )

    if not needs_full_refresh:
        # Collect the ids in a set so that a row that's both directly changed
        # and part of a cascade update is only passed along once.
        refresh_ids = set()
        refresh_ids.update(row.id for row in created)
        refresh_ids.update(row.id for row in updated)
        for result in (created_rows, updated_rows):
            if result.cascade_update:
                refresh_ids.update(result.cascade_update.row_ids)

        SearchHandler.schedule_update_search_data(
            data_sync.table,
            fields=fields_to_index,
            row_ids=sorted(refresh_ids),
        )
        return

    SearchHandler.schedule_update_search_data(
        data_sync.table, fields=fields_to_index
    )
95+
4896
def get_data_sync(
4997
self, data_sync_id: int, base_queryset: Optional[QuerySet] = None
5098
) -> DataSync:
@@ -473,8 +521,9 @@ def _do_sync_table(self, user, data_sync, progress_builder):
473521
)
474522
progress.increment(by=10) # makes the total `80`
475523

524+
updated_rows = UpdatedRowsData([], [], {}, {}, None, [], None)
476525
if len(rows_to_update) > 0:
477-
RowHandler().update_rows(
526+
updated_rows = RowHandler().update_rows(
478527
user=user,
479528
table=data_sync.table,
480529
rows_values=rows_to_update,
@@ -500,20 +549,14 @@ def _do_sync_table(self, user, data_sync, progress_builder):
500549
)
501550
progress.increment(by=10) # makes the total `100`
502551

503-
if (
504-
len(rows_to_create) > 0
505-
or len(rows_to_update) > 0
506-
or len(row_ids_to_delete) > 0
507-
):
508-
# No need to include this in the progress as it triggers a celery task
509-
row_ids = [r["id"] for r in rows_to_update] + [
510-
r.id for r in created_rows.created_rows
511-
]
512-
SearchHandler.schedule_update_search_data(
513-
data_sync.table,
514-
fields=[p.field for p in enabled_properties],
515-
row_ids=row_ids,
516-
)
552+
self._schedule_search_updates_after_sync(
553+
data_sync=data_sync,
554+
enabled_properties=enabled_properties,
555+
existing_row_count=len(existing_rows_queryset),
556+
created_rows=created_rows,
557+
updated_rows=updated_rows,
558+
row_ids_to_delete=row_ids_to_delete,
559+
)
517560

518561
def set_data_sync_synced_properties(
519562
self,
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# Generated by Django 5.2.13 on 2026-04-21 20:08
2+
3+
from django.db import migrations
4+
5+
6+
# Statements that drop and recreate the index on the `table_id` column of the
# pending search value update table.
DROP_TABLE_ID_INDEX_SQL = (
    "DROP INDEX IF EXISTS "
    "database_pendingsearchvalueupdate_table_id_813adfd1;"
)
CREATE_TABLE_ID_INDEX_SQL = (
    "CREATE INDEX IF NOT EXISTS "
    "database_pendingsearchvalueupdate_table_id_813adfd1 "
    "ON database_pendingsearchvalueupdate (table_id);"
)


class Migration(migrations.Migration):
    """
    Removes the `table` field of `pendingsearchvalueupdate` from the Django
    state, drops its indexes, and tunes the autovacuum and statistics settings
    of the `database_pendingsearchvalueupdate` table.
    """

    dependencies = [
        ("database", "0208_gridview_frozen_column_count"),
    ]

    operations = [
        migrations.RemoveIndex(
            model_name="pendingsearchvalueupdate",
            name="pendingsearchvaluedeletion_frd",
        ),
        # Column and FK kept one release for rollback safety; only the FK index
        # is dropped. The field is therefore only removed from the state.
        migrations.SeparateDatabaseAndState(
            state_operations=[
                migrations.RemoveField(
                    model_name="pendingsearchvalueupdate",
                    name="table",
                ),
            ],
            database_operations=[
                migrations.RunSQL(
                    sql=DROP_TABLE_ID_INDEX_SQL,
                    reverse_sql=CREATE_TABLE_ID_INDEX_SQL,
                ),
            ],
        ),
        # Table specific autovacuum storage parameters. The reverse statement
        # resets the same six parameters.
        migrations.RunSQL(
            sql="""
                ALTER TABLE database_pendingsearchvalueupdate SET (
                    autovacuum_analyze_threshold = 2000,
                    autovacuum_analyze_scale_factor = 0.002,
                    autovacuum_vacuum_threshold = 5000,
                    autovacuum_vacuum_scale_factor = 0.01,
                    autovacuum_vacuum_insert_threshold = 5000,
                    autovacuum_vacuum_insert_scale_factor = 0.01
                );
            """,
            reverse_sql="""
                ALTER TABLE database_pendingsearchvalueupdate RESET (
                    autovacuum_analyze_threshold,
                    autovacuum_vacuum_scale_factor,
                    autovacuum_analyze_scale_factor,
                    autovacuum_vacuum_threshold,
                    autovacuum_vacuum_insert_threshold,
                    autovacuum_vacuum_insert_scale_factor
                );
            """,
        ),
        # Per-column statistics target for `field_id`. The reverse statement
        # sets it to -1.
        migrations.RunSQL(
            sql="""
                ALTER TABLE database_pendingsearchvalueupdate
                    ALTER COLUMN field_id SET STATISTICS 2000;
            """,
            reverse_sql="""
                ALTER TABLE database_pendingsearchvalueupdate
                    ALTER COLUMN field_id SET STATISTICS -1;
            """,
        ),
    ]

backend/src/baserow/contrib/database/rows/handler.py

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@
124124
tracer = trace.get_tracer(__name__)
125125

126126
BATCH_SIZE = 1024
127+
LARGE_IMPORT_SEARCH_UPDATE_MIN_CHANGED_ROWS = 10_000
128+
LARGE_IMPORT_SEARCH_UPDATE_THRESHOLD = 0.5
127129

128130
meter = metrics.get_meter(__name__)
129131
rows_created_counter = meter.create_counter(
@@ -236,6 +238,19 @@ def get_deleted_link_row_rels_for_update_collector(
236238

237239

238240
class RowHandler(metaclass=baserow_trace_methods(tracer)):
241+
def _should_use_full_field_search_update_for_import(
    self,
    changed_rows: int,
    model: GeneratedTableModel,
) -> bool:
    """
    Decides whether an import should schedule one search update for the whole
    table instead of search updates for the individual batches of rows.

    :param changed_rows: The number of rows the import creates and updates.
    :param model: The generated table model, used to count the existing rows.
    :return: False if no rows are changed. True if more than
        `LARGE_IMPORT_SEARCH_UPDATE_MIN_CHANGED_ROWS` rows are changed, or if
        the number of changed rows is at least
        `LARGE_IMPORT_SEARCH_UPDATE_THRESHOLD` times the number of existing
        rows.
    """

    # Nothing changed means there is nothing to refresh. Returning early also
    # prevents a count query on the table that can't change the outcome, and
    # avoids reporting `True` for an empty table because of `0 >= 0`.
    if changed_rows <= 0:
        return False

    if changed_rows > LARGE_IMPORT_SEARCH_UPDATE_MIN_CHANGED_ROWS:
        return True

    existing_row_count = model.objects.count()
    return changed_rows >= (
        existing_row_count * LARGE_IMPORT_SEARCH_UPDATE_THRESHOLD
    )
253+
239254
def prepare_values(self, fields, values):
240255
"""
241256
Prepares a set of values so that they can be created or updated in the database.
@@ -1767,6 +1782,7 @@ def force_create_rows_by_batch(
17671782
progress: Optional[Progress] = None,
17681783
model: Optional[Type[GeneratedTableModel]] = None,
17691784
signal_params: Optional[Dict] = None,
1785+
skip_search_update: bool = True,
17701786
) -> Tuple[List[GeneratedTableModel], Dict[str, Dict[str, Any]]]:
17711787
"""
17721788
Creates rows by batch and generates an error report instead of failing on first
@@ -1777,6 +1793,9 @@ def force_create_rows_by_batch(
17771793
:param rows_values: List of rows values for rows that need to be created.
17781794
:param progress: Give a progress instance to track the progress of the import.
17791795
:param model: Optional model to prevent recomputing table model.
1796+
:param signal_params: Additional parameters that are added to the signal.
1797+
:param skip_search_update: When True, skip search updates. The caller is
1798+
responsible for managing search updates.
17801799
:return: The created rows and the error report.
17811800
"""
17821801

@@ -1806,9 +1825,7 @@ def force_create_rows_by_batch(
18061825
generate_error_report=True,
18071826
send_realtime_update=False,
18081827
send_webhook_events=False,
1809-
# Don't trigger loads of search updates for every batch of rows we
1810-
# create but instead a single one for this entire table at the end.
1811-
skip_search_update=True,
1828+
skip_search_update=skip_search_update,
18121829
signal_params=signal_params,
18131830
)
18141831

@@ -1822,10 +1839,6 @@ def force_create_rows_by_batch(
18221839

18231840
all_created_rows += created_rows
18241841

1825-
SearchHandler.schedule_update_search_data(
1826-
table, row_ids=[r.id for r in all_created_rows]
1827-
)
1828-
18291842
return all_created_rows, report
18301843

18311844
def force_update_rows_by_batch(
@@ -1836,6 +1849,7 @@ def force_update_rows_by_batch(
18361849
progress: Progress,
18371850
model: Optional[Type[GeneratedTableModel]] = None,
18381851
signal_params: Optional[Dict] = None,
1852+
skip_search_update: bool = True,
18391853
) -> Tuple[List[Dict[str, Any] | None], Dict[str, Dict[str, Any]]]:
18401854
"""
18411855
Updates rows by batch and generates an error report instead of failing on first
@@ -1847,6 +1861,8 @@ def force_update_rows_by_batch(
18471861
:param progress: Give a progress instance to track the progress of the import.
18481862
:param model: Optional model to prevent recomputing table model.
18491863
:param signal_params: Additional parameters that are added to the signal.
1864+
:param skip_search_update: When True, skip search updates. The caller is
1865+
responsible for managing search updates.
18501866
:return: The updated rows and the error report.
18511867
"""
18521868

@@ -1876,6 +1892,7 @@ def force_update_rows_by_batch(
18761892
send_realtime_update=False,
18771893
send_webhook_events=False,
18781894
generate_error_report=True,
1895+
skip_search_update=skip_search_update,
18791896
signal_params=signal_params,
18801897
)
18811898
report.update(result.errors)
@@ -2059,12 +2076,18 @@ def import_rows(
20592076
else:
20602077
rows_values_to_create = valid_rows
20612078

2079+
changed_rows = len(rows_values_to_create) + len(rows_values_to_update)
2080+
full_field_search_update = self._should_use_full_field_search_update_for_import(
2081+
changed_rows, model
2082+
)
2083+
20622084
created_rows, creation_report = self.force_create_rows_by_batch(
20632085
user,
20642086
table,
20652087
rows_values_to_create,
20662088
progress=creation_sub_progress,
20672089
model=model,
2090+
skip_search_update=full_field_search_update,
20682091
)
20692092

20702093
if rows_values_to_update:
@@ -2074,6 +2097,7 @@ def import_rows(
20742097
rows_values_to_update,
20752098
progress=creation_sub_progress,
20762099
model=model,
2100+
skip_search_update=full_field_search_update,
20772101
)
20782102

20792103
# Add errors to global report
@@ -2095,6 +2119,9 @@ def import_rows(
20952119
# of rows_created because we might import a lot of rows.
20962120
table_updated.send(self, table=table, user=user, force_table_refresh=True)
20972121

2122+
if full_field_search_update and changed_rows > 0:
2123+
SearchHandler.schedule_update_search_data(table)
2124+
20982125
return created_rows, error_report.to_dict()
20992126

21002127
def get_fields_metadata_for_row_history(

0 commit comments

Comments
 (0)