feat(report): add --threads option to fetch report data in parallel #2197

Open: dtaniwaki wants to merge 2 commits into elementary-data:master from dtaniwaki:feat/parallel-report-fetchers

Conversation


@dtaniwaki dtaniwaki commented Apr 22, 2026

Hi maintainers, thanks for the great work on Elementary! I'd appreciate a review when you get a chance.

Problem

Generating the Elementary report can be slow on larger projects because ReportAPI.get_report_data issues a long sequence of dbt run-operation calls (models, sources, seeds, snapshots, tests, test results, source freshness, invocations, coverages, lineage, …) one after another. Each call has a non-trivial dbt startup cost on top of the actual query time, so end-to-end report generation is bottlenecked on serial fetch latency even though most of these fetches are independent of each other.

Two things make this tricky to parallelize today:

  1. The default in-process APIDbtRunner relies on dbt globals (GLOBAL_FLAGS, the adapter FACTORY, etc.) that are not safe to drive from multiple threads.
  2. The existing flow interleaves fetching and post-processing, so it isn't obvious which calls can actually run concurrently.

Solution

This PR adds an opt-in --threads N option (default 1, preserving current behavior) to both edr report and edr send-report.

When threads > 1:

  • A SubprocessDbtRunner is used for the parallel fetches. Each call spawns an independent dbt process, sidestepping the in-process global state and making it safe to drive concurrently from a ThreadPoolExecutor.
  • get_report_data is split into phases by data dependency:
    • Phase 1 — fully independent fetches (models, sources, seeds, snapshots, tests, test/source-freshness results & runs, coverages, invocations, …) run in parallel.
    • Phase 2 — fetches that need Phase 1 IDs (exposures from upstream node IDs, test_results keyed by the test invocation) run in parallel.
    • Phase 3 — lineage is fetched once Phase 2 node IDs are known.
    • Phase 4 — pure in-memory assembly (no dbt calls), shared with the sequential path via a new _assemble_report_data helper.
flowchart TD
    Start([edr report --threads N]) --> P1
    subgraph P1[Phase 1: independent fetches in parallel]
        direction LR
        M[models] ~~~ S[sources] ~~~ Se[seeds] ~~~ Sn[snapshots] ~~~ T[tests] ~~~ TR[test runs] ~~~ SFR[source freshness runs] ~~~ C[coverages] ~~~ I[invocations]
    end
    P1 --> P2
    subgraph P2[Phase 2: needs Phase 1 IDs, in parallel]
        direction LR
        E[exposures] ~~~ TRes[test_results]
    end
    P2 --> P3
    subgraph P3[Phase 3: needs Phase 2 node IDs]
        L[lineage]
    end
    P3 --> P4[Phase 4: in-memory assembly]
    P4 --> End([report data])
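In plain Python, the phased orchestration above can be sketched as follows. This is a minimal standalone sketch: the fetch_* helpers and the pool wiring are illustrative stand-ins for the real dbt run-operation calls, not the actual ReportAPI code.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical fetchers standing in for the real dbt run-operation calls.
def fetch_models():
    return ["model_a", "model_b"]

def fetch_sources():
    return ["source_a"]

def fetch_exposures(upstream_ids):
    # Phase 2 work: depends on IDs produced in Phase 1.
    return [f"exposure_of_{i}" for i in upstream_ids]

def fetch_lineage(node_ids):
    # Phase 3 work: depends on the full set of node IDs.
    return {"nodes": sorted(node_ids)}

def get_report_data_parallel(threads=4):
    with ThreadPoolExecutor(max_workers=threads) as pool:
        # Phase 1: fully independent fetches run concurrently.
        f_models = pool.submit(fetch_models)
        f_sources = pool.submit(fetch_sources)
        models, sources = f_models.result(), f_sources.result()

        # Phase 2: fetches that need Phase 1 IDs.
        exposures = pool.submit(fetch_exposures, models + sources).result()

    # Phase 3: lineage once all node IDs are known.
    lineage = fetch_lineage(models + sources + exposures)

    # Phase 4: pure in-memory assembly, no dbt calls.
    return {"models": models, "sources": sources,
            "exposures": exposures, "lineage": lineage}
```

Because each phase only joins on the futures it actually depends on, the slowest Phase 1 fetch sets the floor for Phase 2 rather than the sum of all fetch latencies.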

The sequential path is unchanged when --threads 1 (the default), so users who don't opt in see no behavior change.

Example:

edr report --threads 8
edr send-report --slack-token ... --threads 8

A unit test (tests/unit/monitor/api/report/test_report_parallel.py) covers the parallel orchestration and confirms the assembled output matches the sequential path.

Happy to adjust the option name, default, or factor things differently if you'd prefer a different shape.

@github-actions (Contributor)

👋 @dtaniwaki
Thank you for raising your pull request.
Please make sure to add tests and document all user-facing changes.
You can do this by editing the docs files in this pull request.

@dtaniwaki dtaniwaki requested a deployment to elementary_test_env April 22, 2026 13:16 — with GitHub Actions Waiting
coderabbitai Bot commented Apr 22, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

📝 Walkthrough


Adds an optional threads parameter propagated from CLI to report generation; ReportAPI.get_report_data(..., threads) now dispatches to sequential or phased ThreadPoolExecutor parallel flows, introduces _create_subprocess_runner() for thread-safe dbt runs, and centralizes report assembly.

Changes

Parallel report generation (elementary/monitor/api/report/report.py unless noted otherwise):

  • Imports and logger: import ThreadPoolExecutor and SubprocessDbtRunner, and initialize logger = get_logger(__name__).
  • Subprocess runner constructor: add _create_subprocess_runner() to construct a SubprocessDbtRunner from the existing dbt_runner configuration for thread-safe use.
  • get_report_data branching: add a threads parameter to ReportAPI.get_report_data() and route to _get_report_data_sequential(...) when threads == 1 or _get_report_data_parallel(...) when threads > 1.
  • Sequential flow refactor: refactor the sequential path to fetch tests/freshness/lineage, then delegate final assembly to _assemble_report_data(...).
  • Parallel phased executor: add _get_report_data_parallel(...) using a phased ThreadPoolExecutor — phase 1 independent fetches, phase 2 dependent exposures/tests, phase 3 lineage — then call _assemble_report_data(...).
  • Central assembly: add _assemble_report_data(...) to union/serialize test results/runs/freshness, compute totals and filters, build invocations/resources_latest_invocation, and return ReportDataSchema.
  • CLI threading options (elementary/monitor/cli.py): add --threads (click.IntRange(min=1), default 1) to monitor report and monitor send-report, and forward the value to the report methods.
  • Data monitoring report forwarding (elementary/monitor/data_monitoring/report/data_monitoring_report.py): propagate threads through generate_report(), get_report_data(), and send_report() into ReportAPI.get_report_data(...).
  • Tests for parallel behavior (tests/unit/monitor/api/report/test_report_parallel.py): add tests for _create_subprocess_runner(), routing of get_report_data(threads=...), executor usage and submit/result flow in _get_report_data_parallel, _assemble_report_data() invocation, and exception propagation.
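The --threads CLI wiring described above can be sketched with a standalone click command. This is a hypothetical mini-example: the real option is declared on edr monitor report / monitor send-report and forwards into the report methods, which this stub replaces with an echo.

```python
import click

@click.command()
@click.option(
    "--threads",
    type=click.IntRange(min=1),  # rejects 0 and negatives at parse time
    default=1,
    help="Number of threads used to fetch report data in parallel.",
)
def report(threads):
    # Stand-in for forwarding threads into ReportAPI.get_report_data(threads=...).
    mode = "parallel" if threads > 1 else "sequential"
    click.echo(f"{mode}:{threads}")

if __name__ == "__main__":
    report()
```

With click.IntRange(min=1), an invocation like --threads 0 fails at the CLI boundary with a usage error instead of reaching the report code.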

Sequence Diagram(s)

sequenceDiagram
    participant CLI as CLI
    participant DMR as DataMonitoringReport
    participant ReportAPI as ReportAPI
    participant Executor as ThreadPoolExecutor
    participant DBT as SubprocessDbtRunner
    participant APIs as External_APIs

    CLI->>DMR: generate_report(threads=N)
    DMR->>ReportAPI: get_report_data(threads=N)
    alt threads > 1
        ReportAPI->>Executor: start phased parallel fetches
        Executor->>DBT: _create_subprocess_runner() (per-thread runner)
        Executor->>APIs: fetch models/tests/source_freshnesses/invocations (phase 1, parallel)
        APIs-->>Executor: results
        Executor->>APIs: fetch dependent exposures/tests (phase 2)
        APIs-->>Executor: results
        Executor->>APIs: fetch lineage (phase 3)
        APIs-->>Executor: lineage
    else
        ReportAPI->>DBT: sequential dbt/API calls
        DBT-->>ReportAPI: results
        APIs-->>ReportAPI: results
    end
    ReportAPI->>ReportAPI: _assemble_report_data(all fetched datasets)
    ReportAPI-->>DMR: ReportDataSchema, err?
    DMR-->>CLI: output/send report

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Poem

🐇 Threads hop out across the glade,
Little runners in a tidy parade,
Parallel paws or single-file cheer,
Subprocess carrots drawing near,
A snug report arrives—hip, hop, hooray!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: docstring coverage is 12.00%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (4 passed)

  • Linked Issues check ✅: skipped because no linked issues were found for this pull request.
  • Out of Scope Changes check ✅: skipped because no linked issues were found for this pull request.
  • Title check ✅: the title accurately summarizes the main change, adding a --threads option to enable parallel fetching of report data.
  • Description check ✅: skipped because CodeRabbit's high-level summary is enabled.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (1)
elementary/monitor/api/report/report.py (1)

451-493: Reuse _build_report_data in the parallel path.

The parallel path still duplicates most of the new builder logic, so the two execution modes can drift in serialized shape or invocation handling. Consider letting _build_report_data accept optional pre-fetched invocation data, then call it from both paths.

♻️ Directional refactor
 def _build_report_data(
     self,
     ...
     filters,
     invocations_api,
+    models_latest_invocation=None,
+    invocations=None,
 ) -> Tuple[ReportDataSchema, Optional[Exception]]:
@@
-    models_latest_invocation = invocations_api.get_models_latest_invocation()
-    invocations = invocations_api.get_models_latest_invocations_data()
+    if models_latest_invocation is None:
+        models_latest_invocation = invocations_api.get_models_latest_invocation()
+    if invocations is None:
+        invocations = invocations_api.get_models_latest_invocations_data()

Then _get_report_data_parallel can pass the already-fetched models_latest_invocation and invocations_data into the same builder.

Also applies to: 498-593

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@elementary/monitor/api/report/report.py` around lines 451 - 493, The parallel
path duplicates the ReportDataSchema-building logic; change _build_report_data
to accept optional parameters for pre-fetched invocation data (e.g.,
invocations_data and models_latest_invocation) and use those instead of redoing
serialization when provided, keeping existing behavior by making them optional
with defaults; then update _get_report_data_parallel to call _build_report_data
passing the already-fetched models_latest_invocation and invocations_data and
remove the duplicated serialization/ReportDataSchema construction (the
serializable_* variables and ReportDataSchema instantiation should be produced
only by _build_report_data), ensuring all callers that don't pass the new args
continue to work.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@elementary/monitor/api/report/report.py`:
- Around line 76-95: Multiple concurrent dbt subprocesses created by
_create_subprocess_runner/SubprocessDbtRunner can write to the same artifact/log
directories (DBT_LOG_PATH and dbt_project.yml's target-path), causing race
conditions; update _create_subprocess_runner to inject per-subprocess isolation
by cloning the runner.env_vars and adding a unique artifact/log path (e.g., set
DBT_LOG_PATH to a worker-specific temp dir) and also ensure a per-subprocess
target path is provided (either via an environment variable or by passing a
config override) so that run_operation-created subprocesses do not share the
same project_dir artifacts.
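A sketch of the per-subprocess isolation this comment asks for. Hedged: DBT_LOG_PATH comes from the comment above, while DBT_TARGET_PATH and the exact wiring into SubprocessDbtRunner are assumptions for illustration.

```python
import os
import tempfile

def make_isolated_env(base_env):
    """Clone the parent environment and point dbt's log and target
    paths at a worker-specific temp dir so that concurrent dbt
    subprocesses do not write to the same artifact directories."""
    workdir = tempfile.mkdtemp(prefix="edr-report-worker-")
    env = dict(base_env)
    env["DBT_LOG_PATH"] = os.path.join(workdir, "logs")
    env["DBT_TARGET_PATH"] = os.path.join(workdir, "target")  # assumed env-var name
    return env

# Each parallel worker would get its own environment before spawning dbt:
env_a = make_isolated_env(os.environ)
env_b = make_isolated_env(os.environ)
```

Cleaning up the temp directories after the report completes is left out of the sketch.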

In `@elementary/monitor/cli.py`:
- Around line 452-458: Replace the loose integer type on the "--threads" Click
option with a ranged validator so zero and negative values are rejected at the
CLI: change the option declaration for the "--threads" argument used by the
monitor-report and monitor-send-report commands from type=int to
type=click.IntRange(min=1) (or type=click.IntRange(1, None)) so Click will
enforce threads >= 1 and surface an immediate error to the user.

---

Nitpick comments:
In `@elementary/monitor/api/report/report.py`:
- Around line 451-493: The parallel path duplicates the
ReportDataSchema-building logic; change _build_report_data to accept optional
parameters for pre-fetched invocation data (e.g., invocations_data and
models_latest_invocation) and use those instead of redoing serialization when
provided, keeping existing behavior by making them optional with defaults; then
update _get_report_data_parallel to call _build_report_data passing the
already-fetched models_latest_invocation and invocations_data and remove the
duplicated serialization/ReportDataSchema construction (the serializable_*
variables and ReportDataSchema instantiation should be produced only by
_build_report_data), ensuring all callers that don't pass the new args continue
to work.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 16bd31bf-041a-4378-9f87-ea58c7ebec88

📥 Commits

Reviewing files that changed from the base of the PR and between e5af7e7 and 9e9c925.

📒 Files selected for processing (3)
  • elementary/monitor/api/report/report.py
  • elementary/monitor/cli.py
  • elementary/monitor/data_monitoring/report/data_monitoring_report.py

@dtaniwaki dtaniwaki requested a deployment to elementary_test_env April 22, 2026 13:32 — with GitHub Actions Waiting
@dtaniwaki dtaniwaki requested a deployment to elementary_test_env April 22, 2026 13:33 — with GitHub Actions Waiting

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (5)
elementary/monitor/api/report/report.py (5)

239-239: Inconsistent internal default for threads.

_get_report_data_parallel defaults threads=4 while the public get_report_data defaults to threads=1. Since _get_report_data_parallel is only reached when threads > 1 via the router, the internal default is effectively dead code and can mislead future callers. Consider removing the default or aligning it with the router semantics (threads >= 2).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@elementary/monitor/api/report/report.py` at line 239, The internal default
for threads on _get_report_data_parallel (threads: int = 4) conflicts with the
public get_report_data default (threads=1) and is effectively dead; update
_get_report_data_parallel to remove the misleading default (make threads a
required parameter) or set it to align with router semantics (e.g., threads: int
= 2 or threads: int >= 2) so callers and future maintainers see consistent
behavior—modify the signature of _get_report_data_parallel and any internal
calls (and the router logic that branches to _get_report_data_parallel) to pass
the explicit threads value from get_report_data or the router.

340-368: Phase 3/4 unnecessarily use SubprocessDbtRunner for sequential calls.

Both LineageAPI (line 340) and FiltersAPI (line 365) are instantiated with parallel_runner but invoked sequentially — a single call each. Using SubprocessDbtRunner incurs process-spawn overhead without any parallelism benefit, and it diverges from _get_report_data_sequential which uses self.dbt_runner (the in-process runner) for these same APIs. Suggest using self.dbt_runner here for parity and performance.

♻️ Suggested fix
             # Phase 3: lineage depends on all node IDs
-            lineage = LineageAPI(dbt_runner=parallel_runner).get_lineage(
+            lineage = LineageAPI(dbt_runner=self.dbt_runner).get_lineage(
                 lineage_node_ids, exclude_elementary_models
             )
@@
-                filters_api=FiltersAPI(dbt_runner=parallel_runner),
+                filters_api=FiltersAPI(dbt_runner=self.dbt_runner),
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@elementary/monitor/api/report/report.py` around lines 340 - 368, LineageAPI
and FiltersAPI are being constructed with parallel_runner (SubprocessDbtRunner)
even though they are used sequentially; change their instantiation to use the
in-process runner self.dbt_runner instead of parallel_runner to avoid
unnecessary subprocess overhead — update the
LineageAPI(dbt_runner=parallel_runner) call and the
FiltersAPI(dbt_runner=parallel_runner) call to use dbt_runner=self.dbt_runner
(or equivalent parameter name) before passing the resulting lineage and
filters_api into _assemble_report_data.

369-370: Parallel path swallows exceptions without logging.

A module-level logger was just added (line 46) but the except Exception as error block here returns the error without logging it. In the parallel path, exceptions can originate from worker threads/subprocesses and are non-trivial to attribute without a log line. Suggest logger.exception(...) to preserve the traceback for diagnosis (same applies to lines 225-226 in the sequential path, but the new logger availability makes this a good moment to address both).

♻️ Suggested fix
         except Exception as error:
+            logger.exception("Parallel report data fetch failed")
             return ReportDataSchema(), error
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@elementary/monitor/api/report/report.py` around lines 369 - 370, The
parallel-path except block currently returns ReportDataSchema(), error without
logging; update the handler to call logger.exception("Failed to build report
(parallel path): %s", error) (or logger.exception with a clear message) before
returning to preserve traceback; do the same change in the sequential-path
except that returns ReportDataSchema(), error (use a distinct message like
"Failed to build report (sequential path)"); reference the existing logger
object and the ReportDataSchema/ error variables so you only add
logger.exception(...) prior to the return.

372-396: Missing type annotations on _assemble_report_data.

All 22 parameters lack type hints, which is inconsistent with the rest of the class (e.g., _get_groups, _serialize_*). Given the complexity of the argument list (mixed Dict, List, Schema types), annotations would meaningfully improve readability and catch mismatches between sequential/parallel callers at type-check time.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@elementary/monitor/api/report/report.py` around lines 372 - 396, The
_assemble_report_data function is missing type annotations for all parameters;
add explicit, precise type hints for each argument (e.g., days_back: int,
project_name: str, env: str, warehouse_type: str, seeds: List[SeedSchema] or
Dict[str, SeedSchema] as appropriate, snapshots: List[SnapshotSchema], models:
List[ModelSchema], sources: List[SourceSchema], exposures: List[ExposureSchema],
singular_tests: List[TestSchema], models_runs: Dict[str, InvocationSchema] or
List[InvocationSchema], coverages: CoverageSchema or Dict[str, CoverageSchema],
tests: List[TestSchema], test_invocation: Optional[InvocationSchema],
test_results: List[TestResultSchema], source_freshness_results:
List[FreshnessResultSchema], test_runs: List[RunSchema], source_freshness_runs:
List[RunSchema], lineage: LineageSchema, filters_api: FiltersApiSchema,
models_latest_invocation: Optional[InvocationSchema], invocations_data:
InvocationsDataSchema) matching existing project schemas/types and keep the
return type Tuple[ReportDataSchema, Optional[Exception]]; update any imports
and/or type aliases used by _assemble_report_data to ensure type-checking passes
and callers align with these annotations.

277-285: Use keyword arguments for thread pool submissions to prevent silent breakage if method signatures drift.

The sequential path invokes get_models_runs(days_back=..., exclude_elementary_models=...) (lines 179–181) and get_test_results(invocation_id=..., disable_samples=...) (lines 187–190) with keyword arguments. The parallel path at lines 278–281 and 329–333 relies on positional order. While the current signatures match, positional arguments create a silent failure risk: if either method's signature changes or parameters are reordered, the calls will invoke incorrectly without raising a TypeError. Use functools.partial or inline lambda with explicit keyword arguments for safety.

♻️ Suggested fix
                 f_models_runs = pool.submit(
-                    _new_models_api().get_models_runs,
-                    days_back,
-                    exclude_elementary_models,
+                    lambda: _new_models_api().get_models_runs(
+                        days_back=days_back,
+                        exclude_elementary_models=exclude_elementary_models,
+                    )
                 )

And similarly for f_test_results (lines 329–333):

                 f_test_results = pool.submit(
-                    _new_tests_api().get_test_results,
-                    test_invocation.invocation_id,
-                    disable_samples,
+                    lambda: _new_tests_api().get_test_results(
+                        invocation_id=test_invocation.invocation_id,
+                        disable_samples=disable_samples,
+                    )
                 )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@elementary/monitor/api/report/report.py` around lines 277 - 285, The parallel
pool.submit calls (e.g., f_models_runs, f_coverages, f_tests, f_test_invocation
and the later f_test_results) pass positional args to
_new_models_api().get_models_runs, _new_tests_api().get_tests and
_new_invocations_api().get_test_invocation_from_filter which risks silent
breakage if signatures change; update each pool.submit to pass keyword arguments
(use functools.partial(fn, days_back=..., exclude_elementary_models=...) or a
lambda that calls fn(days_back=..., exclude_elementary_models=...)) so the
submitted callable binds parameters by name (apply the same change to the
get_test_results/get_test_invocation calls referenced by f_test_results).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@tests/unit/monitor/api/report/test_report_parallel.py`:
- Around line 76-121: The test allows hidden exceptions because
mock_pool.submit.return_value.result.return_value = {} makes Phase-2 code
operate on a dict (missing .invocation_id) which raises and is swallowed; fix
the test for test_uses_thread_pool_executor by returning a mock object with an
invocation_id for the test-invocation future (so Phase 2 sees .invocation_id),
assert that _get_report_data_parallel returned err is None or that
api._assemble_report_data was called to ensure the full parallel flow ran, and
rename the unused variable result in test_error_propagation to _ to satisfy
RUF059; update mocks created in the test to return appropriate per-future
results instead of a plain dict.

---

Nitpick comments:
In `@elementary/monitor/api/report/report.py`:
- Line 239: The internal default for threads on _get_report_data_parallel
(threads: int = 4) conflicts with the public get_report_data default (threads=1)
and is effectively dead; update _get_report_data_parallel to remove the
misleading default (make threads a required parameter) or set it to align with
router semantics (e.g., threads: int = 2 or threads: int >= 2) so callers and
future maintainers see consistent behavior—modify the signature of
_get_report_data_parallel and any internal calls (and the router logic that
branches to _get_report_data_parallel) to pass the explicit threads value from
get_report_data or the router.
- Around line 340-368: LineageAPI and FiltersAPI are being constructed with
parallel_runner (SubprocessDbtRunner) even though they are used sequentially;
change their instantiation to use the in-process runner self.dbt_runner instead
of parallel_runner to avoid unnecessary subprocess overhead — update the
LineageAPI(dbt_runner=parallel_runner) call and the
FiltersAPI(dbt_runner=parallel_runner) call to use dbt_runner=self.dbt_runner
(or equivalent parameter name) before passing the resulting lineage and
filters_api into _assemble_report_data.
- Around line 369-370: The parallel-path except block currently returns
ReportDataSchema(), error without logging; update the handler to call
logger.exception("Failed to build report (parallel path): %s", error) (or
logger.exception with a clear message) before returning to preserve traceback;
do the same change in the sequential-path except that returns
ReportDataSchema(), error (use a distinct message like "Failed to build report
(sequential path)"); reference the existing logger object and the
ReportDataSchema/ error variables so you only add logger.exception(...) prior to
the return.
- Around line 372-396: The _assemble_report_data function is missing type
annotations for all parameters; add explicit, precise type hints for each
argument (e.g., days_back: int, project_name: str, env: str, warehouse_type:
str, seeds: List[SeedSchema] or Dict[str, SeedSchema] as appropriate, snapshots:
List[SnapshotSchema], models: List[ModelSchema], sources: List[SourceSchema],
exposures: List[ExposureSchema], singular_tests: List[TestSchema], models_runs:
Dict[str, InvocationSchema] or List[InvocationSchema], coverages: CoverageSchema
or Dict[str, CoverageSchema], tests: List[TestSchema], test_invocation:
Optional[InvocationSchema], test_results: List[TestResultSchema],
source_freshness_results: List[FreshnessResultSchema], test_runs:
List[RunSchema], source_freshness_runs: List[RunSchema], lineage: LineageSchema,
filters_api: FiltersApiSchema, models_latest_invocation:
Optional[InvocationSchema], invocations_data: InvocationsDataSchema) matching
existing project schemas/types and keep the return type Tuple[ReportDataSchema,
Optional[Exception]]; update any imports and/or type aliases used by
_assemble_report_data to ensure type-checking passes and callers align with
these annotations.
- Around line 277-285: The parallel pool.submit calls (e.g., f_models_runs,
f_coverages, f_tests, f_test_invocation and the later f_test_results) pass
positional args to _new_models_api().get_models_runs, _new_tests_api().get_tests
and _new_invocations_api().get_test_invocation_from_filter which risks silent
breakage if signatures change; update each pool.submit to pass keyword arguments
(use functools.partial(fn, days_back=..., exclude_elementary_models=...) or a
lambda that calls fn(days_back=..., exclude_elementary_models=...)) so the
submitted callable binds parameters by name (apply the same change to the
get_test_results/get_test_invocation calls referenced by f_test_results).

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 76e46349-7bcd-4e50-bda4-be6bf55b392a

📥 Commits

Reviewing files that changed from the base of the PR and between 9e9c925 and 1221e71.

📒 Files selected for processing (3)
  • elementary/monitor/api/report/report.py
  • tests/unit/monitor/api/report/__init__.py
  • tests/unit/monitor/api/report/test_report_parallel.py

@dtaniwaki dtaniwaki requested a deployment to elementary_test_env April 22, 2026 13:42 — with GitHub Actions Waiting

@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (1)
elementary/monitor/cli.py (1)

672-678: LGTM — consistent with the report command.

Same validated option duplicated on send-report. If more commands adopt --threads later, consider lifting this into common_options(...) gated on cmd in (Command.REPORT, Command.SEND_REPORT) to avoid drift between the two declarations, but this is optional.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@elementary/monitor/cli.py` around lines 672 - 678, Duplicate click option
definition for "--threads" exists in the report and send-report command
declarations; refactor by moving the shared click.option into the
common_options(...) helper and conditionally apply it only when cmd is in
(Command.REPORT, Command.SEND_REPORT) so both command functions (the ones
registering "--threads") use the single source of truth; update any command
decorators to remove the local "--threads" declaration and ensure
common_options(...) documents the same IntRange(min=1) default and help text.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@elementary/monitor/cli.py`:
- Around line 672-678: Duplicate click option definition for "--threads" exists
in the report and send-report command declarations; refactor by moving the
shared click.option into the common_options(...) helper and conditionally apply
it only when cmd is in (Command.REPORT, Command.SEND_REPORT) so both command
functions (the ones registering "--threads") use the single source of truth;
update any command decorators to remove the local "--threads" declaration and
ensure common_options(...) documents the same IntRange(min=1) default and help
text.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 04a85476-c7d7-4bdd-a480-7256e0e16dd7

📥 Commits

Reviewing files that changed from the base of the PR and between 1221e71 and 3ed15b9.

📒 Files selected for processing (1)
  • elementary/monitor/cli.py


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
tests/unit/monitor/api/report/test_report_parallel.py (1)

14-14: Make this test prove subprocess runs always raise on failure.

Because the fixture already has raise_on_failure=True, the assertion would still pass if _create_subprocess_runner() simply forwards the parent runner’s value. Set the fixture value to False in this test and keep expecting True so error propagation is actually covered.

🧪 Proposed strengthening
     def test_creates_runner_with_correct_config(self, mock_dbt_runner):
+        mock_dbt_runner.raise_on_failure = False
         api = ReportAPI(mock_dbt_runner)
         with patch(
             "elementary.monitor.api.report.report.SubprocessDbtRunner"
         ) as mock_cls:
             api._create_subprocess_runner()
@@
-                raise_on_failure=True,
+                raise_on_failure=True,

Also applies to: 33-33

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/monitor/api/report/test_report_parallel.py` at line 14, The test
currently sets runner.raise_on_failure = True which doesn't prove subprocesses
are forced to raise; change the test to set runner.raise_on_failure = False but
keep the assertion that the subprocess runner has raise_on_failure == True so
the test verifies _create_subprocess_runner() forces error propagation; update
both occurrences of runner.raise_on_failure in this test file and ensure the
assertion still checks the subprocess runner produced by
_create_subprocess_runner().
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@tests/unit/monitor/api/report/test_report_parallel.py`:
- Around line 11-12: Replace insecure /tmp literals used in the test by
assigning neutral fake paths to the runner attributes: change runner.project_dir
and runner.profiles_dir (and the other occurrences of those same assignments
later in the file) from "/tmp/project" and "/tmp/profiles" to non-/tmp
placeholders (e.g., "/fake/project" and "/fake/profiles" or "fake/project" and
"fake/profiles") so Ruff S108 is not triggered while preserving test semantics.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 06033225-6488-4eee-8642-bcbf8d5c6227

📥 Commits

Reviewing files that changed from the base of the PR and between 3ed15b9 and 6048e93.

📒 Files selected for processing (1)
  • tests/unit/monitor/api/report/test_report_parallel.py

Signed-off-by: Daisuke Taniwaki <daisuketaniwaki@gmail.com>
@dtaniwaki dtaniwaki force-pushed the feat/parallel-report-fetchers branch from 6048e93 to ad039e2 Compare May 13, 2026 04:35
@dtaniwaki dtaniwaki requested a deployment to elementary_test_env May 13, 2026 04:35 — with GitHub Actions Waiting
@dtaniwaki dtaniwaki changed the title feat: add --threads option to parallelize report data fetching feat(report): add --threads option to fetch report data in parallel May 13, 2026

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (2)
tests/unit/monitor/api/report/test_report_parallel.py (1)

41-48: 💤 Low value

Consider removing redundant test.

test_deps_not_run re-checks run_deps_if_needed=False, which is already verified by test_creates_runner_with_correct_config at line 38 via assert_called_once_with. The redundant test adds no additional coverage.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/unit/monitor/api/report/test_report_parallel.py` around lines 41 - 48,
Remove the redundant test test_deps_not_run: it re-verifies
run_deps_if_needed=False which is already asserted in
test_creates_runner_with_correct_config via the SubprocessDbtRunner call
assertion. Delete the test_deps_not_run function (the block that constructs
ReportAPI, patches report.SubprocessDbtRunner, calls
api._create_subprocess_runner(), and asserts call_kwargs["run_deps_if_needed"]
is False) to avoid duplicate coverage and keep tests focused.
elementary/monitor/api/report/report.py (1)

372-396: ⚡ Quick win

Add type annotations to _assemble_report_data parameters.

All other helpers in ReportAPI (including the newly added _get_report_data_sequential / _get_report_data_parallel and the pre-existing _serialize_* methods) carry full type hints, but this new shared assembly method drops them entirely across 21 parameters. Lift the types from the call sites to keep the class consistent and catch any future caller drift between the sequential and parallel paths.

♻️ Suggested annotation sketch
-    def _assemble_report_data(
-        self,
-        days_back,
-        project_name,
-        env,
-        warehouse_type,
-        seeds,
-        snapshots,
-        models,
-        sources,
-        exposures,
-        singular_tests,
-        models_runs,
-        coverages,
-        tests,
-        test_invocation,
-        test_results,
-        source_freshness_results,
-        test_runs,
-        source_freshness_runs,
-        lineage,
-        filters_api,
-        models_latest_invocation,
-        invocations_data,
-    ) -> Tuple[ReportDataSchema, Optional[Exception]]:
+    def _assemble_report_data(
+        self,
+        days_back: int,
+        project_name: Optional[str],
+        env: Optional[str],
+        warehouse_type: Optional[str],
+        seeds: Dict[str, NormalizedSeedSchema],
+        snapshots: Dict[str, NormalizedSnapshotSchema],
+        models: Dict[str, NormalizedModelSchema],
+        sources: Dict[str, NormalizedSourceSchema],
+        exposures: Dict[str, NormalizedExposureSchema],
+        singular_tests: Iterable[NormalizedTestSchema],
+        models_runs,  # ModelRunsResultsSchema (from ModelsAPI.get_models_runs)
+        coverages: Dict[str, ModelCoverageSchema],
+        tests: Dict[str, TestSchema],
+        test_invocation,  # InvocationSchema from InvocationsAPI
+        test_results: Dict[str, List[TestResultSchema]],
+        source_freshness_results: Dict[str, List[SourceFreshnessResultSchema]],
+        test_runs: Dict[str, List[TestRunSchema]],
+        source_freshness_runs: Dict[str, List[SourceFreshnessRunSchema]],
+        lineage,  # LineageSchema from LineageAPI.get_lineage
+        filters_api: FiltersAPI,
+        models_latest_invocation,
+        invocations_data,
+    ) -> Tuple[ReportDataSchema, Optional[Exception]]:

Resolve the few # ... placeholders against the concrete return types of ModelsAPI.get_models_runs, InvocationsAPI.get_test_invocation_from_filter, LineageAPI.get_lineage, and the latest-invocation getters.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@elementary/monitor/api/report/report.py` around lines 372 - 396, The method
ReportAPI._assemble_report_data currently lacks type annotations across its 21
parameters; update its signature to add precise type hints matching the call
sites (e.g., use the concrete types returned/passed by
ModelsAPI.get_models_runs, InvocationsAPI.get_test_invocation_from_filter,
LineageAPI.get_lineage, and the latest-invocation getters) so the sequential and
parallel callers have consistent types, and replace the existing `# ...`
placeholders with the actual return types from those APIs; keep the return
annotation Tuple[ReportDataSchema, Optional[Exception]] as-is and ensure
imports/forward refs are added if needed.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@tests/unit/monitor/api/report/test_report_parallel.py`:
- Around line 11-12: The test uses literal insecure temp paths
runner.project_dir = "/tmp/project" and runner.profiles_dir = "/tmp/profiles"
which triggers Ruff S108; change those to neutral mock paths (e.g.,
"mock_project_dir" and "mock_profiles_dir" or a non-/tmp absolute path) in
tests/unit/monitor/api/report/test_report_parallel.py where runner.project_dir
and runner.profiles_dir are set so Ruff no longer flags them while preserving
test semantics.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: a8cd6aad-07a2-4109-ab7b-1eb688ddc912

📥 Commits

Reviewing files that changed from the base of the PR and between 6048e93 and ad039e2.

📒 Files selected for processing (5)
  • elementary/monitor/api/report/report.py
  • elementary/monitor/cli.py
  • elementary/monitor/data_monitoring/report/data_monitoring_report.py
  • tests/unit/monitor/api/report/__init__.py
  • tests/unit/monitor/api/report/test_report_parallel.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • elementary/monitor/data_monitoring/report/data_monitoring_report.py
  • elementary/monitor/cli.py

Comment thread tests/unit/monitor/api/report/test_report_parallel.py Outdated
…s required, in-process runner for sequential calls, logger.exception, typed _assemble_report_data, partial-based kwargs, test cleanup)

Signed-off-by: Daisuke Taniwaki <daisuketaniwaki@gmail.com>
Comment thread elementary/monitor/api/report/report.py Outdated
filter: SelectorFilterSchema = SelectorFilterSchema(),
env: Optional[str] = None,
warehouse_type: Optional[str] = None,
threads: int = 4,
Author


Addressed in 7b2df2b — made threads a required parameter on _get_report_data_parallel so the misleading =4 default is gone and callers must come through the public router.
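The "required parameter behind a public router" shape can be sketched in a few lines. This is a simplified illustration under assumptions, not the real `ReportAPI`: the actual methods take many more parameters, and the tuple return values here exist only so the sketch is checkable.

```python
class ReportAPI:
    # Minimal sketch of routing threads through a single public entry point.
    def get_report_data(self, days_back: int = 7, threads: int = 1):
        # The public router is the only place a default for threads lives.
        if threads > 1:
            return self._get_report_data_parallel(days_back=days_back, threads=threads)
        return self._get_report_data_sequential(days_back=days_back)

    def _get_report_data_parallel(self, *, days_back: int, threads: int):
        # threads is keyword-only with no default: internal callers cannot
        # silently pick up a stale "=4".
        return ("parallel", days_back, threads)

    def _get_report_data_sequential(self, *, days_back: int):
        return ("sequential", days_back)


api = ReportAPI()
default_result = api.get_report_data()
parallel_result = api.get_report_data(threads=4)
```

Calling `_get_report_data_parallel()` without `threads` now raises a `TypeError` at the call site instead of quietly defaulting.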

Comment thread elementary/monitor/api/report/report.py Outdated
Comment on lines +277 to +285
f_models_runs = pool.submit(
_new_models_api().get_models_runs,
days_back,
exclude_elementary_models,
)
f_coverages = pool.submit(_new_models_api().get_test_coverages)
f_tests = pool.submit(_new_tests_api().get_tests)
f_test_invocation = pool.submit(
_new_invocations_api().get_test_invocation_from_filter, filter
Author


Addressed in 7b2df2b — switched pool.submit calls to functools.partial(fn, kwarg=...) so keyword binding survives future signature changes. Same change applied to f_test_results / f_exposures / f_test_invocation further down.
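The `functools.partial` pattern described here can be shown in isolation. `get_models_runs` below is a hypothetical stand-in for `ModelsAPI.get_models_runs`; the argument values are arbitrary.

```python
import functools
from concurrent.futures import ThreadPoolExecutor


def get_models_runs(days_back: int = 7, exclude_elementary_models: bool = False):
    # Hypothetical stand-in for ModelsAPI.get_models_runs.
    return {"days_back": days_back, "exclude_elementary_models": exclude_elementary_models}


with ThreadPoolExecutor(max_workers=4) as pool:
    # pool.submit(fn, 14, True) binds positionally and silently breaks if the
    # target signature is ever reordered; functools.partial binds by name, so
    # each argument stays attached to its parameter.
    future = pool.submit(
        functools.partial(get_models_runs, days_back=14, exclude_elementary_models=True)
    )
    models_runs = future.result()
```

`pool.submit(fn, days_back=14, ...)` would also bind by keyword; `partial` additionally packages the fully bound call as a single reusable callable.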

Comment thread elementary/monitor/api/report/report.py Outdated
Comment on lines +340 to +365
lineage = LineageAPI(dbt_runner=parallel_runner).get_lineage(
lineage_node_ids, exclude_elementary_models
)

# Phase 4: pure computation (no dbt calls)
return self._assemble_report_data(
days_back=days_back,
models=serializable_models,
groups=serializable_groups,
tests=serializable_tests,
invocation=serializable_invocation,
test_results=serializable_test_results,
test_results_totals=serializable_test_results_totals,
test_runs=serializable_test_runs,
test_runs_totals=serializable_test_runs_totals,
coverages=serializable_models_coverages,
model_runs=serializable_model_runs,
model_runs_totals=serializable_model_runs_totals,
filters=serializable_filters,
lineage=serializable_lineage,
invocations=invocations,
resources_latest_invocation=models_latest_invocation,
invocations_job_identification=invocations_job_identification,
env=ReportDataEnvSchema(
project_name=project_name, env=env, warehouse_type=warehouse_type
),
project_name=project_name,
env=env,
warehouse_type=warehouse_type,
seeds=seeds,
snapshots=snapshots,
models=models,
sources=sources,
exposures=exposures,
singular_tests=singular_tests,
models_runs=models_runs,
coverages=coverages,
tests=tests,
test_invocation=test_invocation,
test_results=test_results,
source_freshness_results=source_freshness_results,
test_runs=test_runs,
source_freshness_runs=source_freshness_runs,
lineage=lineage,
filters_api=FiltersAPI(dbt_runner=parallel_runner),
Author


Addressed in 7b2df2b — both LineageAPI (Phase 3) and FiltersAPI (Phase 4) now use self.dbt_runner since they are invoked sequentially after the thread pool exits, avoiding unnecessary subprocess spawn overhead.

Comment on lines 369 to 370
except Exception as error:
return ReportDataSchema(), error
Author


Addressed in 7b2df2b — both the parallel and sequential except Exception blocks now call logger.exception(...) with a distinct message before returning, so the traceback is preserved for diagnosis.
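The difference `logger.exception` makes is easy to demonstrate. This is a standalone sketch, not the actual report code: `ReportDataSchema` is an empty stand-in, and the `RuntimeError` simulates a failed fetch.

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("elementary.report")


class ReportDataSchema:
    # Hypothetical empty stand-in for the real schema.
    pass


def get_report_data():
    try:
        raise RuntimeError("dbt run-operation failed")  # simulated fetch failure
    except Exception as error:
        # logger.exception logs at ERROR level and appends the active
        # traceback, unlike logger.error(str(error)), which discards it.
        logger.exception("Failed getting report data (parallel path)")
        return ReportDataSchema(), error


data, err = get_report_data()
```

Because the traceback is captured from the active exception, `logger.exception` only behaves this way when called from inside an `except` block.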

Comment on lines +372 to +396
def _assemble_report_data(
self,
days_back,
project_name,
env,
warehouse_type,
seeds,
snapshots,
models,
sources,
exposures,
singular_tests,
models_runs,
coverages,
tests,
test_invocation,
test_results,
source_freshness_results,
test_runs,
source_freshness_runs,
lineage,
filters_api,
models_latest_invocation,
invocations_data,
) -> Tuple[ReportDataSchema, Optional[Exception]]:
Author


Addressed in 7b2df2b — _assemble_report_data now has full type annotations for all 22 parameters: Dict[str, NormalizedSeedSchema], ModelRunsWithTotalsSchema, DbtInvocationSchema, LineageSchema, etc. Added matching imports.

Comment thread elementary/monitor/cli.py
Comment on lines +672 to +678
@click.option(
"--threads",
type=click.IntRange(min=1),
default=1,
help="Number of threads for fetching report data in parallel. "
"When set to >1, independent dbt operations run concurrently using subprocess-based runners.",
)
Author


Not addressing for now — only two declarations exist today (report and send-report). Lifting into common_options(...) with conditional gating felt like premature abstraction for a 2-call duplication. Happy to revisit if a third command adopts --threads.

runner.project_dir = "/tmp/project"
runner.profiles_dir = "/tmp/profiles"
runner.target = "dev"
runner.raise_on_failure = True
Author


Not addressing — _create_subprocess_runner() intentionally forwards the parent runner's raise_on_failure rather than forcing True, so the user's configured behavior carries over to parallel mode. Setting the fixture to False while asserting True would test a behavior the code does not (and should not) implement. The current test correctly verifies the forwarding contract.

Comment on lines +41 to +48
def test_deps_not_run(self, mock_dbt_runner):
api = ReportAPI(mock_dbt_runner)
with patch(
"elementary.monitor.api.report.report.SubprocessDbtRunner"
) as mock_cls:
api._create_subprocess_runner()
call_kwargs = mock_cls.call_args[1]
assert call_kwargs["run_deps_if_needed"] is False
Author


Addressed in 7b2df2b — test_deps_not_run removed. run_deps_if_needed=False is already covered by test_creates_runner_with_correct_config via assert_called_once_with(...).

@dtaniwaki dtaniwaki requested a deployment to elementary_test_env May 13, 2026 05:29 — with GitHub Actions Waiting