Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
3bae24c
UN-1798 [PERF] Optimize dashboard metrics aggregation task
athul-rs Mar 8, 2026
27b1575
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 8, 2026
fa38979
Merge branch 'main' into UN-1798/optimize-metrics-aggregation
athul-rs Mar 10, 2026
db14aa2
Fix redundant LLM queries and daily_start boundary bug
athul-rs Mar 10, 2026
10b6b0d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 10, 2026
19bc235
Merge branch 'main' into UN-1798/optimize-metrics-aggregation
athul-rs Mar 10, 2026
6341fca
Reduce cognitive complexity in tasks and views
athul-rs Mar 10, 2026
d20a95a
Extract _fetch_live_series to reduce live_series complexity
athul-rs Mar 10, 2026
0cece48
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 10, 2026
3609c9e
Fix metric_type for histogram metrics in views
athul-rs Mar 10, 2026
fdc791f
Merge branch 'main' into UN-1798/optimize-metrics-aggregation
athul-rs Mar 17, 2026
dac10fd
Merge branch 'main' into UN-1798/optimize-metrics-aggregation
athul-rs Mar 31, 2026
39cecac
Address review feedback on metrics aggregation
athul-rs Mar 31, 2026
6a461c4
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Mar 31, 2026
31b85f3
Merge branch 'main' into UN-1798/optimize-metrics-aggregation
kirtimanmishrazipstack Mar 31, 2026
4c00b46
Retry lock acquisition when lock expires between add and get
athul-rs Mar 31, 2026
fb40a3b
Merge branch 'main' into UN-1798/optimize-metrics-aggregation
athul-rs Mar 31, 2026
04e4267
chore: retrigger CI checks
athul-rs Mar 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 129 additions & 58 deletions backend/dashboard_metrics/management/commands/backfill_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import logging
from datetime import datetime, timedelta
from typing import Any

from account_v2.models import Organization
from django.core.management.base import BaseCommand
Expand Down Expand Up @@ -41,25 +42,30 @@ class Command(BaseCommand):
help = "Backfill metrics from source tables into aggregated tables"

# Metric configurations: (name, query_method, is_histogram)
# LLM metrics are handled separately via get_llm_metrics_split().
METRIC_CONFIGS = [
("documents_processed", MetricsQueryService.get_documents_processed, False),
("pages_processed", MetricsQueryService.get_pages_processed, True),
("llm_calls", MetricsQueryService.get_llm_calls, False),
("challenges", MetricsQueryService.get_challenges, False),
("summarization_calls", MetricsQueryService.get_summarization_calls, False),
("deployed_api_requests", MetricsQueryService.get_deployed_api_requests, False),
(
"etl_pipeline_executions",
MetricsQueryService.get_etl_pipeline_executions,
False,
),
("llm_usage", MetricsQueryService.get_llm_usage_cost, True),
("prompt_executions", MetricsQueryService.get_prompt_executions, False),
("failed_pages", MetricsQueryService.get_failed_pages, True),
("hitl_reviews", MetricsQueryService.get_hitl_reviews, False),
("hitl_completions", MetricsQueryService.get_hitl_completions, False),
]

# Maps each LLM-derived metric name to whether it is stored as a
# histogram (True) or a plain counter (False). Keys must match the
# metric names yielded by MetricsQueryService.get_llm_metrics_split(),
# which is looked up per-name when ingesting the split results.
LLM_METRIC_TYPES: dict[str, bool] = {
    "llm_calls": False,
    "challenges": False,
    "summarization_calls": False,
    "llm_usage": True,
}

def add_arguments(self, parser):
parser.add_argument(
"--days",
Expand Down Expand Up @@ -139,15 +145,32 @@ def handle(self, *args, **options):
"errors": 0,
}

# Pre-resolve org identifiers for PageUsage queries (avoids
# redundant Organization lookups inside the metric query loop).
org_identifiers = dict(
Organization.objects.filter(
id__in=[int(oid) if oid.isdigit() else oid for oid in org_ids]
).values_list("id", "organization_id")
)

for i, current_org_id in enumerate(org_ids):
self.stdout.write(
f"\n[{i + 1}/{len(org_ids)}] Processing org: {current_org_id}"
)

try:
# Resolve org string identifier for PageUsage queries
org_id_key = (
int(current_org_id) if current_org_id.isdigit() else current_org_id
)
org_identifier = org_identifiers.get(org_id_key)

# Collect all metric data for this org
hourly_data, daily_data, monthly_data = self._collect_metrics(
current_org_id, start_date, end_date
current_org_id,
start_date,
end_date,
org_identifier=org_identifier,
)

