Skip to content

Commit fe714bf

Browse files
feat(server-core): Exponential backoff in refresh scheduler (cube-js#10302)
* add CUBEJS_PRE_AGG_BACKOFF_MAX_TIME env * implement PreAggBackoff api in orchestrator pre-aggs * impl backoff in refreshScheduler * wip: debug logs * add tests * Revert "wip: debug logs" This reverts commit 8f7bfb6. * renaming * docs: New env var * . --------- Co-authored-by: Igor Lukanin <igor@cube.dev>
1 parent 7f6049a commit fe714bf

6 files changed

Lines changed: 222 additions & 3 deletions

File tree

docs/pages/product/caching/refreshing-pre-aggregations.mdx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ behavior:
1616
- `CUBEJS_REFRESH_WORKER_CONCURRENCY` (see also `CUBEJS_CONCURRENCY`)
1717
- `CUBEJS_SCHEDULED_REFRESH_QUERIES_PER_APP_ID`
1818
- `CUBEJS_DROP_PRE_AGG_WITHOUT_TOUCH`
19+
- `CUBEJS_PRE_AGGREGATIONS_BACKOFF_MAX_TIME`
1920

2021
## Troubleshooting
2122

docs/pages/product/configuration/reference/environment-variables.mdx

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1155,6 +1155,17 @@ This can be overridden for individual pre-aggregations using the
11551155
| --------------- | ---------------------- | --------------------- |
11561156
| `true`, `false` | `true` | `true` |
11571157

1158+
## `CUBEJS_PRE_AGGREGATIONS_BACKOFF_MAX_TIME`
1159+
1160+
The maximum delay, in seconds, of the exponential backoff applied to retries when
1161+
pre-aggregation builds fail. When a pre-aggregation refresh fails, retries are executed
1162+
with exponentially increasing delays, but the delay never exceeds the value specified
1163+
by this variable.
1164+
1165+
| Possible Values | Default in Development | Default in Production |
1166+
| ------------------------- | ---------------------- | --------------------- |
1167+
| A valid positive integer in seconds | `600` | `600` |
1168+
11581169
## `CUBEJS_REFRESH_WORKER`
11591170

11601171
If `true`, this instance of Cube will **only** refresh pre-aggregations.

packages/cubejs-backend-shared/src/env.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -751,6 +751,13 @@ const variables: Record<string, (...args: any) => any> = {
751751
.default(60 * 60 * 24)
752752
.asIntPositive(),
753753

754+
/**
755+
* Maximum time for exponential backoff for pre-aggs (in seconds)
756+
*/
757+
preAggBackoffMaxTime: (): number => get('CUBEJS_PRE_AGGREGATIONS_BACKOFF_MAX_TIME')
758+
.default(10 * 60)
759+
.asIntPositive(),
760+
754761
/**
755762
* Expire time for touch records
756763
*/

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,8 @@ export class PreAggregations {
251251

252252
private readonly touchTablePersistTime: number;
253253

254+
private readonly preAggBackoffMaxTime: number;
255+
254256
public readonly dropPreAggregationsWithoutTouch: boolean;
255257

256258
private readonly usedTablePersistTime: number;
@@ -277,6 +279,7 @@ export class PreAggregations {
277279
this.externalDriverFactory = options.externalDriverFactory;
278280
this.structureVersionPersistTime = options.structureVersionPersistTime || 60 * 60 * 24 * 30;
279281
this.touchTablePersistTime = options.touchTablePersistTime || getEnv('touchPreAggregationTimeout');
282+
this.preAggBackoffMaxTime = options.preAggBackoffMaxTime || getEnv('preAggBackoffMaxTime');
280283
this.dropPreAggregationsWithoutTouch = options.dropPreAggregationsWithoutTouch || getEnv('dropPreAggregationsWithoutTouch');
281284
this.usedTablePersistTime = options.usedTablePersistTime || getEnv('dbQueryTimeout');
282285
this.externalRefresh = options.externalRefresh;
@@ -317,6 +320,11 @@ export class PreAggregations {
317320
return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES_TOUCH', tableName);
318321
}
319322

323+
protected preAggBackoffRedisKey(tableName: string): string {
324+
// TODO add dataSource?
325+
return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_BACKOFF', tableName);
326+
}
327+
320328
protected refreshEndReachedKey() {
321329
// TODO add dataSource?
322330
return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_REFRESH_END_REACHED', '');
@@ -372,6 +380,36 @@ export class PreAggregations {
372380
.map(k => k.replace(this.tablesTouchRedisKey(''), ''));
373381
}
374382

383+
public async updatePreAggBackoff(tableName: string, backoffData: { backoffMultiplier: number, nextTimestamp: Date }): Promise<void> {
384+
await this.queryCache.getCacheDriver().set(
385+
this.preAggBackoffRedisKey(tableName),
386+
JSON.stringify(backoffData),
387+
this.preAggBackoffMaxTime
388+
);
389+
}
390+
391+
public async removePreAggBackoff(tableName: string): Promise<void> {
392+
await this.queryCache.getCacheDriver().remove(this.preAggBackoffRedisKey(tableName));
393+
}
394+
395+
public getPreAggBackoffMaxTime(): number {
396+
return this.preAggBackoffMaxTime;
397+
}
398+
399+
public async getPreAggBackoff(tableName: string): Promise<{ backoffMultiplier: number, nextTimestamp: Date } | null> {
400+
const res = await this.queryCache.getCacheDriver().get(this.preAggBackoffRedisKey(tableName));
401+
402+
if (!res) {
403+
return null;
404+
}
405+
406+
const parsed = JSON.parse(res);
407+
return {
408+
backoffMultiplier: parsed.backoffMultiplier,
409+
nextTimestamp: new Date(parsed.nextTimestamp),
410+
};
411+
}
412+
375413
public async updateRefreshEndReached() {
376414
return this.queryCache.getCacheDriver().set(this.refreshEndReachedKey(), new Date().getTime(), this.touchTablePersistTime);
377415
}

packages/cubejs-server-core/src/core/RefreshScheduler.ts

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -615,7 +615,49 @@ export class RefreshScheduler {
615615
const currentQuery = await queryIterator.current();
616616
if (currentQuery && queryIterator.partitionCounter() % concurrency === workerIndex) {
617617
const orchestratorApi = await this.serverCore.getOrchestratorApi(context);
618-
await orchestratorApi.executeQuery({ ...currentQuery, preAggregationsLoadCacheByDataSource });
618+
const preAggsInstance = orchestratorApi.getQueryOrchestrator().getPreAggregations();
619+
const now = new Date();
620+
621+
const backoffChecks = await Promise.all(
622+
currentQuery.preAggregations.map(p => preAggsInstance.getPreAggBackoff(p.tableName))
623+
);
624+
625+
// Skip execution if any pre-aggregation is still in backoff window
626+
const shouldSkip = backoffChecks.some(backoffData => backoffData && now < backoffData.nextTimestamp);
627+
628+
if (!shouldSkip) {
629+
try {
630+
await orchestratorApi.executeQuery({ ...currentQuery, preAggregationsLoadCacheByDataSource });
631+
} catch (e: any) {
632+
// Check if this is a "Continue wait" error - these are normal queue signals
633+
// For Continue wait errors, re-throw to handle them in the normal flow
634+
if (e.error === 'Continue wait') {
635+
throw e;
636+
}
637+
638+
// Real datasource error - apply exponential backoff
639+
for (const p of currentQuery.preAggregations) {
640+
let backoffData = await preAggsInstance.getPreAggBackoff(p.tableName);
641+
642+
if (backoffData && backoffData.backoffMultiplier > 0) {
643+
const newMultiplier = backoffData.backoffMultiplier * 2;
644+
const delaySeconds = Math.min(newMultiplier, preAggsInstance.getPreAggBackoffMaxTime());
645+
646+
backoffData = {
647+
backoffMultiplier: newMultiplier,
648+
nextTimestamp: new Date(now.valueOf() + delaySeconds * 1000),
649+
};
650+
} else {
651+
backoffData = {
652+
backoffMultiplier: 1,
653+
nextTimestamp: new Date(now.valueOf() + 1000),
654+
};
655+
}
656+
657+
await preAggsInstance.updatePreAggBackoff(p.tableName, backoffData);
658+
}
659+
}
660+
}
619661
}
620662
const hasNext = await queryIterator.advance();
621663
if (!hasNext) {

packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts

Lines changed: 122 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,12 @@ class MockDriver extends BaseDriver {
248248

249249
private schema: any;
250250

251+
public shouldFailQuery: boolean = false;
252+
253+
public failQueryPattern: RegExp | null = null;
254+
255+
public queryAttempts: number = 0;
256+
251257
public constructor() {
252258
super();
253259
}
@@ -257,9 +263,22 @@ class MockDriver extends BaseDriver {
257263

258264
public query(query) {
259265
this.executedQueries.push(query);
266+
267+
// Track query attempts for backoff testing
268+
if (this.failQueryPattern && query.match(this.failQueryPattern)) {
269+
this.queryAttempts++;
270+
}
271+
260272
let promise: any = Promise.resolve([query]);
261273
promise = promise.then((res) => new Promise(resolve => setTimeout(() => resolve(res), 150)));
262274

275+
// Simulate query failure for backoff testing
276+
if (this.shouldFailQuery && this.failQueryPattern && query.match(this.failQueryPattern)) {
277+
promise = promise.then(() => {
278+
throw new Error('Simulated datasource error');
279+
});
280+
}
281+
263282
if (query.match(/min\(.*timestamp.*foo/)) {
264283
promise = promise.then(() => [{ min: '2020-12-27T00:00:00.000' }]);
265284
}
@@ -331,7 +350,11 @@ class MockDriver extends BaseDriver {
331350

332351
let testCounter = 1;
333352

334-
const setupScheduler = ({ repository, useOriginalSqlPreAggregations, skipAssertSecurityContext }: { repository: SchemaFileRepository, useOriginalSqlPreAggregations?: boolean, skipAssertSecurityContext?: true }) => {
353+
const setupScheduler = ({ repository, useOriginalSqlPreAggregations, skipAssertSecurityContext }: {
354+
repository: SchemaFileRepository,
355+
useOriginalSqlPreAggregations?: boolean,
356+
skipAssertSecurityContext?: true
357+
}) => {
335358
const mockDriver = new MockDriver();
336359
const externalDriver = new MockDriver();
337360

@@ -362,7 +385,7 @@ const setupScheduler = ({ repository, useOriginalSqlPreAggregations, skipAssertS
362385
return externalDriver;
363386
},
364387
orchestratorOptions: () => ({
365-
continueWaitTimeout: 0.1,
388+
continueWaitTimeout: 1,
366389
queryCacheOptions: {
367390
queueOptions: () => ({
368391
concurrency: 2,
@@ -1112,4 +1135,101 @@ describe('Refresh Scheduler', () => {
11121135
}
11131136
}
11141137
});
1138+
1139+
test('Exponential backoff', async () => {
1140+
process.env.CUBEJS_EXTERNAL_DEFAULT = 'false';
1141+
process.env.CUBEJS_SCHEDULED_REFRESH_DEFAULT = 'true';
1142+
process.env.CUBEJS_PRE_AGGREGATIONS_BACKOFF_MAX_TIME = '10'; // 10 seconds max backoff
1143+
1144+
const {
1145+
refreshScheduler, mockDriver, serverCore
1146+
} = setupScheduler({ repository: repositoryWithPreAggregations });
1147+
1148+
const ctx = { authInfo: { tenantId: 'tenant1' }, securityContext: { tenantId: 'tenant1' }, requestId: 'XXX' };
1149+
1150+
const orchestratorApi = await serverCore.getOrchestratorApi(ctx);
1151+
const preAggsInstance = orchestratorApi.getQueryOrchestrator().getPreAggregations();
1152+
1153+
// Target specific pre-aggregation: foo_first (all partitions)
1154+
// Scheduler processes multiple partitions: foo_first20201231, foo_first20201230, etc.
1155+
// Configure driver to fail only for foo_first table creation
1156+
mockDriver.shouldFailQuery = true;
1157+
mockDriver.failQueryPattern = /foo_first/;
1158+
1159+
// Run refresh until it tries to create foo_first table and fails
1160+
const queryIteratorState = {};
1161+
const maxIterations = 100;
1162+
for (let i = 0; i < maxIterations; i++) {
1163+
try {
1164+
await refreshScheduler.runScheduledRefresh(ctx, {
1165+
concurrency: 1,
1166+
workerIndices: [0],
1167+
timezones: ['UTC'],
1168+
queryIteratorState,
1169+
});
1170+
} catch (e) {
1171+
// Expected to fail when hitting foo_first
1172+
}
1173+
1174+
// Check if we started attempting to create foo_first table
1175+
if (mockDriver.queryAttempts > 0) {
1176+
break;
1177+
}
1178+
}
1179+
1180+
const initialAttempts = mockDriver.queryAttempts;
1181+
expect(initialAttempts).toBeGreaterThan(0);
1182+
1183+
// Wait for backoff to be set in storage (increased delay for async Redis writes)
1184+
await mockDriver.delay(1000);
1185+
1186+
// Find which foo_first partition has backoff set
1187+
// Scheduler may process different partitions (20201231, 20201230, etc.)
1188+
const possiblePartitions = ['20201231', '20201230', '20201229', '20201228', '20201227'];
1189+
let backoffData: { backoffMultiplier: number, nextTimestamp: Date } | null = null;
1190+
let targetTableName: string | null = null;
1191+
1192+
for (const partition of possiblePartitions) {
1193+
const tableName = `stb_pre_aggregations.foo_first${partition}`;
1194+
const data = await preAggsInstance.getPreAggBackoff(tableName);
1195+
if (data) {
1196+
backoffData = data;
1197+
targetTableName = tableName;
1198+
break;
1199+
}
1200+
}
1201+
1202+
// Verify backoff was set for at least one foo_first table
1203+
expect(backoffData).not.toBeNull();
1204+
expect(targetTableName).not.toBeNull();
1205+
// Initial backoff multiplier is 1 second
1206+
expect(backoffData!.backoffMultiplier).toBeGreaterThanOrEqual(1);
1207+
1208+
// Step 1: Immediate retry - should skip due to backoff (max backoff is capped at 10 seconds)
1209+
const beforeSkipAttempts = mockDriver.queryAttempts;
1210+
const immediateRetryCount = 5;
1211+
for (let i = 0; i < immediateRetryCount; i++) {
1212+
try {
1213+
await refreshScheduler.runScheduledRefresh(ctx, {
1214+
concurrency: 1,
1215+
workerIndices: [0],
1216+
timezones: ['UTC'],
1217+
queryIteratorState,
1218+
});
1219+
} catch (e) {
1220+
// Expected to skip due to backoff
1221+
}
1222+
}
1223+
1224+
// Query attempts should not increase significantly (skipped due to backoff)
1225+
// Allow some margin for other pre-aggregations processed by scheduler
1226+
expect(mockDriver.queryAttempts).toBeLessThanOrEqual(beforeSkipAttempts + 2);
1227+
1228+
// Step 2: Verify backoff persists - pre-aggregation is still in backoff after 500ms
1229+
await mockDriver.delay(500);
1230+
const backoffDataStillActive = await preAggsInstance.getPreAggBackoff(targetTableName!);
1231+
expect(backoffDataStillActive).not.toBeNull();
1232+
// backoffDataStillActive exists, which means backoff is still in place
1233+
// (nextTimestamp may be close to current time due to test execution delays)
1234+
});
11151235
});

0 commit comments

Comments
 (0)