feat(report): add --threads option to fetch report data in parallel #2197
dtaniwaki wants to merge 2 commits into
Conversation
📝 Walkthrough: Adds an optional `--threads` parameter enabling parallel report generation.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant CLI as CLI
    participant DMR as DataMonitoringReport
    participant ReportAPI as ReportAPI
    participant Executor as ThreadPoolExecutor
    participant DBT as SubprocessDbtRunner
    participant APIs as External_APIs
    CLI->>DMR: generate_report(threads=N)
    DMR->>ReportAPI: get_report_data(threads=N)
    alt threads > 1
        ReportAPI->>Executor: start phased parallel fetches
        Executor->>DBT: _create_subprocess_runner() (per-thread runner)
        Executor->>APIs: fetch models/tests/source_freshnesses/invocations (phase 1, parallel)
        APIs-->>Executor: results
        Executor->>APIs: fetch dependent exposures/tests (phase 2)
        APIs-->>Executor: results
        Executor->>APIs: fetch lineage (phase 3)
        APIs-->>Executor: lineage
    else
        ReportAPI->>DBT: sequential dbt/API calls
        DBT-->>ReportAPI: results
        APIs-->>ReportAPI: results
    end
    ReportAPI->>ReportAPI: _assemble_report_data(all fetched datasets)
    ReportAPI-->>DMR: ReportDataSchema, err?
    DMR-->>CLI: output/send report
```
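The phased flow in the diagram can be sketched with a plain `ThreadPoolExecutor`: phase-1 fetches run concurrently, while the lineage fetch waits on their results. The fetch functions below are hypothetical stand-ins for the real elementary API calls, not the actual implementation.

```python
from concurrent.futures import ThreadPoolExecutor

# Placeholder fetchers standing in for the real ModelsAPI/TestsAPI/etc. calls.
def fetch_models(): return ["model_a", "model_b"]
def fetch_tests(): return ["test_x"]
def fetch_invocations(): return {"latest": "inv_1"}
def fetch_lineage(node_ids): return {"nodes": sorted(node_ids)}

def get_report_data_parallel(threads: int = 4) -> dict:
    with ThreadPoolExecutor(max_workers=threads) as pool:
        # Phase 1: independent fetches run concurrently.
        f_models = pool.submit(fetch_models)
        f_tests = pool.submit(fetch_tests)
        f_invocations = pool.submit(fetch_invocations)
        models = f_models.result()
        tests = f_tests.result()
        invocations = f_invocations.result()
        # Phase 3: lineage depends on node IDs gathered in earlier phases.
        lineage = pool.submit(fetch_lineage, models + tests).result()
    # Final assembly is pure computation, outside the pool.
    return {
        "models": models,
        "tests": tests,
        "invocations": invocations,
        "lineage": lineage,
    }
```

Calling `.result()` between phases is what enforces the ordering: later phases only start once their inputs have resolved.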
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (warning)
Actionable comments posted: 2
🧹 Nitpick comments (1)
elementary/monitor/api/report/report.py (1)
451-493: Reuse `_build_report_data` in the parallel path.

The parallel path still duplicates most of the new builder logic, so the two execution modes can drift in serialized shape or invocation handling. Consider letting `_build_report_data` accept optional pre-fetched invocation data, then call it from both paths.

♻️ Directional refactor

```diff
 def _build_report_data(
     self,
     ...
     filters,
     invocations_api,
+    models_latest_invocation=None,
+    invocations=None,
 ) -> Tuple[ReportDataSchema, Optional[Exception]]:
@@
-    models_latest_invocation = invocations_api.get_models_latest_invocation()
-    invocations = invocations_api.get_models_latest_invocations_data()
+    if models_latest_invocation is None:
+        models_latest_invocation = invocations_api.get_models_latest_invocation()
+    if invocations is None:
+        invocations = invocations_api.get_models_latest_invocations_data()
```

Then `_get_report_data_parallel` can pass the already-fetched `models_latest_invocation` and `invocations_data` into the same builder.

Also applies to: 498-593
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@elementary/monitor/api/report/report.py` around lines 451 - 493, The parallel path duplicates the ReportDataSchema-building logic; change _build_report_data to accept optional parameters for pre-fetched invocation data (e.g., invocations_data and models_latest_invocation) and use those instead of redoing serialization when provided, keeping existing behavior by making them optional with defaults; then update _get_report_data_parallel to call _build_report_data passing the already-fetched models_latest_invocation and invocations_data and remove the duplicated serialization/ReportDataSchema construction (the serializable_* variables and ReportDataSchema instantiation should be produced only by _build_report_data), ensuring all callers that don't pass the new args continue to work.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@elementary/monitor/api/report/report.py`:
- Around line 76-95: Multiple concurrent dbt subprocesses created by
_create_subprocess_runner/SubprocessDbtRunner can write to the same artifact/log
directories (DBT_LOG_PATH and dbt_project.yml's target-path), causing race
conditions; update _create_subprocess_runner to inject per-subprocess isolation
by cloning the runner.env_vars and adding a unique artifact/log path (e.g., set
DBT_LOG_PATH to a worker-specific temp dir) and also ensure a per-subprocess
target path is provided (either via an environment variable or by passing a
config override) so that run_operation-created subprocesses do not share the
same project_dir artifacts.
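A minimal sketch of the isolation this comment asks for, assuming dbt honors a `DBT_LOG_PATH` override and also a `DBT_TARGET_PATH` one (the latter is an assumption here; the comment only mentions `target-path` in `dbt_project.yml`). The helper name and wiring are illustrative, not elementary's actual code:

```python
import os
import tempfile

def isolated_env_vars(base_env: dict, worker_id: int) -> dict:
    """Clone a runner's env vars and point logs/artifacts at a
    worker-specific temp dir so concurrent dbt subprocesses do not
    race on shared files. Illustrative only."""
    env = dict(base_env)  # never mutate the shared parent mapping
    work_dir = tempfile.mkdtemp(prefix=f"dbt-worker-{worker_id}-")
    env["DBT_LOG_PATH"] = os.path.join(work_dir, "logs")
    env["DBT_TARGET_PATH"] = os.path.join(work_dir, "target")
    return env

base = {"DBT_PROFILES_DIR": "/fake/profiles"}
e1 = isolated_env_vars(base, 1)
e2 = isolated_env_vars(base, 2)
```

Each worker gets a distinct temp directory, and the parent mapping is left untouched.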
In `@elementary/monitor/cli.py`:
- Around line 452-458: Replace the loose integer type on the "--threads" Click
option with a ranged validator so zero and negative values are rejected at the
CLI: change the option declaration for the "--threads" argument used by the
monitor-report and monitor-send-report commands from type=int to
type=click.IntRange(min=1) (or type=click.IntRange(1, None)) so Click will
enforce threads >= 1 and surface an immediate error to the user.
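The suggested validation can be reproduced with a minimal Click command; the option and command names mirror the review comment, not the full elementary CLI.

```python
import click

@click.command()
@click.option(
    "--threads",
    type=click.IntRange(min=1),  # Click rejects 0 and negatives at parse time
    default=1,
    help="Number of threads to fetch report data in parallel.",
)
def report(threads: int) -> None:
    click.echo(f"threads={threads}")
```

With `IntRange(min=1)`, `--threads 0` fails immediately with a usage error instead of reaching the report code.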
📒 Files selected for processing (3)
- elementary/monitor/api/report/report.py
- elementary/monitor/cli.py
- elementary/monitor/data_monitoring/report/data_monitoring_report.py
Actionable comments posted: 1
🧹 Nitpick comments (5)
elementary/monitor/api/report/report.py (5)
239-239: Inconsistent internal default for `threads`.

`_get_report_data_parallel` defaults `threads=4` while the public `get_report_data` defaults to `threads=1`. Since `_get_report_data_parallel` is only reached when `threads > 1` via the router, the internal default is effectively dead code and can mislead future callers. Consider removing the default or aligning it with the router semantics (`threads >= 2`).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@elementary/monitor/api/report/report.py` at line 239, The internal default for threads on _get_report_data_parallel (threads: int = 4) conflicts with the public get_report_data default (threads=1) and is effectively dead; update _get_report_data_parallel to remove the misleading default (make threads a required parameter) or set it to align with router semantics (e.g., threads: int = 2 or threads: int >= 2) so callers and future maintainers see consistent behavior—modify the signature of _get_report_data_parallel and any internal calls (and the router logic that branches to _get_report_data_parallel) to pass the explicit threads value from get_report_data or the router.
340-368: Phase 3/4 unnecessarily use `SubprocessDbtRunner` for sequential calls.

Both `LineageAPI` (line 340) and `FiltersAPI` (line 365) are instantiated with `parallel_runner` but invoked sequentially, a single call each. Using `SubprocessDbtRunner` incurs process-spawn overhead without any parallelism benefit, and it diverges from `_get_report_data_sequential`, which uses `self.dbt_runner` (the in-process runner) for these same APIs. Suggest using `self.dbt_runner` here for parity and performance.

♻️ Suggested fix

```diff
 # Phase 3: lineage depends on all node IDs
-lineage = LineageAPI(dbt_runner=parallel_runner).get_lineage(
+lineage = LineageAPI(dbt_runner=self.dbt_runner).get_lineage(
     lineage_node_ids, exclude_elementary_models
 )
@@
-    filters_api=FiltersAPI(dbt_runner=parallel_runner),
+    filters_api=FiltersAPI(dbt_runner=self.dbt_runner),
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@elementary/monitor/api/report/report.py` around lines 340 - 368, LineageAPI and FiltersAPI are being constructed with parallel_runner (SubprocessDbtRunner) even though they are used sequentially; change their instantiation to use the in-process runner self.dbt_runner instead of parallel_runner to avoid unnecessary subprocess overhead — update the LineageAPI(dbt_runner=parallel_runner) call and the FiltersAPI(dbt_runner=parallel_runner) call to use dbt_runner=self.dbt_runner (or equivalent parameter name) before passing the resulting lineage and filters_api into _assemble_report_data.
369-370: Parallel path swallows exceptions without logging.

A module-level `logger` was just added (line 46) but the `except Exception as error` block here returns the error without logging it. In the parallel path, exceptions can originate from worker threads/subprocesses and are non-trivial to attribute without a log line. Suggest `logger.exception(...)` to preserve the traceback for diagnosis (same applies to lines 225-226 in the sequential path, but the new `logger` availability makes this a good moment to address both).

♻️ Suggested fix

```diff
 except Exception as error:
+    logger.exception("Parallel report data fetch failed")
     return ReportDataSchema(), error
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@elementary/monitor/api/report/report.py` around lines 369 - 370, The parallel-path except block currently returns ReportDataSchema(), error without logging; update the handler to call logger.exception("Failed to build report (parallel path): %s", error) (or logger.exception with a clear message) before returning to preserve traceback; do the same change in the sequential-path except that returns ReportDataSchema(), error (use a distinct message like "Failed to build report (sequential path)"); reference the existing logger object and the ReportDataSchema/ error variables so you only add logger.exception(...) prior to the return.
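The value of `logger.exception` over a bare `return` is that it records the active traceback alongside the message, so a failure raised inside a worker stays attributable. A self-contained illustration (the function is a stand-in, not the real fetch code):

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def fetch_report_data():
    try:
        raise ValueError("warehouse connection dropped")  # simulated worker failure
    except Exception as error:
        # logger.exception logs at ERROR level *and* appends the current
        # traceback, unlike logger.error with just a message.
        logger.exception("Parallel report data fetch failed")
        return {}, error

data, err = fetch_report_data()
```

The caller still receives the `(data, error)` tuple unchanged; the only addition is the diagnostic log line.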
372-396: Missing type annotations on `_assemble_report_data`.

All 22 parameters lack type hints, which is inconsistent with the rest of the class (e.g., `_get_groups`, `_serialize_*`). Given the complexity of the argument list (mixed `Dict`, `List`, `Schema` types), annotations would meaningfully improve readability and catch mismatches between sequential/parallel callers at type-check time.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@elementary/monitor/api/report/report.py` around lines 372 - 396, The _assemble_report_data function is missing type annotations for all parameters; add explicit, precise type hints for each argument (e.g., days_back: int, project_name: str, env: str, warehouse_type: str, seeds: List[SeedSchema] or Dict[str, SeedSchema] as appropriate, snapshots: List[SnapshotSchema], models: List[ModelSchema], sources: List[SourceSchema], exposures: List[ExposureSchema], singular_tests: List[TestSchema], models_runs: Dict[str, InvocationSchema] or List[InvocationSchema], coverages: CoverageSchema or Dict[str, CoverageSchema], tests: List[TestSchema], test_invocation: Optional[InvocationSchema], test_results: List[TestResultSchema], source_freshness_results: List[FreshnessResultSchema], test_runs: List[RunSchema], source_freshness_runs: List[RunSchema], lineage: LineageSchema, filters_api: FiltersApiSchema, models_latest_invocation: Optional[InvocationSchema], invocations_data: InvocationsDataSchema) matching existing project schemas/types and keep the return type Tuple[ReportDataSchema, Optional[Exception]]; update any imports and/or type aliases used by _assemble_report_data to ensure type-checking passes and callers align with these annotations.
277-285: Use keyword arguments for thread pool submissions to prevent silent breakage if method signatures drift.

The sequential path invokes `get_models_runs(days_back=..., exclude_elementary_models=...)` (lines 179-181) and `get_test_results(invocation_id=..., disable_samples=...)` (lines 187-190) with keyword arguments. The parallel path at lines 278-281 and 329-333 relies on positional order. While the current signatures match, positional arguments create a silent failure risk: if either method's signature changes or parameters are reordered, the calls will invoke incorrectly without raising a `TypeError`. Use `functools.partial` or an inline `lambda` with explicit keyword arguments for safety.

♻️ Suggested fix

```diff
 f_models_runs = pool.submit(
-    _new_models_api().get_models_runs,
-    days_back,
-    exclude_elementary_models,
+    lambda: _new_models_api().get_models_runs(
+        days_back=days_back,
+        exclude_elementary_models=exclude_elementary_models,
+    )
 )
```

And similarly for `f_test_results` (lines 329-333):

```diff
 f_test_results = pool.submit(
-    _new_tests_api().get_test_results,
-    test_invocation.invocation_id,
-    disable_samples,
+    lambda: _new_tests_api().get_test_results(
+        invocation_id=test_invocation.invocation_id,
+        disable_samples=disable_samples,
+    )
 )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@elementary/monitor/api/report/report.py` around lines 277 - 285, The parallel pool.submit calls (e.g., f_models_runs, f_coverages, f_tests, f_test_invocation and the later f_test_results) pass positional args to _new_models_api().get_models_runs, _new_tests_api().get_tests and _new_invocations_api().get_test_invocation_from_filter which risks silent breakage if signatures change; update each pool.submit to pass keyword arguments (use functools.partial(fn, days_back=..., exclude_elementary_models=...) or a lambda that calls fn(days_back=..., exclude_elementary_models=...)) so the submitted callable binds parameters by name (apply the same change to the get_test_results/get_test_invocation calls referenced by f_test_results).
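The `functools.partial` variant of the fix can be shown in isolation; the method below is a stand-in whose signature shape mirrors the review, not the real `ModelsAPI`:

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def get_models_runs(days_back: int = 7, exclude_elementary_models: bool = True) -> dict:
    # Stand-in for the real API method.
    return {"days_back": days_back, "exclude": exclude_elementary_models}

with ThreadPoolExecutor(max_workers=2) as pool:
    # Keyword binding survives parameter reordering in the target signature,
    # unlike pool.submit(get_models_runs, 14, False).
    future = pool.submit(
        partial(get_models_runs, days_back=14, exclude_elementary_models=False)
    )
    result = future.result()
```

If `get_models_runs` later swapped its two parameters, the positional form would silently pass `14` into the wrong slot, while the `partial` form keeps binding by name.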
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/unit/monitor/api/report/test_report_parallel.py`:
- Around line 76-121: The test allows hidden exceptions because
mock_pool.submit.return_value.result.return_value = {} makes Phase-2 code
operate on a dict (missing .invocation_id) which raises and is swallowed; fix
the test for test_uses_thread_pool_executor by returning a mock object with an
invocation_id for the test-invocation future (so Phase 2 sees .invocation_id),
assert that _get_report_data_parallel returned err is None or that
api._assemble_report_data was called to ensure the full parallel flow ran, and
rename the unused variable result in test_error_propagation to _ to satisfy
RUF059; update mocks created in the test to return appropriate per-future
results instead of a plain dict.
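The per-future mock the comment asks for can be built with `unittest.mock`: the test-invocation future must resolve to an object exposing `.invocation_id`, not a plain dict. Names below are illustrative, not the real test fixtures:

```python
from unittest.mock import MagicMock

# Phase-2 code reads .invocation_id off the resolved future, so the
# mocked result must be an object with that attribute, not {}.
test_invocation = MagicMock()
test_invocation.invocation_id = "inv-123"

future = MagicMock()
future.result.return_value = test_invocation

resolved = future.result()
```

With a plain dict as the resolved value, attribute access would raise inside the worker and be swallowed, which is exactly the hidden-exception problem the comment describes.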
📒 Files selected for processing (3)
- elementary/monitor/api/report/report.py
- tests/unit/monitor/api/report/__init__.py
- tests/unit/monitor/api/report/test_report_parallel.py
🧹 Nitpick comments (1)
elementary/monitor/cli.py (1)
672-678: LGTM, consistent with the `report` command.

Same validated option duplicated on `send-report`. If more commands adopt `--threads` later, consider lifting this into `common_options(...)` gated on `cmd in (Command.REPORT, Command.SEND_REPORT)` to avoid drift between the two declarations, but this is optional.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@elementary/monitor/cli.py` around lines 672 - 678, Duplicate click option definition for "--threads" exists in the report and send-report command declarations; refactor by moving the shared click.option into the common_options(...) helper and conditionally apply it only when cmd is in (Command.REPORT, Command.SEND_REPORT) so both command functions (the ones registering "--threads") use the single source of truth; update any command decorators to remove the local "--threads" declaration and ensure common_options(...) documents the same IntRange(min=1) default and help text.
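One way to lift the duplicated declaration into a single source of truth is a shared decorator; this is a hypothetical sketch, since elementary's real `common_options(...)` helper is organized differently:

```python
import click

def threads_option(command):
    """Shared --threads declaration applied to both report commands."""
    return click.option(
        "--threads",
        type=click.IntRange(min=1),
        default=1,
        help="Number of threads to fetch report data in parallel.",
    )(command)

@click.command()
@threads_option
def report(threads: int) -> None:
    click.echo(f"report threads={threads}")

@click.command(name="send-report")
@threads_option
def send_report(threads: int) -> None:
    click.echo(f"send-report threads={threads}")
```

Both commands now pick up any change to the range, default, or help text from one place.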
Actionable comments posted: 1
🧹 Nitpick comments (1)
tests/unit/monitor/api/report/test_report_parallel.py (1)
14-14: Make this test prove subprocess runs always raise on failure.

Because the fixture already has `raise_on_failure=True`, the assertion would still pass if `_create_subprocess_runner()` simply forwards the parent runner's value. Set the fixture value to `False` in this test and keep expecting `True` so error propagation is actually covered.

🧪 Proposed strengthening

```diff
 def test_creates_runner_with_correct_config(self, mock_dbt_runner):
+    mock_dbt_runner.raise_on_failure = False
     api = ReportAPI(mock_dbt_runner)
     with patch(
         "elementary.monitor.api.report.report.SubprocessDbtRunner"
     ) as mock_cls:
         api._create_subprocess_runner()
@@
-        raise_on_failure=True,
+        raise_on_failure=True,
```

Also applies to: 33-33
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/monitor/api/report/test_report_parallel.py` at line 14, The test currently sets runner.raise_on_failure = True which doesn't prove subprocesses are forced to raise; change the test to set runner.raise_on_failure = False but keep the assertion that the subprocess runner has raise_on_failure == True so the test verifies _create_subprocess_runner() forces error propagation; update both occurrences of runner.raise_on_failure in this test file and ensure the assertion still checks the subprocess runner produced by _create_subprocess_runner().
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/unit/monitor/api/report/test_report_parallel.py`:
- Around line 11-12: Replace insecure /tmp literals used in the test by
assigning neutral fake paths to the runner attributes: change runner.project_dir
and runner.profiles_dir (and the other occurrences of those same assignments
later in the file) from "/tmp/project" and "/tmp/profiles" to non-/tmp
placeholders (e.g., "/fake/project" and "/fake/profiles" or "fake/project" and
"fake/profiles") so Ruff S108 is not triggered while preserving test semantics.
📒 Files selected for processing (1)
tests/unit/monitor/api/report/test_report_parallel.py
Signed-off-by: Daisuke Taniwaki <daisuketaniwaki@gmail.com>

Force-pushed from 6048e93 to ad039e2
Actionable comments posted: 1
🧹 Nitpick comments (2)
tests/unit/monitor/api/report/test_report_parallel.py (1)
41-48: 💤 Low value. Consider removing redundant test.

`test_deps_not_run` re-checks `run_deps_if_needed=False`, which is already verified by `test_creates_runner_with_correct_config` at line 38 via `assert_called_once_with`. The redundant test adds no additional coverage.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/unit/monitor/api/report/test_report_parallel.py` around lines 41 - 48, Remove the redundant test test_deps_not_run: it re-verifies run_deps_if_needed=False which is already asserted in test_creates_runner_with_correct_config via the SubprocessDbtRunner call assertion. Delete the test_deps_not_run function (the block that constructs ReportAPI, patches report.SubprocessDbtRunner, calls api._create_subprocess_runner(), and asserts call_kwargs["run_deps_if_needed"] is False) to avoid duplicate coverage and keep tests focused.

elementary/monitor/api/report/report.py (1)
372-396: ⚡ Quick win. Add type annotations to `_assemble_report_data` parameters.

All other helpers in `ReportAPI` (including the newly added `_get_report_data_sequential`/`_get_report_data_parallel` and the pre-existing `_serialize_*` methods) carry full type hints, but this new shared assembly method drops them entirely across 21 parameters. Lift the types from the call sites to keep the class consistent and catch any future caller drift between the sequential and parallel paths.

♻️ Suggested annotation sketch

```diff
-def _assemble_report_data(
-    self,
-    days_back,
-    project_name,
-    env,
-    warehouse_type,
-    seeds,
-    snapshots,
-    models,
-    sources,
-    exposures,
-    singular_tests,
-    models_runs,
-    coverages,
-    tests,
-    test_invocation,
-    test_results,
-    source_freshness_results,
-    test_runs,
-    source_freshness_runs,
-    lineage,
-    filters_api,
-    models_latest_invocation,
-    invocations_data,
-) -> Tuple[ReportDataSchema, Optional[Exception]]:
+def _assemble_report_data(
+    self,
+    days_back: int,
+    project_name: Optional[str],
+    env: Optional[str],
+    warehouse_type: Optional[str],
+    seeds: Dict[str, NormalizedSeedSchema],
+    snapshots: Dict[str, NormalizedSnapshotSchema],
+    models: Dict[str, NormalizedModelSchema],
+    sources: Dict[str, NormalizedSourceSchema],
+    exposures: Dict[str, NormalizedExposureSchema],
+    singular_tests: Iterable[NormalizedTestSchema],
+    models_runs,  # ModelRunsResultsSchema (from ModelsAPI.get_models_runs)
+    coverages: Dict[str, ModelCoverageSchema],
+    tests: Dict[str, TestSchema],
+    test_invocation,  # InvocationSchema from InvocationsAPI
+    test_results: Dict[str, List[TestResultSchema]],
+    source_freshness_results: Dict[str, List[SourceFreshnessResultSchema]],
+    test_runs: Dict[str, List[TestRunSchema]],
+    source_freshness_runs: Dict[str, List[SourceFreshnessRunSchema]],
+    lineage,  # LineageSchema from LineageAPI.get_lineage
+    filters_api: FiltersAPI,
+    models_latest_invocation,
+    invocations_data,
+) -> Tuple[ReportDataSchema, Optional[Exception]]:
```

Resolve the few `# ...` placeholders against the concrete return types of `ModelsAPI.get_models_runs`, `InvocationsAPI.get_test_invocation_from_filter`, `LineageAPI.get_lineage`, and the latest-invocation getters.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@elementary/monitor/api/report/report.py` around lines 372 - 396, The method ReportAPI._assemble_report_data currently lacks type annotations across its 21 parameters; update its signature to add precise type hints matching the call sites (e.g., use the concrete types returned/passed by ModelsAPI.get_models_runs, InvocationsAPI.get_test_invocation_from_filter, LineageAPI.get_lineage, and the latest-invocation getters) so the sequential and parallel callers have consistent types, and replace the existing `# ...` placeholders with the actual return types from those APIs; keep the return annotation Tuple[ReportDataSchema, Optional[Exception]] as-is and ensure imports/forward refs are added if needed.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@tests/unit/monitor/api/report/test_report_parallel.py`:
- Around line 11-12: The test uses literal insecure temp paths
runner.project_dir = "/tmp/project" and runner.profiles_dir = "/tmp/profiles"
which triggers Ruff S108; change those to neutral mock paths (e.g.,
"mock_project_dir" and "mock_profiles_dir" or a non-/tmp absolute path) in
tests/unit/monitor/api/report/test_report_parallel.py where runner.project_dir
and runner.profiles_dir are set so Ruff no longer flags them while preserving
test semantics.
📒 Files selected for processing (5)
- elementary/monitor/api/report/report.py
- elementary/monitor/cli.py
- elementary/monitor/data_monitoring/report/data_monitoring_report.py
- tests/unit/monitor/api/report/__init__.py
- tests/unit/monitor/api/report/test_report_parallel.py
🚧 Files skipped from review as they are similar to previous changes (2)
- elementary/monitor/data_monitoring/report/data_monitoring_report.py
- elementary/monitor/cli.py
…s required, in-process runner for sequential calls, logger.exception, typed _assemble_report_data, partial-based kwargs, test cleanup) Signed-off-by: Daisuke Taniwaki <daisuketaniwaki@gmail.com>
```python
filter: SelectorFilterSchema = SelectorFilterSchema(),
env: Optional[str] = None,
warehouse_type: Optional[str] = None,
threads: int = 4,
```
Addressed in 7b2df2b — made threads a required parameter on _get_report_data_parallel so the misleading =4 default is gone and callers must come through the public router.
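For context, a minimal sketch of the routing shape described here (the method names mirror the diff, but the stub bodies are hypothetical):

```python
from typing import Any


class ReportAPISketch:
    """Hypothetical mini-version of the public router described above."""

    def get_report_data(self, threads: int = 1, **kwargs: Any) -> str:
        # Public entry point: the only place a default for `threads` lives.
        if threads > 1:
            return self._get_report_data_parallel(threads=threads, **kwargs)
        return self._get_report_data_sequential(**kwargs)

    def _get_report_data_parallel(self, threads: int, **kwargs: Any) -> str:
        # `threads` is required here: callers must come through the router.
        return f"parallel:{threads}"

    def _get_report_data_sequential(self, **kwargs: Any) -> str:
        return "sequential"


api = ReportAPISketch()
print(api.get_report_data())           # sequential path by default
print(api.get_report_data(threads=4))  # parallel path
```

Keeping the default on the public method only means an accidental direct call to the parallel helper fails loudly instead of silently running with a stale default.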
```python
f_models_runs = pool.submit(
    _new_models_api().get_models_runs,
    days_back,
    exclude_elementary_models,
)
f_coverages = pool.submit(_new_models_api().get_test_coverages)
f_tests = pool.submit(_new_tests_api().get_tests)
f_test_invocation = pool.submit(
    _new_invocations_api().get_test_invocation_from_filter, filter
```
Addressed in 7b2df2b — switched pool.submit calls to functools.partial(fn, kwarg=...) so keyword binding survives future signature changes. Same change applied to f_test_results / f_exposures / f_test_invocation further down.
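To illustrate the difference with a standalone sketch (the fetch function here is a stand-in, not the PR's code): binding arguments through `functools.partial` keeps them attached by keyword, so inserting or reordering parameters in the target signature later cannot silently shift positional bindings:

```python
import functools
from concurrent.futures import ThreadPoolExecutor


def get_models_runs(days_back=7, exclude_elementary_models=False):
    # Stand-in for the real API call; just echoes what it was bound with.
    return (days_back, exclude_elementary_models)


with ThreadPoolExecutor(max_workers=2) as pool:
    # Positional binding: breaks if a parameter is ever added before days_back.
    f_positional = pool.submit(get_models_runs, 30, True)
    # Keyword binding via partial: survives signature reordering.
    f_keyword = pool.submit(
        functools.partial(
            get_models_runs, days_back=30, exclude_elementary_models=True
        )
    )

print(f_positional.result())  # (30, True)
print(f_keyword.result())     # (30, True)
```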
```python
lineage = LineageAPI(dbt_runner=parallel_runner).get_lineage(
    lineage_node_ids, exclude_elementary_models
)

# Phase 4: pure computation (no dbt calls)
return self._assemble_report_data(
    days_back=days_back,
    models=serializable_models,
    groups=serializable_groups,
    tests=serializable_tests,
    invocation=serializable_invocation,
    test_results=serializable_test_results,
    test_results_totals=serializable_test_results_totals,
    test_runs=serializable_test_runs,
    test_runs_totals=serializable_test_runs_totals,
    coverages=serializable_models_coverages,
    model_runs=serializable_model_runs,
    model_runs_totals=serializable_model_runs_totals,
    filters=serializable_filters,
    lineage=serializable_lineage,
    invocations=invocations,
    resources_latest_invocation=models_latest_invocation,
    invocations_job_identification=invocations_job_identification,
    env=ReportDataEnvSchema(
        project_name=project_name, env=env, warehouse_type=warehouse_type
    ),
    project_name=project_name,
    env=env,
    warehouse_type=warehouse_type,
    seeds=seeds,
    snapshots=snapshots,
    models=models,
    sources=sources,
    exposures=exposures,
    singular_tests=singular_tests,
    models_runs=models_runs,
    coverages=coverages,
    tests=tests,
    test_invocation=test_invocation,
    test_results=test_results,
    source_freshness_results=source_freshness_results,
    test_runs=test_runs,
    source_freshness_runs=source_freshness_runs,
    lineage=lineage,
    filters_api=FiltersAPI(dbt_runner=parallel_runner),
```
Addressed in 7b2df2b — both LineageAPI (Phase 3) and FiltersAPI (Phase 4) now use self.dbt_runner since they are invoked sequentially after the thread pool exits, avoiding unnecessary subprocess spawn overhead.
```python
except Exception as error:
    return ReportDataSchema(), error
```
Addressed in 7b2df2b — both the parallel and sequential except Exception blocks now call logger.exception(...) with a distinct message before returning, so the traceback is preserved for diagnosis.
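As a standalone illustration of why this matters (not the PR's logger setup): `Logger.exception` records the active traceback along with the message, whereas a plain `logger.error(msg)` would drop it:

```python
import io
import logging

logger = logging.getLogger("report_sketch")
logger.setLevel(logging.DEBUG)
buf = io.StringIO()
logger.addHandler(logging.StreamHandler(buf))

try:
    raise ValueError("boom")
except Exception:
    # logger.exception is logger.error(..., exc_info=True): the message
    # plus the full traceback land in the log before we swallow the error.
    logger.exception("failed to fetch report data in parallel")

output = buf.getvalue()
print("Traceback (most recent call last):" in output)  # True
print("ValueError: boom" in output)                    # True
```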
```python
def _assemble_report_data(
    self,
    days_back,
    project_name,
    env,
    warehouse_type,
    seeds,
    snapshots,
    models,
    sources,
    exposures,
    singular_tests,
    models_runs,
    coverages,
    tests,
    test_invocation,
    test_results,
    source_freshness_results,
    test_runs,
    source_freshness_runs,
    lineage,
    filters_api,
    models_latest_invocation,
    invocations_data,
) -> Tuple[ReportDataSchema, Optional[Exception]]:
```
Addressed in 7b2df2b — _assemble_report_data now has full type annotations for all 22 parameters: Dict[str, NormalizedSeedSchema], ModelRunsWithTotalsSchema, DbtInvocationSchema, LineageSchema, etc. Added matching imports.
```python
@click.option(
    "--threads",
    type=click.IntRange(min=1),
    default=1,
    help="Number of threads for fetching report data in parallel. "
    "When set to >1, independent dbt operations run concurrently using subprocess-based runners.",
)
```
Not addressing for now — only two declarations exist today (report and send-report). Lifting into common_options(...) with conditional gating felt like premature abstraction for a 2-call duplication. Happy to revisit if a third command adopts --threads.
```python
runner.project_dir = "/tmp/project"
runner.profiles_dir = "/tmp/profiles"
runner.target = "dev"
runner.raise_on_failure = True
```
Not addressing — _create_subprocess_runner() intentionally forwards the parent runner's raise_on_failure rather than forcing True, so the user's configured behavior carries over to parallel mode. Setting the fixture to False while asserting True would test a behavior the code does not (and should not) implement. The current test correctly verifies the forwarding contract.
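A hedged sketch of that forwarding contract (the attribute names mirror the diff, but `ReportAPIStub` and `SubprocessRunnerStub` here are stand-ins, not the real classes):

```python
from unittest.mock import MagicMock


class SubprocessRunnerStub:
    """Stand-in for SubprocessDbtRunner: just records its kwargs."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs


class ReportAPIStub:
    """Stand-in for ReportAPI: forwards the parent runner's settings."""

    def __init__(self, dbt_runner):
        self.dbt_runner = dbt_runner

    def _create_subprocess_runner(self):
        # Forward, don't force: raise_on_failure carries over as configured.
        return SubprocessRunnerStub(
            project_dir=self.dbt_runner.project_dir,
            raise_on_failure=self.dbt_runner.raise_on_failure,
            run_deps_if_needed=False,
        )


parent = MagicMock()
parent.project_dir = "/tmp/project"
parent.raise_on_failure = False  # non-default value, to prove forwarding

child = ReportAPIStub(parent)._create_subprocess_runner()
assert child.kwargs["raise_on_failure"] is False
assert child.kwargs["run_deps_if_needed"] is False
```

Setting the fixture to a non-default value is what makes the test distinguish forwarding from hard-coding.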
```python
def test_deps_not_run(self, mock_dbt_runner):
    api = ReportAPI(mock_dbt_runner)
    with patch(
        "elementary.monitor.api.report.report.SubprocessDbtRunner"
    ) as mock_cls:
        api._create_subprocess_runner()
        call_kwargs = mock_cls.call_args[1]
        assert call_kwargs["run_deps_if_needed"] is False
```
Addressed in 7b2df2b — test_deps_not_run removed. run_deps_if_needed=False is already covered by test_creates_runner_with_correct_config via assert_called_once_with(...).
Hi maintainers, thanks for the great work on Elementary! I'd appreciate a review when you get a chance.
## Problem

Generating the Elementary report can be slow on larger projects because `ReportAPI.get_report_data` issues a long sequence of `dbt run-operation` calls (models, sources, seeds, snapshots, tests, test results, source freshness, invocations, coverages, lineage, …) one after another. Each call has a non-trivial dbt startup cost on top of the actual query time, so end-to-end report generation is bottlenecked on serial fetch latency even though most of these fetches are independent of each other.

Two things make this tricky to parallelize today:

- …API
- `DbtRunner` relies on dbt globals (`GLOBAL_FLAGS`, the adapter `FACTORY`, etc.) that are not safe to drive from multiple threads.

## Solution
This PR adds an opt-in `--threads N` option (default `1`, preserving current behavior) to both `edr report` and `edr send-report`.

When `threads > 1`:

- `SubprocessDbtRunner` is used for the parallel fetches. Each call spawns an independent dbt process, sidestepping the in-process global state and making it safe to drive concurrently from a `ThreadPoolExecutor`.
- `get_report_data` is split into phases by data dependency:
  - Phase 1: independent fetches run in parallel.
  - Phase 2: dependent fetches (`exposures` from upstream node IDs, `test_results` keyed by the test invocation) run in parallel.
  - Phase 3: `lineage` is fetched once Phase 2 node IDs are known.
  - Phase 4: assembly is pure in-memory computation in a new `_assemble_report_data` helper.

```mermaid
flowchart TD
    Start([edr report --threads N]) --> P1
    subgraph P1[Phase 1: independent fetches in parallel]
        direction LR
        M[models] ~~~ S[sources] ~~~ Se[seeds] ~~~ Sn[snapshots] ~~~ T[tests] ~~~ TR[test runs] ~~~ SFR[source freshness runs] ~~~ C[coverages] ~~~ I[invocations]
    end
    P1 --> P2
    subgraph P2[Phase 2: needs Phase 1 IDs, in parallel]
        direction LR
        E[exposures] ~~~ TRes[test_results]
    end
    P2 --> P3
    subgraph P3[Phase 3: needs Phase 2 node IDs]
        L[lineage]
    end
    P3 --> P4[Phase 4: in-memory assembly]
    P4 --> End([report data])
```

The sequential path is unchanged when `--threads 1` (the default), so users who don't opt in see no behavior change.

Example:
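The phased shape above can be sketched with a self-contained toy (the `fetch` stub is hypothetical; the real implementation drives the APIs listed in the diagram):

```python
from concurrent.futures import ThreadPoolExecutor


def fetch(name):
    # Dummy stand-in for one dbt-backed fetch.
    return name


def get_report_data_sketch(threads=4):
    with ThreadPoolExecutor(max_workers=threads) as pool:
        # Phase 1: independent fetches submitted together, then joined.
        phase1 = {n: pool.submit(fetch, n) for n in ("models", "sources", "tests")}
        results = {n: f.result() for n, f in phase1.items()}

        # Phase 2: fetches that need Phase 1 outputs, still in parallel.
        phase2 = {n: pool.submit(fetch, n) for n in ("exposures", "test_results")}
        results.update({n: f.result() for n, f in phase2.items()})

    # Phase 3: lineage needs Phase 2 node IDs, fetched sequentially.
    results["lineage"] = fetch("lineage")

    # Phase 4: pure in-memory assembly (here, just a sorted listing).
    return sorted(results)


print(get_report_data_sketch())
```

Joining each phase's futures before submitting the next is what enforces the data-dependency ordering while keeping fetches within a phase concurrent.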
A unit test (`tests/unit/monitor/api/report/test_report_parallel.py`) covers the parallel orchestration and confirms the assembled output matches the sequential path.

Happy to adjust the option name, default, or factor things differently if you'd prefer a different shape.