Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 107 additions & 47 deletions workers/limiter/src/dbHelper.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
import { Collection, Db, ObjectId } from 'mongodb';
import { ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types';
import { PlanDBScheme, ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types';
import { WorkspaceWithTariffPlan } from '../types';
import HawkCatcher from '@hawk.so/nodejs';
import { CriticalError, NonCriticalError } from '../../../lib/workerErrors';

const WORKSPACE_PROJECTION = {
_id: 1,
name: 1,
isBlocked: 1,
blockedDate: 1,
lastChargeDate: 1,
billingPeriodEventsCount: 1,
tariffPlanId: 1,
} as const;

type WorkspaceForLimiter = Pick<
WorkspaceDBScheme,
'_id' | 'name' | 'isBlocked' | 'blockedDate' | 'lastChargeDate' | 'billingPeriodEventsCount' | 'tariffPlanId'
>;

/**
* Class that implements methods used for interaction between limiter and db
*/
Expand All @@ -23,15 +38,43 @@ export class DbHelper {
*/
private workspacesCollection: Collection<WorkspaceDBScheme>;

/**
* Collection with tariff plans
*/
private plansCollection: Collection<PlanDBScheme>;

/**
* In-memory cache of tariff plans — avoids $lookup on the small plans collection per workspace
*/
private plans: PlanDBScheme[] = [];

/**
* @param projects - projects collection
* @param workspaces - workspaces collection
* @param plans - plans collection
* @param eventsDbConnection - connection to events DB
*/
constructor(projects: Collection<ProjectDBScheme>, workspaces: Collection<WorkspaceDBScheme>, eventsDbConnection: Db) {
constructor(
projects: Collection<ProjectDBScheme>,
workspaces: Collection<WorkspaceDBScheme>,
plans: Collection<PlanDBScheme>,
eventsDbConnection: Db
) {
this.eventsDbConnection = eventsDbConnection;
this.projectsCollection = projects;
this.workspacesCollection = workspaces;
this.plansCollection = plans;
}

/**
* Fetches tariff plans from database and keeps them cached
*/
public async fetchPlans(): Promise<void> {
this.plans = await this.plansCollection.find({}).toArray();

if (this.plans.length === 0) {
throw new CriticalError('Please add tariff plans to the database');
}
}

/**
Expand Down Expand Up @@ -148,71 +191,88 @@ export class DbHelper {
return this.projectsCollection.find(query).toArray();
}

/**
* Returns plan from cache, refetches once on miss
*
* @param planId - id of the plan to find
*/
private async resolvePlan(planId: WorkspaceDBScheme['tariffPlanId']): Promise<PlanDBScheme | null> {
let plan = this.findPlanById(planId);

if (plan) {
return plan;
}

await this.fetchPlans();
plan = this.findPlanById(planId);

return plan ?? null;
}

/**
* @param planId - id of the plan to find
*/
private findPlanById(planId: WorkspaceDBScheme['tariffPlanId']): PlanDBScheme | undefined {
return this.plans.find((plan) => plan._id.toString() === planId.toString());
}

/**
* Returns a single workspace with its tariff plan by id
*
* @param id - workspace id
*/
private async getOneWorkspaceWithTariffPlan(id: string): Promise<WorkspaceWithTariffPlan> {
const pipeline = [
{
$match: {
_id: new ObjectId(id),
},
},
...this.tariffPlanLookupPipeline(),
];

const workspace = await this.workspacesCollection.aggregate<WorkspaceWithTariffPlan>(pipeline).next();
const workspace = await this.workspacesCollection
.find({ _id: new ObjectId(id) })
.project<WorkspaceForLimiter>(WORKSPACE_PROJECTION)
.next();

if (workspace === null) {
throw new NonCriticalError(`Workspace ${id} not found`, {
workspaceId: id,
});
}

return workspace;
const plan = await this.resolvePlan(workspace.tariffPlanId);

if (!plan) {
throw new NonCriticalError(`Tariff plan ${workspace.tariffPlanId.toString()} not found for workspace ${id}`, {
workspaceId: id,
});
}

return {
...workspace,
tariffPlan: plan,
};
}

/**
* Yields all workspaces with their tariff plans one by one
*/
private async * yieldWorkspacesWithTariffPlans(): AsyncGenerator<WorkspaceWithTariffPlan> {
const pipeline = this.tariffPlanLookupPipeline();
const cursor = this.workspacesCollection.aggregate<WorkspaceWithTariffPlan>(pipeline);
const cursor = this.workspacesCollection
.find({})
.project<WorkspaceForLimiter>(WORKSPACE_PROJECTION);

for await (const workspace of cursor) {
yield workspace;
}
}
const plan = await this.resolvePlan(workspace.tariffPlanId);

/* eslint-disable-next-line */
private tariffPlanLookupPipeline(): any[] {
return [
{
$lookup: {
from: 'plans',
localField: 'tariffPlanId',
foreignField: '_id',
as: 'tariffPlan',
},
},
{
$unwind: {
path: '$tariffPlan',
},
},
{
$project: {
_id: 1,
name: 1,
isBlocked: 1,
blockedDate: 1,
lastChargeDate: 1,
billingPeriodEventsCount: 1,
tariffPlan: 1,
},
},
];
if (!plan) {
HawkCatcher.send(
new Error(`[Limiter] Tariff plan not found for workspace`),
{
workspaceId: workspace._id.toString(),
tariffPlanId: workspace.tariffPlanId?.toString(),
}
);
continue;
}

yield {
...workspace,
tariffPlan: plan,
};
}
}
}
}
7 changes: 5 additions & 2 deletions workers/limiter/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Worker } from '../../../lib/worker';
import * as pkg from '../package.json';
import * as path from 'path';
import * as dotenv from 'dotenv';
import { ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types';
import { PlanDBScheme, ProjectDBScheme, WorkspaceDBScheme } from '@hawk.so/types';
import HawkCatcher from '@hawk.so/nodejs';
import { MS_IN_SEC } from '../../../lib/utils/consts';
import LimiterEvent, { BlockWorkspaceEvent, UnblockWorkspaceEvent } from '../types/eventTypes';
Expand Down Expand Up @@ -68,8 +68,11 @@ export default class LimiterWorker extends Worker {

const projectsCollection = accountDbConnection.collection<ProjectDBScheme>('projects');
const workspacesCollection = accountDbConnection.collection<WorkspaceDBScheme>('workspaces');
const plansCollection = accountDbConnection.collection<PlanDBScheme>('plans');

this.dbHelper = new DbHelper(projectsCollection, workspacesCollection, eventsDbConnection);
this.dbHelper = new DbHelper(projectsCollection, workspacesCollection, plansCollection, eventsDbConnection);

await this.dbHelper.fetchPlans();

await this.redis.initialize();

Expand Down
3 changes: 2 additions & 1 deletion workers/limiter/tests/dbHelper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ describe('DbHelper', () => {
await planCollection.deleteMany({});
await planCollection.insertMany(Object.values(mockedPlans));

dbHelper = new DbHelper(projectCollection, workspaceCollection, db);
dbHelper = new DbHelper(projectCollection, workspaceCollection, planCollection, db);
await dbHelper.fetchPlans();
}, 30000); // 30 seconds timeout for MongoDB connection and setup

beforeEach(async () => {
Expand Down
Loading