self.stdout.write(
Expand Down Expand Up @@ -245,73 +268,121 @@ def _resolve_org_ids(
return sorted(str(oid) for oid in all_org_ids)

def _collect_metrics(
self, org_id: str, start_date: datetime, end_date: datetime
self,
org_id: str,
start_date: datetime,
end_date: datetime,
org_identifier: str | None = None,
) -> tuple[dict, dict, dict]:
"""Collect metrics from source tables for all granularities."""
hourly_agg = {}
daily_agg = {}
monthly_agg = {}

def _ingest_results(
    results: list[dict[str, Any]],
    metric_name: str,
    metric_type: str,
) -> None:
    """Fold query rows into the hourly aggregation dict.

    Each row contributes its ``value`` (``None`` coerced to 0) to the
    bucket keyed by (org, hour, metric, "default", ""), and bumps that
    bucket's sample count. Mutates the enclosing ``hourly_agg``.
    """
    for entry in results:
        amount = entry["value"] or 0
        hour_ts = self._truncate_to_hour(entry["period"])
        bucket_key = (org_id, hour_ts.isoformat(), metric_name, "default", "")
        # setdefault creates the zeroed bucket on first sight of the key.
        bucket = hourly_agg.setdefault(
            bucket_key,
            {"metric_type": metric_type, "value": 0, "count": 0},
        )
        bucket["value"] += amount
        bucket["count"] += 1

def _ingest_daily_results(
    results: list[dict[str, Any]],
    metric_name: str,
    metric_type: str,
) -> None:
    """Fold daily query rows into the daily AND monthly aggregation dicts.

    Each row's ``value`` (``None`` coerced to 0) is added to the bucket
    keyed by its day and to the bucket keyed by the first of its month.
    Mutates the enclosing ``daily_agg`` and ``monthly_agg``.
    """
    for row in results:
        amount = row["value"] or 0
        period = row["period"]
        # A datetime is normalized to its date; plain dates pass through.
        day = period.date() if hasattr(period, "date") else period
        # replace() preserves the input type, so normalize afterwards.
        month = period.replace(day=1)
        if hasattr(month, "date"):
            month = month.date()
        for aggregate, stamp in (
            (daily_agg, day.isoformat()),
            (monthly_agg, month.isoformat()),
        ):
            bucket = aggregate.setdefault(
                (org_id, stamp, metric_name, "default", ""),
                {"metric_type": metric_type, "value": 0, "count": 0},
            )
            bucket["value"] += amount
            bucket["count"] += 1

# Fetch all 4 LLM metrics in one query per granularity
try:
for granularity in (Granularity.HOUR, Granularity.DAY):
llm_split = MetricsQueryService.get_llm_metrics_split(
org_id, start_date, end_date, granularity
)
for metric_name, data in llm_split.items():
is_histogram = self.LLM_METRIC_TYPES[metric_name]
metric_type = (
MetricType.HISTOGRAM if is_histogram else MetricType.COUNTER
)
if granularity == Granularity.HOUR:
_ingest_results(data, metric_name, metric_type)
else:
_ingest_daily_results(data, metric_name, metric_type)
except Exception as e:
logger.warning("Error querying LLM metrics for org %s: %s", org_id, e)

# Fetch remaining (non-LLM) metrics individually
for metric_name, query_method, is_histogram in self.METRIC_CONFIGS:
metric_type = MetricType.HISTOGRAM if is_histogram else MetricType.COUNTER

# Pass org_identifier to PageUsage-based metrics to
# avoid redundant Organization lookups per call.
extra_kwargs: dict[str, Any] = {}
if metric_name == "pages_processed":
extra_kwargs["org_identifier"] = org_identifier

try:
# Query hourly data
hourly_results = query_method(
org_id, start_date, end_date, granularity=Granularity.HOUR
org_id,
start_date,
end_date,
granularity=Granularity.HOUR,
**extra_kwargs,
)
for row in hourly_results:
period = row["period"]
value = row["value"] or 0
hour_ts = self._truncate_to_hour(period)
key = (org_id, hour_ts.isoformat(), metric_name, "default", "")

if key not in hourly_agg:
hourly_agg[key] = {
"metric_type": metric_type,
"value": 0,
"count": 0,
}
hourly_agg[key]["value"] += value
hourly_agg[key]["count"] += 1

# Query daily data
_ingest_results(hourly_results, metric_name, metric_type)

daily_results = query_method(
org_id, start_date, end_date, granularity=Granularity.DAY
org_id,
start_date,
end_date,
granularity=Granularity.DAY,
**extra_kwargs,
)
for row in daily_results:
period = row["period"]
value = row["value"] or 0
day_date = period.date() if hasattr(period, "date") else period
key = (org_id, day_date.isoformat(), metric_name, "default", "")

if key not in daily_agg:
daily_agg[key] = {
"metric_type": metric_type,
"value": 0,
"count": 0,
}
daily_agg[key]["value"] += value
daily_agg[key]["count"] += 1

# Query for monthly (aggregate daily results by month)
for row in daily_results:
period = row["period"]
value = row["value"] or 0
if hasattr(period, "date"):
month_date = period.replace(day=1).date()
else:
month_date = period.replace(day=1)
key = (org_id, month_date.isoformat(), metric_name, "default", "")

if key not in monthly_agg:
monthly_agg[key] = {
"metric_type": metric_type,
"value": 0,
"count": 0,
}
monthly_agg[key]["value"] += value
monthly_agg[key]["count"] += 1
_ingest_daily_results(daily_results, metric_name, metric_type)

except Exception as e:
logger.warning("Error querying %s for org %s: %s", metric_name, org_id, e)
Expand Down
Loading
Loading