feat: Add block_simultaneous_read with top-level stream_groups interface#870
feat: Add block_simultaneous_read with top-level stream_groups interface#870Anatolii Yatsuk (tolik0) wants to merge 12 commits intomainfrom
Conversation
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. Testing This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@tolik0/concurrent-source/add-block_simultaneous_read#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch tolik0/concurrent-source/add-block_simultaneous_readHelpful ResourcesPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
|
/autofix
|
|
/prerelease
|
|
/prerelease
|
|
/prerelease
|
There was a problem hiding this comment.
Pull request overview
This PR introduces a block_simultaneous_read feature to the Python CDK that prevents concurrent execution of streams sharing the same resource (API endpoint, session, or rate limit pool). The feature uses string-based group identifiers where streams with matching non-empty group names will not run concurrently, addressing issues like duplicate API calls when streams function as both standalone and parent streams.
Changes:
- Added
block_simultaneous_readproperty to stream interfaces and schema definitions with empty string as default (backward compatible) - Implemented blocking logic in
ConcurrentReadProcessorthat defers streams when their group or parent's group is active - Added comprehensive test coverage for various blocking scenarios including parent-child relationships and multi-level hierarchies
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
test_concurrent_read_processor.py |
Added comprehensive test suite covering all blocking scenarios |
test_model_to_component_factory.py |
Added integration test verifying manifest-to-stream property flow |
default_stream.py |
Added block_simultaneous_read property to DefaultStream |
adapters.py |
Added property adapter for legacy stream compatibility |
abstract_stream.py |
Added abstract property definition with documentation |
model_to_component_factory.py |
Integrated property from manifest to stream construction |
declarative_component_schema.py |
Generated schema with new property definition |
declarative_component_schema.yaml |
Added schema definition with comprehensive documentation |
concurrent_read_processor.py |
Implemented core blocking logic with group tracking and deferral |
Comments suppressed due to low confidence (1)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml:1
- The description change for
use_cacheappears unrelated to theblock_simultaneous_readfeature. This change should be separated into its own PR or have an explanation for why it's included in this feature PR.
"$schema": http://json-schema.org/draft-07/schema#
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
Outdated
Show resolved
Hide resolved
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds declarative-configurable stream groups and a stream-level Changes
Sequence Diagram(s)sequenceDiagram
participant Processor as ConcurrentReadProcessor
participant Queue as StreamQueue
participant Generator as PartitionGenerator
participant Tracker as ActiveStreamTracker
Processor->>Processor: start_next_partition_generator()
loop attempts until a stream starts or all blocked
Processor->>Queue: peek next stream
Processor->>Tracker: collect group + parent chain
alt blocked (self/parent/group active)
Processor->>Queue: defer stream (requeue to tail)
else not blocked
Processor->>Tracker: mark stream & parents active
Processor->>Tracker: associate stream with blocking group
Processor->>Generator: submit partition generator
Generator-->>Processor: STARTED status
end
end
Generator->>Processor: on_partition_generation_completed()
Processor->>Tracker: deactivate completed stream & related parents
Processor->>Processor: start_next_partition_generator() (may yield status)
Processor->>Processor: _on_stream_is_done()
Processor->>Tracker: remove stream from active sets and update groups
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Want me to call out specific lines or logic hotspots in concurrent_read_processor.py for deeper review (e.g., max_attempts loop, parent-collection edge cases, or log verbosity)? wdyt? 🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (1)
90-135: Possible correctness issue: parent “deactivation” can un-block a parent that’s still independently active, wdyt?
Right now, after child partition generation completes you discard each parent from_active_stream_names/_active_groupspurely based on being a parent. If the parent stream is also actively generating partitions or still reading its own partitions, this can weaken the blocking guarantee for that parent’s group.Proposed guard before deactivating a parent
parent_streams = self._collect_all_parent_stream_names(stream_name) for parent_stream_name in parent_streams: if parent_stream_name in self._active_stream_names: + # If the parent is still independently active (generating partitions or reading partitions), + # don't deactivate it just because a child's partition generation finished. + if ( + parent_stream_name in self._streams_currently_generating_partitions + or len(self._streams_to_running_partitions.get(parent_stream_name, set())) > 0 + ): + continue self._logger.debug(f"Removing '{parent_stream_name}' from active streams") self._active_stream_names.discard(parent_stream_name)
🤖 Fix all issues with AI agents
In @airbyte_cdk/sources/declarative/declarative_component_schema.yaml:
- Around line 1556-1575: The docs claim block_simultaneous_read accepts "" or
null but the schema defines only type: string, causing validation errors; update
the declarative_component_schema.yaml to make block_simultaneous_read accept
null by changing its type to allow both string and null (e.g., type: [ "string",
"null" ] and keep default ""), or alternatively remove the word "null" from the
description so the docs match the current type: string; ensure the change
references the block_simultaneous_read entry and adjust the default/description
accordingly.
In @airbyte_cdk/sources/declarative/models/declarative_component_schema.py:
- Around line 2746-2749: The Field use_cache in declarative_component_schema.py
currently has default=False but the description implies a default of True;
update the description to match the default (e.g., change the sentence “Only set
this to false if you are certain that caching should be disabled…” to “Only set
this to true if you are certain that caching should be enabled…”) or, if the
intent was to enable caching by default, flip the Field default to True—locate
the use_cache Field definition and make the description and default consistent.
In @unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py:
- Around line 1175-1208: The test test_multiple_blocked_streams_retry_in_order
is incomplete and doesn't assert retry ordering; update the test to
deterministically exercise
ConcurrentReadProcessor.start_next_partition_generator and then assert the
expected order and retry behavior: after starting the parent (verify "parent" in
handler._active_stream_names), call handler.start_next_partition_generator
repeatedly and assert that child1 is deferred before child2 (inspect
handler._stream_instances_to_start_partition_generation order or pop sequence),
then simulate parent completion (mark partition generation finished or remove
"parent" from _active_stream_names) and call start_next_partition_generator to
verify that child1 is retried/started before child2. Use the existing mock
streams (parent, child1, child2) and the handler instance to make these concrete
assertions.
- Around line 1047-1073: The test's docstring is accidentally split into two
adjacent string literals ("""Test that blocking doesn't occur when
block_simultaneous_read=""" """) which is confusing; replace it with a single
proper triple-quoted docstring for test_no_defer_when_flag_false (e.g. """Test
that blocking doesn't occur when block_simultaneous_read=''""") and leave the
call to _create_mock_stream(stream1, block_simultaneous_read="") and references
to ConcurrentReadProcessor unchanged.
🧹 Nitpick comments (10)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
2098-2122: Propagation looks correct; consider normalizing whitespace-only group names (optional), wdyt?
model.block_simultaneous_read or ""keeps backward compatibility, nice. If you want to reduce accidental “blocking enabled” from values like" "in manifests, would you consider trimming here?Proposed tweak
- block_simultaneous_read=model.block_simultaneous_read or "", + block_simultaneous_read=(model.block_simultaneous_read or "").strip(),airbyte_cdk/sources/streams/concurrent/default_stream.py (2)
18-41: Consider defensively coercingNoneto""in the ctor, wdyt?Even with type hints, callers can pass
Noneat runtime; making this robust in the ctor avoids surprising downstream issues.Proposed tweak
- self._block_simultaneous_read = block_simultaneous_read + self._block_simultaneous_read = block_simultaneous_read or ""
86-103: Would addingblock_simultaneous_readto the debug “sync configuration” log help diagnosability, wdyt?Right now
log_stream_sync_configuration()logs PK + cursor; including the group could make concurrency issues much easier to debug.unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
5217-5353: Nice end-to-end propagation test; consider also asserting the nested parent stream instance on the child carries the group, wdyt?
Right now you assertparent_stream.block_simultaneous_readandchild_stream.block_simultaneous_read, but not that theParentStreamConfig.streamcreated underchild_stream’sSubstreamPartitionRouteralso has"issues_endpoint"(covers the “parent stream instantiated as a dependency” path too). Also, the inline comments still mentiontruethough this is a string field.unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (2)
809-839: Mockingblock_simultaneous_readas an attribute is fine here; could you make logger assertions less brittle, wdyt?
Instead of"... in str(call)", consider asserting onself._logger.info.call_args_list[i].args[0](or usingassert_any_callwith a stable substring) so formatting changes don’t break tests.
976-1015: This test mutates internal state in a way that can create duplicates; can we avoid appending “parent” twice, wdyt?
start_next_partition_generator()already appends"parent"to_streams_currently_generating_partitions; the extraappend("parent")can leave a stale entry after.remove()and make later invariants weird.airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (4)
69-89: Per-streaminfolog on init may be noisy; woulddebugbe enough, wdyt?
This logs once per stream with a group; on connectors with many streams this could be a lot atinfo.
228-336: Docs/comments still mentionblock_simultaneous_read=Truebut the API is string-based; update wording, wdyt?
This is minor, but the docstring bullets (and some inline comments) still describe it like a boolean flag, which makes the grouping semantics harder to follow.
60-61: Do we want_stream_instances_to_start_partition_generationto be a copy to avoid mutating the caller’s list, wdyt?
Since you pop/append/reorder, copying is a cheap way to avoid surprising aliasing.Proposed tweak
- self._stream_instances_to_start_partition_generation = stream_instances_to_read_from + self._stream_instances_to_start_partition_generation = list(stream_instances_to_read_from)Also applies to: 240-335
367-407: Parent collection relies on private attributes; would a small helper/getattrchain improve readability, wdyt?
The logic is sound, but the nestedhasattrchain is a bit hard to scan and easy to break during refactors.
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.pyairbyte_cdk/sources/declarative/declarative_component_schema.yamlairbyte_cdk/sources/declarative/models/declarative_component_schema.pyairbyte_cdk/sources/declarative/parsers/model_to_component_factory.pyairbyte_cdk/sources/streams/concurrent/abstract_stream.pyairbyte_cdk/sources/streams/concurrent/adapters.pyairbyte_cdk/sources/streams/concurrent/default_stream.pyunit_tests/sources/declarative/parsers/test_model_to_component_factory.pyunit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2024-12-11T16:34:46.319Z
Learnt from: pnilan
Repo: airbytehq/airbyte-python-cdk PR: 0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.
Applied to files:
airbyte_cdk/sources/declarative/models/declarative_component_schema.pyairbyte_cdk/sources/declarative/declarative_component_schema.yaml
📚 Learning: 2024-11-18T23:40:06.391Z
Learnt from: ChristoGrab
Repo: airbytehq/airbyte-python-cdk PR: 58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Applied to files:
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
🧬 Code graph analysis (8)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3)
airbyte_cdk/sources/streams/concurrent/adapters.py (1)
block_simultaneous_read(200-202)airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
block_simultaneous_read(89-101)airbyte_cdk/sources/streams/concurrent/default_stream.py (1)
block_simultaneous_read(100-102)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
airbyte_cdk/sources/streams/concurrent/adapters.py (1)
block_simultaneous_read(200-202)airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
block_simultaneous_read(89-101)airbyte_cdk/sources/streams/concurrent/default_stream.py (1)
block_simultaneous_read(100-102)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (2)
airbyte_cdk/sources/streams/concurrent/adapters.py (1)
block_simultaneous_read(200-202)airbyte_cdk/sources/streams/concurrent/default_stream.py (1)
block_simultaneous_read(100-102)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (5)
airbyte_cdk/sources/streams/concurrent/adapters.py (3)
name(180-181)block_simultaneous_read(200-202)stream_name(316-317)airbyte_cdk/sources/streams/concurrent/abstract_stream.py (3)
name(50-53)block_simultaneous_read(89-101)generate_partitions(42-46)airbyte_cdk/sources/streams/concurrent/default_stream.py (3)
name(46-47)block_simultaneous_read(100-102)generate_partitions(42-43)airbyte_cdk/sources/concurrent_source/partition_generation_completed_sentinel.py (1)
PartitionGenerationCompletedSentinel(9-24)airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)
stream_name(118-119)
airbyte_cdk/sources/streams/concurrent/adapters.py (2)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
block_simultaneous_read(89-101)airbyte_cdk/sources/streams/concurrent/default_stream.py (1)
block_simultaneous_read(100-102)
airbyte_cdk/sources/streams/concurrent/default_stream.py (2)
airbyte_cdk/sources/streams/concurrent/adapters.py (1)
block_simultaneous_read(200-202)airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
block_simultaneous_read(89-101)
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (4)
airbyte_cdk/sources/streams/concurrent/adapters.py (5)
name(180-181)block_simultaneous_read(200-202)as_airbyte_stream(213-214)cursor(196-197)stream_name(316-317)airbyte_cdk/sources/streams/concurrent/default_stream.py (4)
name(46-47)block_simultaneous_read(100-102)as_airbyte_stream(60-84)cursor(96-97)airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (5)
ConcurrentReadProcessor(31-439)start_next_partition_generator(228-335)on_partition_generation_completed(90-134)_collect_all_parent_stream_names(367-406)_on_stream_is_done(408-439)airbyte_cdk/sources/concurrent_source/partition_generation_completed_sentinel.py (1)
PartitionGenerationCompletedSentinel(9-24)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3)
airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py (1)
propagate_types_and_parameters(87-188)airbyte_cdk/sources/streams/concurrent/default_stream.py (2)
name(46-47)block_simultaneous_read(100-102)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
create_component(825-858)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Publish SDM to DockerHub
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
2498-2502: Field type should match YAML schema: makeblock_simultaneous_reada non-Optionalstr.The YAML source defines this as
type: stringwithdefault: "", but the generated Python schema makes itOptional[str]. While the factory normalization (model.block_simultaneous_read or "") ensures runtime safety, the type inconsistency is worth fixing. Consider updating the codegen to generatestrinstead ofOptional[str]to match the YAML definition and avoid the appearance of a nullable value when the schema itself doesn't allow it.⛔ Skipped due to learnings
Learnt from: pnilan Repo: airbytehq/airbyte-python-cdk PR: 0 File: :0-0 Timestamp: 2024-12-11T16:34:46.319Z Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.Learnt from: ChristoGrab Repo: airbytehq/airbyte-python-cdk PR: 58 File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0 Timestamp: 2024-11-18T23:40:06.391Z Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
88-101: LGTM: good backward-compatible default + clear contract.Non-abstract default
""is a solid choice to avoid forcing implementations.airbyte_cdk/sources/streams/concurrent/adapters.py (1)
199-203: LGTM: simple, correct delegation.airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (1)
425-439: Cleanup on stream completion looks good.
Removing only the completed stream (not parents) from active tracking matches the intent and keeps parent lifecycle separate.
| block_simultaneous_read: | ||
| title: Block Simultaneous Read | ||
| description: > | ||
| Optional group name for blocking simultaneous reads. Streams with the same | ||
| block_simultaneous_read value will not be read concurrently. This prevents | ||
| duplicate API calls when a stream is used as both a standalone stream and a | ||
| parent stream, or when multiple streams share the same endpoint/session. | ||
|
|
||
| If set to a non-empty string, the stream will be deferred if: | ||
| 1. Another stream in the same group is currently active | ||
| 2. Any parent stream is in an active group | ||
|
|
||
| Examples: | ||
| - "issues_endpoint" - All streams with this value block each other | ||
| - "" or null - No blocking (default) | ||
|
|
||
| This is useful for APIs that don't allow concurrent access to the same | ||
| endpoint or session. Only applies to ConcurrentDeclarativeSource. | ||
| type: string | ||
| default: "" |
There was a problem hiding this comment.
Doc/schema mismatch for null (either allow it in schema or remove from docs), wdyt?
Your description says "" or null - No blocking (default) (Line 1570), but the schema is type: string (Line 1574), so null would fail validation if someone uses it in YAML.
Proposed fix (option A: allow null)
block_simultaneous_read:
title: Block Simultaneous Read
description: >
@@
- - "" or null - No blocking (default)
+ - "" or null - No blocking (default)
@@
- type: string
+ type:
+ - string
+ - "null"
default: ""Alternative (option B: keep string-only and remove “null” from docs)
- - "" or null - No blocking (default)
+ - "" - No blocking (default)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| block_simultaneous_read: | |
| title: Block Simultaneous Read | |
| description: > | |
| Optional group name for blocking simultaneous reads. Streams with the same | |
| block_simultaneous_read value will not be read concurrently. This prevents | |
| duplicate API calls when a stream is used as both a standalone stream and a | |
| parent stream, or when multiple streams share the same endpoint/session. | |
| If set to a non-empty string, the stream will be deferred if: | |
| 1. Another stream in the same group is currently active | |
| 2. Any parent stream is in an active group | |
| Examples: | |
| - "issues_endpoint" - All streams with this value block each other | |
| - "" or null - No blocking (default) | |
| This is useful for APIs that don't allow concurrent access to the same | |
| endpoint or session. Only applies to ConcurrentDeclarativeSource. | |
| type: string | |
| default: "" | |
| block_simultaneous_read: | |
| title: Block Simultaneous Read | |
| description: > | |
| Optional group name for blocking simultaneous reads. Streams with the same | |
| block_simultaneous_read value will not be read concurrently. This prevents | |
| duplicate API calls when a stream is used as both a standalone stream and a | |
| parent stream, or when multiple streams share the same endpoint/session. | |
| If set to a non-empty string, the stream will be deferred if: | |
| 1. Another stream in the same group is currently active | |
| 2. Any parent stream is in an active group | |
| Examples: | |
| - "issues_endpoint" - All streams with this value block each other | |
| - "" or null - No blocking (default) | |
| This is useful for APIs that don't allow concurrent access to the same | |
| endpoint or session. Only applies to ConcurrentDeclarativeSource. | |
| type: | |
| - string | |
| - "null" | |
| default: "" |
| block_simultaneous_read: | |
| title: Block Simultaneous Read | |
| description: > | |
| Optional group name for blocking simultaneous reads. Streams with the same | |
| block_simultaneous_read value will not be read concurrently. This prevents | |
| duplicate API calls when a stream is used as both a standalone stream and a | |
| parent stream, or when multiple streams share the same endpoint/session. | |
| If set to a non-empty string, the stream will be deferred if: | |
| 1. Another stream in the same group is currently active | |
| 2. Any parent stream is in an active group | |
| Examples: | |
| - "issues_endpoint" - All streams with this value block each other | |
| - "" or null - No blocking (default) | |
| This is useful for APIs that don't allow concurrent access to the same | |
| endpoint or session. Only applies to ConcurrentDeclarativeSource. | |
| type: string | |
| default: "" | |
| block_simultaneous_read: | |
| title: Block Simultaneous Read | |
| description: > | |
| Optional group name for blocking simultaneous reads. Streams with the same | |
| block_simultaneous_read value will not be read concurrently. This prevents | |
| duplicate API calls when a stream is used as both a standalone stream and a | |
| parent stream, or when multiple streams share the same endpoint/session. | |
| If set to a non-empty string, the stream will be deferred if: | |
| 1. Another stream in the same group is currently active | |
| 2. Any parent stream is in an active group | |
| Examples: | |
| - "issues_endpoint" - All streams with this value block each other | |
| - "" - No blocking (default) | |
| This is useful for APIs that don't allow concurrent access to the same | |
| endpoint or session. Only applies to ConcurrentDeclarativeSource. | |
| type: string | |
| default: "" |
🤖 Prompt for AI Agents
In @airbyte_cdk/sources/declarative/declarative_component_schema.yaml around
lines 1556 - 1575, The docs claim block_simultaneous_read accepts "" or null but
the schema defines only type: string, causing validation errors; update the
declarative_component_schema.yaml to make block_simultaneous_read accept null by
changing its type to allow both string and null (e.g., type: [ "string", "null"
] and keep default ""), or alternatively remove the word "null" from the
description so the docs match the current type: string; ensure the change
references the block_simultaneous_read entry and adjust the default/description
accordingly.
| False, | ||
| description="Enables stream requests caching. This field is automatically set by the CDK.", | ||
| description="Enables stream requests caching. When set to true, repeated requests to the same URL will return cached responses. Parent streams automatically have caching enabled. Only set this to false if you are certain that caching should be disabled, as it may negatively impact performance when the same data is needed multiple times (e.g., for scroll-based pagination APIs where caching causes duplicate records).", | ||
| title="Use Cache", | ||
| ) |
There was a problem hiding this comment.
use_cache description reads like default is True, but Field default is False
The updated wording says “Only set this to false…” while the default is already False; would you tweak the YAML description to avoid confusing users (e.g., “Only set this to true if…” or flip the default if that’s the intent), wdyt?
🤖 Prompt for AI Agents
In @airbyte_cdk/sources/declarative/models/declarative_component_schema.py
around lines 2746 - 2749, The Field use_cache in declarative_component_schema.py
currently has default=False but the description implies a default of True;
update the description to match the default (e.g., change the sentence “Only set
this to false if you are certain that caching should be disabled…” to “Only set
this to true if you are certain that caching should be enabled…”) or, if the
intent was to enable caching by default, flip the Field default to True—locate
the use_cache Field definition and make the description and default consistent.
|
/autofix
|
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py:
- Around line 1238-1248: The failing linter flagged formatting in the assertion
block; run the code formatter (ruff format) to fix whitespace/line-wrapping and
ensure list comprehension and multi-line statements follow project style. In
practice, reformat the block that builds children_active and the two assertions
referencing handler._active_stream_names so the list comprehension and asserts
conform to ruff rules (e.g., proper indentation and line breaks).
🧹 Nitpick comments (1)
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (1)
948-974: Misleading test name - consider renaming?The test name
test_retry_blocked_stream_after_blocker_donesuggests it tests the retry mechanism after a blocker completes, but the actual behavior (as the docstring correctly states) is verifying that streams with different groups don't block each other.Would something like
test_different_groups_dont_block_each_otherortest_streams_with_different_groups_run_concurrentlybetter describe the test's intent, wdyt?
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
🧰 Additional context used
🧬 Code graph analysis (1)
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (2)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (5)
ConcurrentReadProcessor(31-439)start_next_partition_generator(228-335)on_partition_generation_completed(90-134)_collect_all_parent_stream_names(367-406)_on_stream_is_done(408-439)airbyte_cdk/sources/concurrent_source/partition_generation_completed_sentinel.py (1)
PartitionGenerationCompletedSentinel(9-24)
🪛 GitHub Actions: Linters
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
[warning] 1238-1248: Ruff format suggested code style changes in test_concurrent_read_processor.py. Run 'ruff format' to apply formatting changes.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (13)
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-shopify
- GitHub Check: Check: destination-motherduck
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: SDM Docker Image Build
- GitHub Check: Analyze (python)
🔇 Additional comments (3)
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (3)
800-838: LGTM!The helper methods
_create_mock_streamand_create_mock_stream_with_parentare well-structured and effectively reduce boilerplate across the tests. The parent relationship mocking correctly mirrors the retriever/partition_router pattern used in the actual implementation.
1249-1308: Nice edge case coverage!These two tests (
test_child_without_flag_blocked_by_parent_with_flagandtest_child_with_flag_not_blocked_by_parent_without_flag) nicely cover the asymmetric blocking behavior. This ensures the feature correctly respects the parent's blocking group regardless of the child's configuration, which is important for the use case where a parent API endpoint needs protection.
1310-1347: LGTM - validates the core group-based blocking feature!This test is particularly important as it validates the primary use case mentioned in the PR objectives (source-intercom). It confirms that unrelated streams sharing a resource (same group name) correctly block each other, which is the key feature of
block_simultaneous_read.
There was a problem hiding this comment.
Actionable comments posted: 0
🧹 Nitpick comments (1)
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (1)
948-974: Test name doesn't match behavior - could we rename it, wdyt?The test name
test_retry_blocked_stream_after_blocker_donesuggests it tests retry behavior after a blocker completes, but the docstring and test body actually verify that streams in different groups don't block each other. Maybe rename totest_different_groups_do_not_block_each_otherto better reflect what's being tested?✏️ Suggested rename
- def test_retry_blocked_stream_after_blocker_done(self): - """Test that independent streams with different groups don't block each other""" + def test_different_groups_do_not_block_each_other(self): + """Test that independent streams with different groups don't block each other"""
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
🧰 Additional context used
🧬 Code graph analysis (1)
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (1)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (4)
start_next_partition_generator(228-335)on_partition_generation_completed(90-134)_collect_all_parent_stream_names(367-406)_on_stream_is_done(408-439)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: source-intercom
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-shopify
- GitHub Check: SDM Docker Image Build
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
🔇 Additional comments (8)
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (8)
800-838: LGTM!The setUp and helper methods are well-structured. The
_create_mock_stream_with_parenthelper correctly mocks theretriever.partition_router.parent_stream_configspath that_collect_all_parent_stream_namesuses. Nice job making these helpers reusable across all the test cases!
840-872: LGTM!Good coverage of the self-blocking scenario. The test correctly verifies the three key outcomes: no stream started (None result), stream moved back to queue, and appropriate logging.
874-947: LGTM!Nice coverage of both parent and grandparent blocking scenarios. Testing the multi-level hierarchy ensures the recursive
_collect_all_parent_stream_nameslogic works correctly.
976-1014: LGTM!Good test of the integration between
on_partition_generation_completedandstart_next_partition_generator. The comments explaining the parent re-activation behavior (lines 1009-1010) are helpful for understanding the expected semantics.
1016-1096: LGTM!Good coverage of queue reordering, the empty-string default case, and recursive parent collection. These tests ensure the foundational behaviors work correctly.
1098-1173: LGTM!Great job testing the distinction between parent deactivation on partition generation completion vs. stream deactivation on stream completion. This is an important behavioral nuance in the blocking mechanism.
1175-1248: LGTM!Comprehensive test of multiple blocked streams. The f-string assertion message at lines 1243-1245 is a nice touch for debugging failures. Good coverage of the group-based blocking with parent-child relationships.
1250-1348: LGTM!Excellent edge case coverage! The asymmetric flag scenarios (lines 1250-1309) are particularly valuable for verifying that blocking is determined by the parent's flag status. And
test_unrelated_streams_in_same_group_block_each_otherconfirms group-based blocking works independently of parent-child relationships.
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In @unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py:
- Around line 948-974: The test function name
test_retry_blocked_stream_after_blocker_done is misleading because the body
asserts that streams in different groups start without blocking; rename the test
function to a descriptive name such as
test_different_groups_do_not_block_each_other and update any references to the
test (e.g., its declaration and any test markers) so the new name reflects the
behavior being validated (verify that
ConcurrentReadProcessor.start_next_partition_generator allows stream2 to start
while stream1 is active when they have different block_simultaneous_read
groups).
- Around line 1350-1460: The file contains a duplicate test method definition
for test_child_starts_after_parent_completes_via_partition_complete_sentinel;
remove the duplicate definition so only one test function with that name remains
(delete the second/identical block of
test_child_starts_after_parent_completes_via_partition_complete_sentinel),
ensuring the remaining test still uses the same setup and assertions.
🧹 Nitpick comments (2)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (2)
286-292: Minor optimization:_collect_all_parent_stream_namesis called twice.The method is called once at line 286 for checking blocked parents and again at line 319 for marking parents as active. Since the stream hasn't changed between these calls, you could cache the result to avoid the redundant traversal, wdyt?
♻️ Suggested optimization
# No blocking - start this stream # Mark stream as active before starting self._active_stream_names.add(stream_name) self._streams_currently_generating_partitions.append(stream_name) # Track this stream in its group if it has one if stream_group: if stream_group not in self._active_groups: self._active_groups[stream_group] = set() self._active_groups[stream_group].add(stream_name) self._logger.debug(f"Added '{stream_name}' to active group '{stream_group}'") - # Also mark all parent streams as active (they will be read from during partition generation) - parent_streams = self._collect_all_parent_stream_names(stream_name) + # Also mark all parent streams as active (they will be read from during partition generation) + # Reuse parent_streams from the blocking check above for parent_stream_name in parent_streams:And move the
parent_streams = self._collect_all_parent_stream_names(stream_name)call (line 286) earlier before the blocking check, storing it in a variable that can be reused.Also applies to: 318-330
391-400: Consider adding a comment documenting the internal coupling or proposing a public interface method.The chain
_stream_partition_generator._stream_slicer._partition_routerdoes rely on internal structure, and this pattern is isolated to this function. The defensivehasattrchecks and fallback path toretriever.partition_routerhelp mitigate the risk. However, if either path fails, it silently returns an empty set rather than surfacing the issue. Adding a clarifying comment about why this coupling exists, or exploring whether the stream could expose parent streams through a public interface, might make this more maintainable long-term. What do you think—would a comment be sufficient, or does this warrant a deeper refactor?
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.pyunit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
🔇 Additional comments (8)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (4)
69-88: LGTM!The data structures for tracking active streams and groups are well-chosen. The initialization logging will be helpful for debugging blocking behavior in production.
102-123: Nice approach to parent lifecycle management.Deactivating parent streams immediately after partition generation completes is a solid design choice—it frees up the blocking group for other streams to proceed. The logging is informative without being excessive.
175-179: Good addition for retry trigger.This ensures blocked streams get a chance to start when a blocking stream completes via partition sentinel. Nice symmetry with the same logic in
on_partition_generation_completed.
430-444: LGTM!The cleanup logic correctly removes only the completed stream from active tracking, and the comment at line 431 makes the intent clear. Good practice to delete empty groups to prevent unbounded growth.
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (4)
797-838: Nice test utilities!The helper methods
_create_mock_streamand_create_mock_stream_with_parentare clean and make the tests readable. Good use of the retriever-based path which matches one of the lookup strategies in the production code.
840-946: Solid test coverage for blocking scenarios.These tests nicely cover self-blocking, parent blocking, and grandparent blocking. The assertions on both return values and internal state (like
_active_stream_names) provide good confidence.
1016-1072: Great edge case coverage!
test_blocked_stream_added_to_end_of_queueandtest_no_defer_when_flag_falseare important edge cases. The queue reordering test verifies that non-blocking streams can skip ahead, which is key to the feature's usability.
1250-1309: Excellent asymmetric blocking tests!These two tests (
test_child_without_flag_blocked_by_parent_with_flagandtest_child_with_flag_not_blocked_by_parent_without_flag) cover important edge cases around flag inheritance. The behavior where a child is blocked by a parent with the flag (even if the child doesn't have it) but not vice versa is well-documented through these tests.
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
Outdated
Show resolved
Hide resolved
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In @unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py:
- Around line 994-999: Test calls handler.start_next_partition_generator() which
already appends "parent" into handler._streams_currently_generating_partitions,
so remove the manual duplicate append of "parent" (the line that does
handler._streams_currently_generating_partitions.append("parent")) and keep the
subsequent setup of handler._streams_to_running_partitions["parent"] = set() so
the test still marks parent as having no running partitions.
- Around line 1229-1233: The test duplicates adding "parent" to the handler's
generation set: remove the manual append to
handler._streams_currently_generating_partitions (the line that appends
"parent") because start_next_partition_generator() already adds it; ensure
_streams_to_running_partitions["parent"] = set() and the sentinel creation/call
to handler.on_partition_generation_completed(sentinel) remain unchanged so the
test behavior is preserved.
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
🧰 Additional context used
🧬 Code graph analysis (1)
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (2)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (5)
ConcurrentReadProcessor(31-445)start_next_partition_generator(234-341)on_partition_generation_completed(90-134)_collect_all_parent_stream_names(373-412)_on_stream_is_done(414-445)airbyte_cdk/sources/concurrent_source/partition_generation_completed_sentinel.py (1)
PartitionGenerationCompletedSentinel(9-24)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (14)
- GitHub Check: Analyze (python)
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: destination-motherduck
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: SDM Docker Image Build
- GitHub Check: Analyze (python)
🔇 Additional comments (6)
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (6)
800-838: Well-designed test helpers!The helper methods
_create_mock_streamand_create_mock_stream_with_parentnicely encapsulate the mock setup and make the tests readable. The parent relationship mocking correctly mirrors the structure expected by_collect_all_parent_stream_namesin the processor.
840-911: Good coverage of basic deferral scenarios.These tests effectively verify the core blocking mechanics for self-active and parent-active cases. The logger assertions using
any()with string matching provide good diagnostic coverage, wdyt?
1016-1045: Nice queue ordering verification!This test clearly demonstrates the "blocked streams move to end of queue" behavior, which is crucial for understanding the retry mechanism. The explicit ordering assertions make the expected behavior very clear.
1250-1309: Excellent edge case coverage!These two tests (
test_child_without_flag_blocked_by_parent_with_flagandtest_child_with_flag_not_blocked_by_parent_without_flag) effectively document and verify the asymmetric blocking behavior. This is exactly the kind of nuanced behavior that needs explicit test coverage. The naming clearly communicates the scenario being tested.
1311-1348: Key test for the shared endpoint use case!This test validates the primary use case mentioned in the PR (source-intercom duplicate concurrent calls). It's great that you have explicit coverage for unrelated streams blocking each other purely based on group membership.
1350-1404: Comprehensive end-to-end test!This test nicely demonstrates the complete lifecycle: parent blocks child, parent completes, child unblocks and starts. The verification of the STARTED trace message at the end adds confidence that the proper status messages are emitted. This is valuable for understanding the expected runtime behavior.
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
Outdated
Show resolved
Hide resolved
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
Dismissed
Show dismissed
Hide dismissed
|
/prerelease
|
Brian Lai (brianjlai)
left a comment
There was a problem hiding this comment.
I have some mixed feelings about how we represent this stream grouping to support blocking from the language. It feels like allowing it to be a string and just grouping to together ourselves might be a little bit error prone for users. It might take some redesigning of how we parse these groups, but I'm wondering if an interesting approach would be something like a StreamGroup which would in turn contain a set of streams that needs to be unblocking. And potentially config options like blocking or whatever other ones we might think of. And maybe StreamGroup is itself a subclass of a Stream. Granted I haven't dedicated a ton of time to fleshing this out more, but maybe this is something we can sync on.
I think the other thing that worries me about this PR is that it feels quite risky because all of this grouping and pausing logic along the processing queue and which partitions we put on the queue feels like a lot of complexity and if we're not careful, we might introduce a deadlock for all of our connectors. I'm having a hard time parsing through the logic and getting a clear picture of what edge cases I might be missing honestly.
So two things would be:
- I think us doing an in-person review of this over Zoom would be pretty helpful
- I think the logic is meant to be a no-op if no streams define a blocking group so then
self_stream_block_simultaneous_readwill all be"". But the logic is still pretty deeply integrated across the whole flow so a bug here could affect all our connectors. Maybe we can try to find a better way of isolating this behavior, and that's where that dedicated stream group component might be interesting.
This feels like a legitimate full fledged project if I'm being honest because there are some DX considerations on how to represent this and maybe some ways we need to derisk this change a bit more.
| # All streams in the queue are currently blocked | ||
| return None | ||
|
|
||
| def is_done(self) -> bool: |
There was a problem hiding this comment.
Add a check if no streams in _stream_instances_to_start_partition_generation
There was a problem hiding this comment.
also worth it to maybe check that active stream groups is empty too
Brian Lai (brianjlai)
left a comment
There was a problem hiding this comment.
I have a few things and a lot of comments to clarify my understanding of the flow and what is expected to happen under different scenarios.
We already chatted over Zoom about the changes to the interface to more explicitly define stream groupings and the blocking actions, Daryna Ishchenko (@darynaishchenko) can approve that portion while I am out on PTO.
| hasattr(stream, "_stream_partition_generator") | ||
| and hasattr(stream._stream_partition_generator, "_stream_slicer") | ||
| and hasattr(stream._stream_partition_generator._stream_slicer, "_partition_router") | ||
| ): |
There was a problem hiding this comment.
This feels pretty messy to have to perform such a careful assertion on a bunch of potentially null private fields. I know it'll be too much work to make these public, but can we create a helper method on DefaultStream called get_partition_router() that does all this for us? and then we can avoid all these hasattr and we just get back the partition router or None? wdyt?
| # Add back to the END of the queue for retry later | ||
| self._stream_instances_to_start_partition_generation.append(stream) | ||
| self._logger.info( | ||
| f"Deferring stream '{stream_name}' (group '{stream_group}') because it's already active. Trying next stream." |
There was a problem hiding this comment.
I don't really understand this case. Why do we put the stream into the back of the stream_instances_to_start if this stream is already active. If this stream is already active, it's presumably already generating partitions and syncing so why do we want to retry it by putting it in the back of the queue?
| self._logger.debug(f"Added '{stream_name}' to active group '{stream_group}'") | ||
|
|
||
| # Also mark all parent streams as active (they will be read from during partition generation) | ||
| parent_streams = self._collect_all_parent_stream_names(stream_name) |
There was a problem hiding this comment.
why do we need to call self._collect_all_parent_stream_names(stream_name) again? Above on line 286, we already get the parent streams https://github.com/airbytehq/airbyte-python-cdk/pull/870/files#diff-125b82e622876f39aefe23c0fee9fc828262e7db2556cb7fb7a7a1b9bdb2fd77R286 .
| ) | ||
| yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status) | ||
|
|
||
| # Remove only this stream from active set (NOT parents) |
There was a problem hiding this comment.
Is the reason why we only remove this stream from the active set because we are already removing the parents from the active set when partition generation is completed in on_partition_generation_completed()
| and p in self._active_stream_names | ||
| ] | ||
|
|
||
| if blocked_by_parents: |
There was a problem hiding this comment.
One scenario I want to make sure I understand and that we have covered is when a substream child_stream (w/ parent stream a_stream with a blocking group) is started before it's parent.
So let's say that we happen to have the child_stream which is the first in the list of _stream_instances_to_start_partition_generation.
- We go through this method and start the stream because it is not blocked by any active streams
- We mark all the parent streams which includes
a_streamas active and we set the parent's active group - Parent
a_streamis read from the list - We see the group is already active from step 2.
- We put it back on the end of the list
- When we trigger
on_partition_generation_completed()forchild_streamwe remove the parents from the active groups. We then callstart_next_partition_generator(). - We process parent
a_stream. It is no longer in a blocking group so we start that stream. - We add
a_streamto active streams and it's blocking group
This is how I interpreted the flow below. Does that sound right to you Anatolii Yatsuk (@tolik0) ?
…tream_groups - Remove block_simultaneous_read property from DeclarativeStream schema - Add top-level stream_groups with StreamGroup and BlockSimultaneousSyncsAction - ConcurrentDeclarativeSource parses stream_groups and injects block_simultaneous_read into stream configs before factory processing - Internal blocking logic in ConcurrentReadProcessor unchanged - Update tests for new interface Co-Authored-By: unknown <>
There was a problem hiding this comment.
Actionable comments posted: 1
♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
2769-2769: Could we align theuse_cachedescription with itsFalsedefault, wdyt?The description still reads like disabling cache is exceptional, which is confusing with the current default and behavior notes.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte_cdk/sources/declarative/models/declarative_component_schema.py` at line 2769, Update the description string for the use_cache field in declarative_component_schema.py so it reflects that the default is False and that enabling caching is opt-in; remove wording that implies disabling is exceptional. Specifically, edit the description on the use_cache field to state that caching is off by default, explain what enabling it does (repeated requests to the same URL will be cached), note that parent streams may enable caching automatically if applicable, and warn about potential duplicate records for scroll-based pagination when caching is enabled.
🧹 Nitpick comments (4)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2)
85-93: Could we tightenstreamsvalidation to avoid no-op or duplicate entries, wdyt?Right now
streamscan be empty and can include duplicates. Adding simple constraints would catch misconfigurations earlier.🔧 Proposed schema hardening
streams: title: Streams description: > List of references to streams that belong to this group. Use JSON references to stream definitions (e.g., "#/definitions/my_stream"). type: array + minItems: 1 + uniqueItems: true items: type: string🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml` around lines 85 - 93, The streams array currently allows empty or duplicate entries; update the streams schema block (the "streams" property in declarative_component_schema.yaml) to enforce minItems: 1 and uniqueItems: true and tighten item validation (e.g., require string pattern matching JSON refs like "^#/definitions/.+") so misconfigured or no-op stream lists are rejected early.
87-90: Could we clarify the expectedstreamsidentifier format in docs, wdyt?The current text says “JSON references,” but many manifests define streams inline. A concrete accepted format/example here would reduce author confusion.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml` around lines 87 - 90, Update the "streams" description in declarative_component_schema.yaml to explicitly document the accepted identifier formats: a JSON Reference string (e.g., "#/definitions/my_stream") and inline stream definitions (an object with the stream schema), and include a short example for each format to reduce author confusion; target the "streams" property description text so it lists both formats and shows one-line examples.unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
5217-5358: Could we tighten this test’s scope and nested-parent assertion forblock_simultaneous_read, wdyt?On Line 5217 the test name/docstring reads as stream-groups end-to-end, but this test is really validating post-injection factory behavior. Also, could we add an assertion that the parent stream instantiated inside
child_stream’sSubstreamPartitionRouterkeeps"issues_endpoint"too? That would better protect the parent-traversal path used by concurrent blocking logic.Suggested assertion addition
assert isinstance(child_stream, DefaultStream) assert child_stream.name == "child" assert child_stream.block_simultaneous_read == "issues_endpoint" + embedded_parent_stream = ( + child_stream._stream_partition_generator._stream_slicer.parent_stream_configs[0].stream + ) + assert embedded_parent_stream.block_simultaneous_read == "issues_endpoint"🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@unit_tests/sources/declarative/parsers/test_model_to_component_factory.py` around lines 5217 - 5358, Rename the test/docstring to reflect it validates factory behavior (post-injection) rather than end-to-end stream-groups and tighten scope to only check factory-created streams; then add an assertion that the parent stream object used by the child stream's SubstreamPartitionRouter retains the injected block_simultaneous_read value. Specifically, in this test after creating child_stream (DefaultStream) inspect the SubstreamPartitionRouter instance (referenced by child_stream.partition_router or similar on the created DefaultStream) and assert that its ParentStreamConfig / instantiated parent stream (the entry corresponding to the "#/parent_stream" manifest) has block_simultaneous_read == "issues_endpoint". Ensure the new assertion references SubstreamPartitionRouter and ParentStreamConfig (or the actual attribute names on child_stream.partition_router) so the factory’s nested-parent traversal is covered.unit_tests/sources/declarative/test_concurrent_declarative_source.py (1)
5155-5218: Could we lock down duplicate stream membership behavior with one more case, wdyt?If a stream appears in multiple groups, the current mapping will silently overwrite with the last group. Adding an explicit test for that would make the intended precedence (or future validation expectation) clear.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@unit_tests/sources/declarative/test_concurrent_declarative_source.py` around lines 5155 - 5218, Add a new pytest.param to the existing parametrize block that covers duplicate stream membership: create a manifest where "stream_groups" contains "group_a" and "group_b" and both list a stream named "dup_stream" (type "DeclarativeStream"), and set the expected mapping to reflect the current precedence (last-wins) by mapping "dup_stream" to "group_b"; give the case an id like "duplicate_membership_last_wins" so the test documents and locks down the current behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@airbyte_cdk/sources/declarative/concurrent_declarative_source.py`:
- Around line 552-565: When iterating stream_groups, do not treat string refs as
opaque keys: resolve any string stream_ref (e.g., "#/definitions/..") to the
actual stream definition dict and read its "name" field (use the same
reference-resolution helper or definitions map used elsewhere) and then assign
stream_name_to_group[stream_name] = group_name; if a stream_name is already
present mapped to a different group, reject with a clear exception or error (no
silent last-write-wins). Update the logic that handles isinstance(stream_ref,
str) to perform resolution to a dict, extract the runtime "name", and
enforce/raise on conflicting group assignments for the same stream_name.
---
Duplicate comments:
In `@airbyte_cdk/sources/declarative/models/declarative_component_schema.py`:
- Line 2769: Update the description string for the use_cache field in
declarative_component_schema.py so it reflects that the default is False and
that enabling caching is opt-in; remove wording that implies disabling is
exceptional. Specifically, edit the description on the use_cache field to state
that caching is off by default, explain what enabling it does (repeated requests
to the same URL will be cached), note that parent streams may enable caching
automatically if applicable, and warn about potential duplicate records for
scroll-based pagination when caching is enabled.
---
Nitpick comments:
In `@airbyte_cdk/sources/declarative/declarative_component_schema.yaml`:
- Around line 85-93: The streams array currently allows empty or duplicate
entries; update the streams schema block (the "streams" property in
declarative_component_schema.yaml) to enforce minItems: 1 and uniqueItems: true
and tighten item validation (e.g., require string pattern matching JSON refs
like "^#/definitions/.+") so misconfigured or no-op stream lists are rejected
early.
- Around line 87-90: Update the "streams" description in
declarative_component_schema.yaml to explicitly document the accepted identifier
formats: a JSON Reference string (e.g., "#/definitions/my_stream") and inline
stream definitions (an object with the stream schema), and include a short
example for each format to reduce author confusion; target the "streams"
property description text so it lists both formats and shows one-line examples.
In `@unit_tests/sources/declarative/parsers/test_model_to_component_factory.py`:
- Around line 5217-5358: Rename the test/docstring to reflect it validates
factory behavior (post-injection) rather than end-to-end stream-groups and
tighten scope to only check factory-created streams; then add an assertion that
the parent stream object used by the child stream's SubstreamPartitionRouter
retains the injected block_simultaneous_read value. Specifically, in this test
after creating child_stream (DefaultStream) inspect the SubstreamPartitionRouter
instance (referenced by child_stream.partition_router or similar on the created
DefaultStream) and assert that its ParentStreamConfig / instantiated parent
stream (the entry corresponding to the "#/parent_stream" manifest) has
block_simultaneous_read == "issues_endpoint". Ensure the new assertion
references SubstreamPartitionRouter and ParentStreamConfig (or the actual
attribute names on child_stream.partition_router) so the factory’s nested-parent
traversal is covered.
In `@unit_tests/sources/declarative/test_concurrent_declarative_source.py`:
- Around line 5155-5218: Add a new pytest.param to the existing parametrize
block that covers duplicate stream membership: create a manifest where
"stream_groups" contains "group_a" and "group_b" and both list a stream named
"dup_stream" (type "DeclarativeStream"), and set the expected mapping to reflect
the current precedence (last-wins) by mapping "dup_stream" to "group_b"; give
the case an id like "duplicate_membership_last_wins" so the test documents and
locks down the current behavior.
ℹ️ Review info
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
airbyte_cdk/sources/declarative/concurrent_declarative_source.pyairbyte_cdk/sources/declarative/declarative_component_schema.yamlairbyte_cdk/sources/declarative/models/declarative_component_schema.pyairbyte_cdk/sources/declarative/parsers/model_to_component_factory.pyunit_tests/sources/declarative/parsers/test_model_to_component_factory.pyunit_tests/sources/declarative/test_concurrent_declarative_source.py
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
| for group_name, group_config in stream_groups.items(): | ||
| streams = group_config.get("streams", []) | ||
| for stream_ref in streams: | ||
| if isinstance(stream_ref, dict): | ||
| # After reference resolution, stream_ref is a full stream definition dict | ||
| stream_name = stream_ref.get("name", "") | ||
| if stream_name: | ||
| stream_name_to_group[stream_name] = group_name | ||
| elif isinstance(stream_ref, str): | ||
| # If not resolved (shouldn't happen normally), extract name from ref path | ||
| # e.g., "#/definitions/my_stream" -> "my_stream" | ||
| if stream_ref.startswith("#/definitions/"): | ||
| stream_name = stream_ref.split("/")[-1] | ||
| stream_name_to_group[stream_name] = group_name |
There was a problem hiding this comment.
Could we resolve string refs to actual stream name values and reject conflicting group assignments, wdyt?
Line 564 currently maps #/.../<definition_key> to <definition_key>, but downstream matching uses runtime stream name (Line 412). If definition keys differ from stream names, blocking is silently skipped. Also, duplicate stream membership across groups currently becomes last-write-wins.
💡 Suggested hardening diff
@@
def _build_stream_name_to_group(manifest: Mapping[str, Any]) -> Dict[str, str]:
@@
- for group_name, group_config in stream_groups.items():
+ def _resolve_ref_to_stream_name(stream_ref: str) -> Optional[str]:
+ if not stream_ref.startswith("#/"):
+ return None
+ node: Any = manifest
+ for part in stream_ref.removeprefix("#/").split("/"):
+ if not isinstance(node, Mapping) or part not in node:
+ return None
+ node = node[part]
+ if isinstance(node, Mapping):
+ resolved_name = node.get("name")
+ if isinstance(resolved_name, str) and resolved_name:
+ return resolved_name
+ return None
+
+ def _assign(stream_name: str, group_name: str) -> None:
+ existing_group = stream_name_to_group.get(stream_name)
+ if existing_group and existing_group != group_name:
+ raise ValueError(
+ f"Stream '{stream_name}' is assigned to multiple stream_groups: "
+ f"'{existing_group}' and '{group_name}'."
+ )
+ stream_name_to_group[stream_name] = group_name
+
+ for group_name, group_config in stream_groups.items():
streams = group_config.get("streams", [])
for stream_ref in streams:
if isinstance(stream_ref, dict):
# After reference resolution, stream_ref is a full stream definition dict
stream_name = stream_ref.get("name", "")
if stream_name:
- stream_name_to_group[stream_name] = group_name
+ _assign(stream_name, group_name)
elif isinstance(stream_ref, str):
# If not resolved (shouldn't happen normally), extract name from ref path
# e.g., "#/definitions/my_stream" -> "my_stream"
- if stream_ref.startswith("#/definitions/"):
- stream_name = stream_ref.split("/")[-1]
- stream_name_to_group[stream_name] = group_name
+ resolved_name = _resolve_ref_to_stream_name(stream_ref)
+ if resolved_name:
+ _assign(resolved_name, group_name)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@airbyte_cdk/sources/declarative/concurrent_declarative_source.py` around
lines 552 - 565, When iterating stream_groups, do not treat string refs as
opaque keys: resolve any string stream_ref (e.g., "#/definitions/..") to the
actual stream definition dict and read its "name" field (use the same
reference-resolution helper or definitions map used elsewhere) and then assign
stream_name_to_group[stream_name] = group_name; if a stream_name is already
present mapped to a different group, reject with a clear exception or error (no
silent last-write-wins). Update the logic that handles isinstance(stream_ref,
str) to perform resolution to a dict, extract the runtime "name", and
enforce/raise on conflicting group assignments for the same stream_name.
- Add stream_name_to_group parameter to ModelToComponentFactory.__init__() - Add set_stream_name_to_group() method for post-init configuration - Factory now looks up block_simultaneous_read from its own mapping - Remove config injection hack from ConcurrentDeclarativeSource.streams() - Update tests to use factory-based approach instead of extra fields Co-Authored-By: unknown <>
…oded dict - Test now defines stream_groups with references in the manifest YAML - Uses _build_stream_name_to_group() to derive the mapping from manifest - Removed test_set_stream_name_to_group (redundant with the manifest-based test) - Added ConcurrentDeclarativeSource import for _build_stream_name_to_group Co-Authored-By: unknown <>
Summary
Adds a
block_simultaneous_readfeature to prevent multiple streams from running concurrently when they share the same resource. The feature is configured via a top-levelstream_groupsstructure in the manifest (rather than per-stream properties).Interface
stream_groupsis a top-level manifest property (alongsidestreams,definitions,check, etc.)crm_objects), a list of stream$refreferences, and anactionBlockSimultaneousSyncsActionis the only action type for nowImplementation
Schema (
declarative_component_schema.yaml): Removed per-streamblock_simultaneous_readproperty fromDeclarativeStream. Added top-levelstream_groupswithStreamGroupandBlockSimultaneousSyncsActiondefinitions.Pydantic models (
declarative_component_schema.py): AddedBlockSimultaneousSyncsAction,StreamGroupclasses. Addedstream_groups: Optional[Dict[str, StreamGroup]]toDeclarativeSource1/DeclarativeSource2.ConcurrentDeclarativeSource: Added_build_stream_name_to_group()that parses the resolvedstream_groupsconfig and builds astream_name -> group_namemapping. Thestreams()method injectsblock_simultaneous_readinto individual stream configs before they reach the factory.ModelToComponentFactory: Usesgetattr(model, "block_simultaneous_read", "")to safely read the injected extra field from stream models.ConcurrentReadProcessor(core blocking logic): Unchanged. Still uses string-based group identifiers internally — streams with the same non-empty group name block each other, with deferral/retry and parent-child awareness.Blocking Behavior
First use case: source-intercom — prevents duplicate concurrent requests to the companies endpoint.
Resolves: https://github.com/airbytehq/oncall/issues/8346
Review & Testing Checklist for Human
stream_groupsreference resolution: AfterManifestReferenceResolverprocesses the manifest, stream refs instream_groups.streamsshould be resolved to full stream definition dicts. Confirm_build_stream_name_to_group()correctly extracts stream names from resolved dicts. Test with a real connector manifest (e.g. source-intercom).getattrfor extra fields in factory:ModelToComponentFactoryreadsblock_simultaneous_readviagetattr()from pydantic model extra fields (injected byConcurrentDeclarativeSource). Confirm pydantic'sExtra.allowonDeclarativeStreamactually exposes injected fields throughgetattr.stream_groupsusesadditionalPropertieswith$ref: "#/definitions/StreamGroup"pattern. Verify this validates correctly against real manifests and that the codegen-produced pydantic models match.stream_groupsconfig format to confirm end-to-end blocking behavior.Recommended test plan:
stream_groupsreferencing 2+ streams via$refConcurrentDeclarativeSourceand verify streams in the same group are read sequentiallyNotes
stream_groupscontinue to work unchanged.ConcurrentReadProcessorstill uses string-based group identifiers, so the blocking logic is unchanged.Summary by CodeRabbit
New Features
Bug Fixes
Tests