-
Notifications
You must be signed in to change notification settings - Fork 253
impr(CLDSRV-852): Refactor WorkerTokenBucket #6108
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: improvement/CLDSRV-852/refactor_cache
Are you sure you want to change the base?
Changes from all commits
3c5d00d
b6b5c39
d35971d
55c9d13
b40ff80
630def4
26aaf0d
bde0254
9b5ad23
605a606
1ef964d
a68c32e
5a26928
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,63 +1,66 @@ | ||
| const configCache = new Map(); | ||
|
|
||
| // Load tracking for adaptive burst capacity | ||
| // Map<bucketKey, Array<timestamp>> - rolling 1-second window | ||
| const requestTimestamps = new Map(); | ||
| const namespace = { | ||
| bucket: 'bkt', | ||
| }; | ||
|
|
||
| function setCachedConfig(key, limitConfig, ttl) { | ||
| function cacheSet(cache, key, value, ttl) { | ||
| const expiry = Date.now() + ttl; | ||
| configCache.set(key, { expiry, config: limitConfig }); | ||
| cache.set(key, { expiry, value }); | ||
| } | ||
|
|
||
| function getCachedConfig(key) { | ||
| const value = configCache.get(key); | ||
| if (value === undefined) { | ||
| function cacheGet(cache, key) { | ||
| const cachedValue = cache.get(key); | ||
| if (cachedValue === undefined) { | ||
| return undefined; | ||
| } | ||
|
|
||
| const { expiry, config } = value; | ||
| const { expiry, value } = cachedValue; | ||
| if (expiry <= Date.now()) { | ||
| configCache.delete(key); | ||
| cache.delete(key); | ||
| return undefined; | ||
| } | ||
|
|
||
| return config; | ||
| return value; | ||
| } | ||
|
|
||
| function cacheDelete(cache, key) { | ||
| cache.delete(key); | ||
| } | ||
|
|
||
| function expireCachedConfigs(now) { | ||
| function cacheExpire(cache) { | ||
| const now = Date.now(); | ||
|
|
||
| const toRemove = []; | ||
| for (const [key, { expiry }] of configCache.entries()) { | ||
| for (const [key, { expiry }] of cache.entries()) { | ||
| if (expiry <= now) { | ||
| toRemove.push(key); | ||
| } | ||
| } | ||
|
|
||
| for (const key of toRemove) { | ||
| configCache.delete(key); | ||
| cache.delete(key); | ||
| } | ||
|
|
||
| return toRemove.length; | ||
| } | ||
|
|
||
| /** | ||
| * Invalidate cached config for a specific bucket | ||
| * | ||
| * @param {string} bucketName - Bucket name | ||
| * @returns {boolean} True if entry was found and removed | ||
| */ | ||
| function invalidateCachedConfig(bucketName) { | ||
| const cacheKey = `bucket:${bucketName}`; | ||
| return configCache.delete(cacheKey); | ||
| function formatKeyDecorator(fn) { | ||
| return (resourceClass, resourceId, ...args) => fn(`${resourceClass}:${resourceId}`, ...args); | ||
| } | ||
|
|
||
| const getCachedConfig = formatKeyDecorator(cacheGet.bind(null, configCache)); | ||
| const setCachedConfig = formatKeyDecorator(cacheSet.bind(null, configCache)); | ||
| const deleteCachedConfig = formatKeyDecorator(cacheDelete.bind(null, configCache)); | ||
| const expireCachedConfigs = cacheExpire.bind(null, configCache); | ||
|
|
||
| module.exports = { | ||
| namespace, | ||
| setCachedConfig, | ||
| getCachedConfig, | ||
| expireCachedConfigs, | ||
| invalidateCachedConfig, | ||
|
|
||
| deleteCachedConfig, | ||
| // Do not access directly | ||
| // Used only for tests | ||
| configCache, | ||
| requestTimestamps, | ||
| }; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,38 +1,35 @@ | ||
| /** | ||
| * Calculate per-worker interval based on distributed architecture | ||
| * Calculate per-node interval based on distributed architecture | ||
| * | ||
| * In a distributed setup with N nodes and W workers per node: | ||
| * In a distributed setup with N nodes: | ||
| * - Global limit: R requests per second | ||
| * - Per-worker limit: R / N / W | ||
| * - Interval = 1000ms / (R / N / W) | ||
| * - Per-node limit: R / N | ||
| * - Interval = 1000ms / (R / N) | ||
| * | ||
| * The interval represents milliseconds between requests. We divide 1000 (milliseconds | ||
| * in a second) by the rate to convert "requests per second" to "milliseconds per request". | ||
| * | ||
| * Examples: | ||
| * - 100 req/s ÷ 1 node ÷ 10 workers = 10 req/s per worker → interval = 100ms | ||
| * - 600 req/s ÷ 6 nodes ÷ 10 workers = 10 req/s per worker → interval = 100ms | ||
| * - 100 req/s ÷ 1 node = 100 req/s per node → interval = 10ms | ||
| * - 600 req/s ÷ 6 nodes = 100 req/s per node → interval = 10ms | ||
| * | ||
| * Dynamic work-stealing is achieved through Redis sync reconciliation: | ||
| * - Each worker evaluates locally at its fixed per-worker quota | ||
| * - Workers report consumed / workers to Redis | ||
| * - Redis sums all workers' shares | ||
| * - Workers overwrite local counters with Redis values | ||
| * - Each worker evaluates locally using preallocated tokens | ||
| * - Workers report processed requests to Redis | ||
| * - Redis sums all workers' requests | ||
| * - Idle workers' unused capacity accumulates in Redis | ||
| * - Busy workers pull back higher emptyAt values and throttle proportionally | ||
| * | ||
tmacro marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| * IMPORTANT: Limit must be >= N * W, otherwise per-worker rate < 1 req/s | ||
| * IMPORTANT: Limit must be >= N, otherwise per-node rate < 1 req/s | ||
| * which results in intervals > 1000ms and effectively blocks traffic. | ||
| * | ||
| * @param {number} limit - Global requests per second | ||
| * @param {number} nodes - Total number of nodes | ||
| * @param {number} _workers - Number of workers per node (unused in token reservation) | ||
| * @returns {number} Interval in milliseconds between requests | ||
| */ | ||
| // eslint-disable-next-line no-unused-vars | ||
| function calculateInterval(limit, nodes, _workers) { | ||
| // Per-node rate = limit / nodes (workers NOT divided) | ||
| // This allows dynamic work-stealing - workers evaluate at node quota | ||
|
|
||
| function calculateInterval(limit, nodes) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| // Per-node rate = limit / nodes | ||
| const perNodeRate = limit / nodes; | ||
|
|
||
| // Interval = 1000ms / rate | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The examples are incorrect after removing the workers divisor.
100 req/s ÷ 1 node = 100 req/s per node → interval = 1000/100 = 10ms, not 100ms. Similarly, 600 req/s ÷ 6 nodes = 100 req/s per node → interval = 10ms, not 100ms. These values were correct when workers (10) was part of the formula but weren't updated. — Claude Code