Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e8697e0
Add block_simultaneous_read to DefaultStream
tolik0 Dec 30, 2025
1b4a9f0
Change `block_simultaneous_read` to string
tolik0 Jan 6, 2026
ec8dd6f
Fix StreamFacade
tolik0 Jan 6, 2026
450d7cf
Fix NoneType error when all streams are blocked
tolik0 Jan 9, 2026
3e7de2f
Fix unit tests
tolik0 Jan 12, 2026
b7fa9a5
Auto-fix lint and format issues
Jan 12, 2026
314ded1
Add retry deferred streams on stream completion
tolik0 Jan 13, 2026
80d8b2b
Fix unit tests
tolik0 Jan 13, 2026
8c06ce6
More fixes for unit tests
tolik0 Jan 13, 2026
49b0174
refactor: replace per-stream block_simultaneous_read with top-level s…
devin-ai-integration[bot] Feb 25, 2026
219f7df
refactor: move stream_name_to_group into ModelToComponentFactory
devin-ai-integration[bot] Feb 25, 2026
0390f4e
refactor: use stream_groups manifest in factory test instead of hardc…
devin-ai-integration[bot] Feb 25, 2026
cd55bfd
fix: only include parent stream in stream_groups to avoid deadlock
devin-ai-integration[bot] Feb 26, 2026
5066ec7
style: fix ruff format for long line
devin-ai-integration[bot] Feb 26, 2026
61562c4
refactor: move _build_stream_name_to_group into ModelToComponentFactory
devin-ai-integration[bot] Feb 27, 2026
ed82738
refactor: resolve stream_groups from actual stream instances instead …
devin-ai-integration[bot] Mar 3, 2026
7ba206f
Fix stream format in schema
tolik0 Mar 4, 2026
d09ee9b
refactor: add get_partition_router() helper to DefaultStream
devin-ai-integration[bot] Mar 4, 2026
94c4b82
feat: validate no parent-child streams share a group to prevent deadlock
devin-ai-integration[bot] Mar 4, 2026
0874f12
feat: assert partition generation queue is empty when all streams are…
devin-ai-integration[bot] Mar 4, 2026
c868fdb
refactor: move inline imports to module level in default_stream.py an…
devin-ai-integration[bot] Mar 4, 2026
1fffc69
fix: unwrap GroupingPartitionRouter in get_partition_router() to dete…
devin-ai-integration[bot] Mar 4, 2026
5911051
fix: handle GroupingPartitionRouter at call sites instead of in get_p…
devin-ai-integration[bot] Mar 4, 2026
d01ee31
feat: check active_groups is empty in is_done() safety check
devin-ai-integration[bot] Mar 4, 2026
9029049
test: add missing unit tests for GroupingPartitionRouter, active_grou…
devin-ai-integration[bot] Mar 4, 2026
756a966
fix: make deadlock validation check all ancestors, not just direct pa…
devin-ai-integration[bot] Mar 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 218 additions & 14 deletions airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Large diffs are not rendered by default.

