|
2 | 2 | from typing import List, Optional |
3 | 3 |
|
4 | 4 | from django.conf import settings |
5 | | -from django.core.cache import cache |
6 | 5 | from django.db.models import Q |
7 | 6 |
|
8 | 7 | from celery_singleton import DuplicateTaskError, Singleton |
9 | 8 | from django_cte import With |
10 | 9 | from loguru import logger |
11 | 10 |
|
| 11 | +from baserow.celery_singleton_backend import SingletonAutoRescheduleFlag |
12 | 12 | from baserow.config.celery import app |
13 | 13 | from baserow.contrib.database.search.models import PendingSearchValueUpdate |
14 | 14 | from baserow.contrib.database.table.exceptions import TableDoesNotExist |
|
17 | 17 | PERIODIC_CHECK_TIME_LIMIT = 60 * PERIODIC_CHECK_MINUTES # 15 minutes. |
18 | 18 |
|
19 | 19 |
|
20 | | -class PendingSearchUpdateFlag: |
21 | | - """ |
22 | | - Flag is used to indicate that a search data update task is pending for a |
23 | | - specific table and it has not been possible to schedule it yet due to a concurrent |
24 | | - task already running for the same table. |
25 | | -
|
26 | | - When the task ends, if this flag is set, it will re-schedule itself to ensure that |
27 | | - the search data is eventually updated. |
28 | | - """ |
29 | | - |
30 | | - def __init__(self, table_id: int): |
31 | | - self.table_id = table_id |
32 | | - |
33 | | - @property |
34 | | - def key(self): |
35 | | - """ |
36 | | - Returns the cache key to use for the table lock. |
37 | | - """ |
38 | | - |
39 | | - return f"database_search_data_lock_{self.table_id}" |
40 | | - |
41 | | - def get(self): |
42 | | - """ |
43 | | - Gets the lock for the search data update task. |
44 | | -
|
45 | | - :return: True if the lock is set, False otherwise. |
46 | | - """ |
47 | | - |
48 | | - return cache.get(key=self.key) |
49 | | - |
50 | | - def set(self): |
51 | | - """ |
52 | | - Sets the lock for the search data update task. |
53 | | - """ |
54 | | - |
55 | | - return cache.set( |
56 | | - key=self.key, |
57 | | - value=True, |
58 | | - timeout=settings.AUTO_INDEX_LOCK_EXPIRY * 2, |
59 | | - ) |
60 | | - |
61 | | - def clear(self): |
62 | | - """ |
63 | | - Clears the lock for the search data update task. |
64 | | - """ |
65 | | - |
66 | | - return cache.delete(key=self.key) |
| 20 | +def _get_singleton_autoreschedule_flag(table_id: int) -> SingletonAutoRescheduleFlag: |
| 21 | + return SingletonAutoRescheduleFlag(f"database_search_data_lock_{table_id}") |
67 | 22 |
|
68 | 23 |
|
69 | 24 | @app.task(queue="export") |
@@ -114,7 +69,8 @@ def schedule_update_search_data( |
114 | 69 | # There are new updates pending to be processed, make sure the flag is set |
115 | 70 | # so the task will be re-scheduled at the end of the current run. |
116 | 71 | if new_pending_updates: |
117 | | - PendingSearchUpdateFlag(table_id).set() |
| 72 | + flag = _get_singleton_autoreschedule_flag(table_id) |
| 73 | + flag.set() |
118 | 74 |
|
119 | 75 |
|
120 | 76 | @app.task( |
@@ -162,13 +118,13 @@ def update_search_data(table_id: int): |
162 | 118 | SearchHandler.initialize_missing_search_data(table) |
163 | 119 |
|
164 | 120 | # Make sure newer updates will re-schedule this task at the end if needed. |
165 | | - flag = PendingSearchUpdateFlag(table_id) |
| 121 | + flag = _get_singleton_autoreschedule_flag(table_id) |
166 | 122 | flag.clear() |
167 | 123 |
|
168 | 124 | SearchHandler.process_search_data_updates(table) |
169 | 125 |
|
170 | 126 | # If new updates were queued during processing, schedule another update |
171 | | - if flag.get(): |
| 127 | + if flag.is_set(): |
172 | 128 | logger.debug( |
173 | 129 | f"New updates detected, rescheduling the task for table {table_id}." |
174 | 130 | ) |
|
0 commit comments