71 changes: 70 additions & 1 deletion airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,19 @@
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
GroupingPartitionRouter,
)
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
SubstreamPartitionRouter,
)
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.spec.spec import Spec
from airbyte_cdk.sources.declarative.types import Config, ConnectionDefinition
from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem
from airbyte_cdk.sources.utils.slice_logger import (
AlwaysLogSliceLogger,
Expand Down Expand Up @@ -405,6 +412,8 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i
if api_budget_model:
self._constructor.set_api_budget(api_budget_model, self._config)

prepared_configs = self._initialize_cache_for_parent_streams(deepcopy(stream_configs))

source_streams = [
self._constructor.create_component(
(
Expand All @@ -416,10 +425,70 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i
self._config,
emit_connector_builder_messages=self._emit_connector_builder_messages,
)
for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
for stream_config in prepared_configs
]

self._apply_stream_groups(source_streams)

return source_streams

def _apply_stream_groups(self, streams: List[AbstractStream]) -> None:
    """Assign `block_simultaneous_read` group names to streams from the manifest's
    ``stream_groups`` config.

    Builds a stream-name -> group-name mapping from the resolved manifest, validates
    that no stream shares a group with any of its ancestor (parent) streams — which
    would deadlock, because the child would wait on the group its own parent holds —
    and then tags each matching ``DefaultStream`` instance with its group name.

    :param streams: the concrete stream instances created from the manifest
    :raises ValueError: if a stream and one of its ancestors are in the same group
    """
    stream_groups = self._source_config.get("stream_groups", {})
    if not stream_groups:
        return

    # Build stream_name -> group_name mapping from the resolved manifest.
    # Stream references arrive as resolved DeclarativeStream definitions
    # (dicts) carrying a "name" key.
    stream_name_to_group: Dict[str, str] = {}
    for group_name, group_config in stream_groups.items():
        for stream_ref in group_config.get("streams", []):
            if isinstance(stream_ref, dict):
                stream_name = stream_ref.get("name", "")
                if stream_name:
                    stream_name_to_group[stream_name] = group_name

    # Validate no stream shares a group with any of its ancestor streams
    stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in streams}

    def _collect_all_ancestor_names(
        stream_name: str, _seen: Optional[Set[str]] = None
    ) -> Set[str]:
        """Recursively collect all ancestor stream names.

        ``_seen`` tracks streams already visited so a cyclic parent
        configuration cannot cause infinite recursion.
        """
        seen = set() if _seen is None else _seen
        ancestors: Set[str] = set()
        inst = stream_name_to_instance.get(stream_name)
        if not isinstance(inst, DefaultStream):
            return ancestors
        router = inst.get_partition_router()
        # GroupingPartitionRouter wraps another router; unwrap it to reach the
        # actual parent-stream configuration.
        if isinstance(router, GroupingPartitionRouter):
            router = router.underlying_partition_router
        if not isinstance(router, SubstreamPartitionRouter):
            return ancestors
        for parent_config in router.parent_stream_configs:
            parent_name = parent_config.stream.name
            if parent_name in seen:
                continue  # already visited — guards against parent cycles
            seen.add(parent_name)
            ancestors.add(parent_name)
            ancestors.update(_collect_all_ancestor_names(parent_name, seen))
        return ancestors

    # Fail fast before applying anything: a child must never share a group
    # with an ancestor, otherwise neither can ever start (deadlock).
    for stream in streams:
        if not isinstance(stream, DefaultStream) or stream.name not in stream_name_to_group:
            continue
        group_name = stream_name_to_group[stream.name]
        for ancestor_name in _collect_all_ancestor_names(stream.name):
            if stream_name_to_group.get(ancestor_name) == group_name:
                raise ValueError(
                    f"Stream '{stream.name}' and its parent stream '{ancestor_name}' "
                    f"are both in group '{group_name}'. "
                    f"A child stream must not share a group with its parent to avoid deadlock."
                )

    # Apply group to matching stream instances
    for stream in streams:
        if isinstance(stream, DefaultStream) and stream.name in stream_name_to_group:
            stream.block_simultaneous_read = stream_name_to_group[stream.name]

@staticmethod
def _initialize_cache_for_parent_streams(
stream_configs: List[Dict[str, Any]],
Expand Down
46 changes: 46 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ properties:
"$ref": "#/definitions/ConcurrencyLevel"
api_budget:
"$ref": "#/definitions/HTTPAPIBudget"
stream_groups:
title: Stream Groups
description: >
Groups of streams that share a common resource and should not be read simultaneously.
Each group defines a set of stream references and an action that controls how concurrent
reads are managed. Only applies to ConcurrentDeclarativeSource.
type: object
additionalProperties:
"$ref": "#/definitions/StreamGroup"
max_concurrent_async_job_count:
title: Maximum Concurrent Asynchronous Jobs
description: Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.
Expand All @@ -63,6 +72,43 @@ properties:
description: A description of the connector. It will be presented on the Source documentation page.
additionalProperties: false
definitions:
# NOTE(review): indentation reconstructed — these definitions presumably sit under
# the top-level `definitions:` key; confirm nesting against the full schema file.
StreamGroup:
  title: Stream Group
  description: >
    A group of streams that share a common resource and should not be read simultaneously.
    Streams in the same group will be blocked from concurrent reads based on the specified action.
  type: object
  required:
    - streams
    - action
  properties:
    streams:
      title: Streams
      description: >
        List of references to streams that belong to this group.
      type: array
      items:
        # NOTE(review): single-alternative anyOf — the generated pydantic model
        # types this as List[str] (JSON references); confirm which is canonical.
        anyOf:
          - "$ref": "#/definitions/DeclarativeStream"
    action:
      title: Action
      description: The action to apply to streams in this group.
      "$ref": "#/definitions/BlockSimultaneousSyncsAction"
BlockSimultaneousSyncsAction:
  title: Block Simultaneous Syncs Action
  description: >
    Action that prevents streams in the same group from being read concurrently.
    When applied to a stream group, streams with this action will be deferred if
    another stream in the same group is currently active.
    This is useful for APIs that don't allow concurrent access to the same
    endpoint or session. Only applies to ConcurrentDeclarativeSource.
  type: object
  required:
    - type
  properties:
    type:
      type: string
      enum: [BlockSimultaneousSyncsAction]
AddedFieldDefinition:
title: Definition Of Field To Add
description: Defines the field to add on a record.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -2307,6 +2305,23 @@ class Config:
)


class BlockSimultaneousSyncsAction(BaseModel):
    # Marker model: attaching this action to a StreamGroup means its member
    # streams must not be read at the same time as one another.
    type: Literal["BlockSimultaneousSyncsAction"]


class StreamGroup(BaseModel):
    # NOTE(review): typed as List[str] ("JSON references"), but the YAML schema
    # declares the items as DeclarativeStream refs and
    # ConcurrentDeclarativeSource._apply_stream_groups reads resolved dicts with
    # a "name" key — confirm which representation is canonical.
    streams: List[str] = Field(
        ...,
        description='List of references to streams that belong to this group. Use JSON references to stream definitions (e.g., "#/definitions/my_stream").',
        title="Streams",
    )
    # The only supported action today is BlockSimultaneousSyncsAction.
    action: BlockSimultaneousSyncsAction = Field(
        ...,
        description="The action to apply to streams in this group.",
        title="Action",
    )


class Spec(BaseModel):
type: Literal["Spec"]
connection_specification: Dict[str, Any] = Field(
Expand Down Expand Up @@ -2347,6 +2362,11 @@ class Config:
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = None
stream_groups: Optional[Dict[str, StreamGroup]] = Field(
None,
description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.",
title="Stream Groups",
)
max_concurrent_async_job_count: Optional[Union[int, str]] = Field(
None,
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
Expand Down Expand Up @@ -2382,6 +2402,11 @@ class Config:
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = None
stream_groups: Optional[Dict[str, StreamGroup]] = Field(
None,
description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.",
title="Stream Groups",
)
max_concurrent_async_job_count: Optional[Union[int, str]] = Field(
None,
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
Expand Down Expand Up @@ -2741,7 +2766,7 @@ class HttpRequester(BaseModelWithDeprecations):
)
use_cache: Optional[bool] = Field(
False,
description="Enables stream requests caching. This field is automatically set by the CDK.",
description="Enables stream requests caching. When set to true, repeated requests to the same URL will return cached responses. Parent streams automatically have caching enabled. Only set this to false if you are certain that caching should be disabled, as it may negatively impact performance when the same data is needed multiple times (e.g., for scroll-based pagination APIs where caching causes duplicate records).",
title="Use Cache",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
Expand Down
15 changes: 15 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/abstract_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,21 @@ def cursor(self) -> Cursor:
:return: The cursor associated with this stream.
"""

@property
def block_simultaneous_read(self) -> str:
    """Name of the blocking group this stream belongs to, or "" for none.

    Subclasses override this to return a non-empty group name when the stream
    must not run concurrently with others. A non-empty value defers partition
    generation for this stream while another stream in the same group is
    active, or while any of its parent streams sit in an active group. This
    serializes streams that share a resource (an API endpoint, a session, …)
    even when they have no parent/child relationship.

    :return: the group name, or the empty string to allow concurrent reading
    """
    # Base behavior: no group membership, so concurrent reads are permitted.
    return ""

@abstractmethod
def check_availability(self) -> StreamAvailability:
"""
Expand Down
5 changes: 5 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ def cursor_field(self) -> Union[str, List[str]]:
def cursor(self) -> Optional[Cursor]: # type: ignore[override] # StreamFaced expects to use only airbyte_cdk.sources.streams.concurrent.cursor.Cursor
return self._cursor

@property
def block_simultaneous_read(self) -> str:
    """Blocking-group name, delegated to the wrapped concurrent stream."""
    underlying = self._abstract_stream
    return underlying.block_simultaneous_read

# FIXME the lru_cache seems to be mostly there because of typing issue
@lru_cache(maxsize=None)
def get_json_schema(self) -> Mapping[str, Any]:
Expand Down
27 changes: 27 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/default_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@
from typing import Any, Callable, Iterable, List, Mapping, Optional, Union

from airbyte_cdk.models import AirbyteStream, SyncMode
from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import (
ConcurrentPerPartitionCursor,
)
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
StreamSlicerPartitionGenerator,
)
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField
Expand All @@ -26,6 +33,7 @@ def __init__(
cursor: Cursor,
namespace: Optional[str] = None,
supports_file_transfer: bool = False,
block_simultaneous_read: str = "",
) -> None:
self._stream_partition_generator = partition_generator
self._name = name
Expand All @@ -36,6 +44,7 @@ def __init__(
self._cursor = cursor
self._namespace = namespace
self._supports_file_transfer = supports_file_transfer
self._block_simultaneous_read = block_simultaneous_read

def generate_partitions(self) -> Iterable[Partition]:
    """Yield the partitions produced by the configured partition generator."""
    for partition in self._stream_partition_generator.generate():
        yield partition
Expand Down Expand Up @@ -94,6 +103,24 @@ def log_stream_sync_configuration(self) -> None:
def cursor(self) -> Cursor:
return self._cursor

@property
def block_simultaneous_read(self) -> str:
    """Group name used to serialize reads with other streams; "" means no blocking."""
    return self._block_simultaneous_read

@block_simultaneous_read.setter
def block_simultaneous_read(self, value: str) -> None:
    # Writable so the source can assign group membership after the stream has
    # been constructed (see ConcurrentDeclarativeSource._apply_stream_groups).
    self._block_simultaneous_read = value

def get_partition_router(self) -> Optional[PartitionRouter]:
    """Return the partition router driving this stream's slicing, if discoverable.

    A router is only exposed when the stream's partition generator is a
    ``StreamSlicerPartitionGenerator`` whose slicer is a
    ``ConcurrentPerPartitionCursor``; any other configuration yields ``None``.

    NOTE(review): this reaches into the private ``_stream_slicer`` and
    ``_partition_router`` attributes of collaborating classes — consider
    promoting public accessors on their owners.

    :return: the partition router, or ``None`` if not available
    """
    generator = self._stream_partition_generator
    if not isinstance(generator, StreamSlicerPartitionGenerator):
        return None
    stream_slicer = generator._stream_slicer
    if not isinstance(stream_slicer, ConcurrentPerPartitionCursor):
        return None
    return stream_slicer._partition_router

def check_availability(self) -> StreamAvailability:
"""
Check stream availability by attempting to read the first record of the stream.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5214,6 +5214,62 @@ def test_catalog_defined_cursor_field_stream_missing():
assert stream._cursor_field.supports_catalog_defined_cursor_field == True


def test_block_simultaneous_read_from_stream_groups():
    """Factory-created streams must default to an empty block_simultaneous_read.

    stream_groups handling now lives in
    ConcurrentDeclarativeSource._apply_stream_groups, which runs after stream
    creation — so the factory itself must produce streams carrying no group
    information.
    """
    manifest_yaml = """
definitions:
  parent_stream:
    type: DeclarativeStream
    name: "parent"
    primary_key: "id"
    retriever:
      type: SimpleRetriever
      requester:
        type: HttpRequester
        url_base: "https://api.example.com"
        path: "/parent"
        http_method: "GET"
        authenticator:
          type: BearerAuthenticator
          api_token: "{{ config['api_key'] }}"
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: []
    schema_loader:
      type: InlineSchemaLoader
      schema:
        type: object
        properties:
          id:
            type: string
"""

    connector_config = {"api_key": "test_key"}

    resolved_manifest = resolver.preprocess_manifest(
        YamlDeclarativeSource._parse(manifest_yaml)
    )

    component_factory = ModelToComponentFactory()

    propagated_definition = transformer.propagate_types_and_parameters(
        "", resolved_manifest["definitions"]["parent_stream"], {}
    )
    created_stream: DefaultStream = component_factory.create_component(
        model_type=DeclarativeStreamModel,
        component_definition=propagated_definition,
        config=connector_config,
    )

    assert isinstance(created_stream, DefaultStream)
    assert created_stream.name == "parent"
    assert created_stream.block_simultaneous_read == ""


def get_schema_loader(stream: DefaultStream):
assert isinstance(
stream._stream_partition_generator._partition_factory._schema_loader,
Expand Down
Loading