diff --git a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py index 905999a4d..cc39f888f 100644 --- a/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py +++ b/airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py @@ -13,8 +13,15 @@ ) from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager +from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import ( + GroupingPartitionRouter, +) +from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ( + SubstreamPartitionRouter, +) from airbyte_cdk.sources.message import MessageRepository from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream +from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition @@ -66,17 +73,61 @@ def __init__( self._streams_done: Set[str] = set() self._exceptions_per_stream_name: dict[str, List[Exception]] = {} + # Track which streams (by name) are currently active + # A stream is "active" if it's generating partitions or has partitions being read + self._active_stream_names: Set[str] = set() + + # Store blocking group names for streams that require blocking simultaneous reads + # Maps stream name -> group name (empty string means no blocking) + self._stream_block_simultaneous_read: Dict[str, str] = { + stream.name: stream.block_simultaneous_read for stream in stream_instances_to_read_from + } + + # Track which groups are currently active + # Maps group name -> set of stream names in that group + 
self._active_groups: Dict[str, Set[str]] = {} + + for stream in stream_instances_to_read_from: + if stream.block_simultaneous_read: + self._logger.info( + f"Stream '{stream.name}' is in blocking group '{stream.block_simultaneous_read}'. " + f"Will defer starting this stream if another stream in the same group or its parents are active." + ) + def on_partition_generation_completed( self, sentinel: PartitionGenerationCompletedSentinel ) -> Iterable[AirbyteMessage]: """ This method is called when a partition generation is completed. 1. Remove the stream from the list of streams currently generating partitions - 2. If the stream is done, mark it as such and return a stream status message - 3. If there are more streams to read from, start the next partition generator + 2. Deactivate parent streams (they were only needed for partition generation) + 3. If the stream is done, mark it as such and return a stream status message + 4. If there are more streams to read from, start the next partition generator """ stream_name = sentinel.stream.name self._streams_currently_generating_partitions.remove(sentinel.stream.name) + + # Deactivate all parent streams now that partition generation is complete + # Parents were only needed to generate slices, they can now be reused + parent_streams = self._collect_all_parent_stream_names(stream_name) + for parent_stream_name in parent_streams: + if parent_stream_name in self._active_stream_names: + self._logger.debug(f"Removing '{parent_stream_name}' from active streams") + self._active_stream_names.discard(parent_stream_name) + + # Remove from active groups + parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "") + if parent_group: + if parent_group in self._active_groups: + self._active_groups[parent_group].discard(parent_stream_name) + if not self._active_groups[parent_group]: + del self._active_groups[parent_group] + self._logger.info( + f"Parent stream '{parent_stream_name}' (group '{parent_group}') deactivated 
after " + f"partition generation completed for child '{stream_name}'. " + f"Blocked streams in the queue will be retried on next start_next_partition_generator call." + ) + # It is possible for the stream to already be done if no partitions were generated # If the partition generation process was completed and there are no partitions left to process, the stream is done if ( @@ -85,7 +136,9 @@ def on_partition_generation_completed( ): yield from self._on_stream_is_done(stream_name) if self._stream_instances_to_start_partition_generation: - yield self.start_next_partition_generator() # type:ignore # None may be yielded + status_message = self.start_next_partition_generator() + if status_message: + yield status_message def on_partition(self, partition: Partition) -> None: """ @@ -113,6 +166,7 @@ def on_partition_complete_sentinel( 1. Close the partition 2. If the stream is done, mark it as such and return a stream status message 3. Emit messages that were added to the message repository + 4. If there are more streams to read from, start the next partition generator """ partition = sentinel.partition @@ -125,6 +179,11 @@ def on_partition_complete_sentinel( and len(partitions_running) == 0 ): yield from self._on_stream_is_done(partition.stream_name()) + # Try to start the next stream in the queue (may be a deferred stream) + if self._stream_instances_to_start_partition_generation: + status_message = self.start_next_partition_generator() + if status_message: + yield status_message yield from self._message_repository.consume_queue() def on_record(self, record: Record) -> Iterable[AirbyteMessage]: @@ -181,24 +240,112 @@ def _flag_exception(self, stream_name: str, exception: Exception) -> None: def start_next_partition_generator(self) -> Optional[AirbyteMessage]: """ - Start the next partition generator. - 1. Pop the next stream to read from - 2. Submit the partition generator to the thread pool manager - 3. 
Add the stream to the list of streams currently generating partitions - 4. Return a stream status message + Submits the next partition generator to the thread pool. + + A stream will be deferred (moved to end of queue) if: + 1. The stream itself has block_simultaneous_read=True AND is already active + 2. Any parent stream has block_simultaneous_read=True AND is currently active + + This prevents simultaneous reads of streams that shouldn't be accessed concurrently. + + :return: A status message if a partition generator was started, otherwise None """ - if self._stream_instances_to_start_partition_generation: + if not self._stream_instances_to_start_partition_generation: + return None + + # Remember initial queue size to avoid infinite loops if all streams are blocked + max_attempts = len(self._stream_instances_to_start_partition_generation) + attempts = 0 + + while self._stream_instances_to_start_partition_generation and attempts < max_attempts: + attempts += 1 + + # Pop the first stream from the queue stream = self._stream_instances_to_start_partition_generation.pop(0) + stream_name = stream.name + stream_group = self._stream_block_simultaneous_read.get(stream_name, "") + + # Check if this stream has a blocking group and is already active as parent stream + # (i.e. being read from during partition generation for another stream) + if stream_group and stream_name in self._active_stream_names: + # Add back to the END of the queue for retry later + self._stream_instances_to_start_partition_generation.append(stream) + self._logger.info( + f"Deferring stream '{stream_name}' (group '{stream_group}') because it's already active. Trying next stream." 
+ ) + continue # Try the next stream in the queue + + # Check if this stream's group is already active (another stream in the same group is running) + if ( + stream_group + and stream_group in self._active_groups + and self._active_groups[stream_group] + ): + # Add back to the END of the queue for retry later + self._stream_instances_to_start_partition_generation.append(stream) + active_streams_in_group = self._active_groups[stream_group] + self._logger.info( + f"Deferring stream '{stream_name}' (group '{stream_group}') because other stream(s) " + f"{active_streams_in_group} in the same group are active. Trying next stream." + ) + continue # Try the next stream in the queue + + # Check if any parent streams have a blocking group and are currently active + parent_streams = self._collect_all_parent_stream_names(stream_name) + blocked_by_parents = [ + p + for p in parent_streams + if self._stream_block_simultaneous_read.get(p, "") + and p in self._active_stream_names + ] + + if blocked_by_parents: + # Add back to the END of the queue for retry later + self._stream_instances_to_start_partition_generation.append(stream) + parent_groups = { + self._stream_block_simultaneous_read.get(p, "") for p in blocked_by_parents + } + self._logger.info( + f"Deferring stream '{stream_name}' because parent stream(s) " + f"{blocked_by_parents} (groups {parent_groups}) are active. Trying next stream." 
+ ) + continue # Try the next stream in the queue + + # No blocking - start this stream + # Mark stream as active before starting + self._active_stream_names.add(stream_name) + self._streams_currently_generating_partitions.append(stream_name) + + # Track this stream in its group if it has one + if stream_group: + if stream_group not in self._active_groups: + self._active_groups[stream_group] = set() + self._active_groups[stream_group].add(stream_name) + self._logger.debug(f"Added '{stream_name}' to active group '{stream_group}'") + + # Also mark all parent streams as active (they will be read from during partition generation) + for parent_stream_name in parent_streams: + parent_group = self._stream_block_simultaneous_read.get(parent_stream_name, "") + if parent_group: + self._active_stream_names.add(parent_stream_name) + if parent_group not in self._active_groups: + self._active_groups[parent_group] = set() + self._active_groups[parent_group].add(parent_stream_name) + self._logger.info( + f"Marking parent stream '{parent_stream_name}' (group '{parent_group}') as active " + f"(will be read during partition generation for '{stream_name}')" + ) + self._thread_pool_manager.submit(self._partition_enqueuer.generate_partitions, stream) - self._streams_currently_generating_partitions.append(stream.name) - self._logger.info(f"Marking stream {stream.name} as STARTED") - self._logger.info(f"Syncing stream: {stream.name} ") + self._logger.info(f"Marking stream {stream_name} as STARTED") + self._logger.info(f"Syncing stream: {stream_name}") return stream_status_as_airbyte_message( stream.as_airbyte_stream(), AirbyteStreamStatus.STARTED, ) - else: - return None + + # All streams in the queue are currently blocked + return None def is_done(self) -> bool: """ @@ -214,6 +361,21 @@ def is_done(self) -> bool: for stream_name in self._stream_name_to_instance.keys() ] ) + if is_done and self._stream_instances_to_start_partition_generation: + stuck_stream_names = [ + s.name for s in 
self._stream_instances_to_start_partition_generation + ] + raise AirbyteTracedException( + message="Partition generation queue is not empty after all streams completed.", + internal_message=f"Streams {stuck_stream_names} remained in the partition generation queue after all streams were marked done.", + failure_type=FailureType.system_error, + ) + if is_done and self._active_groups: + raise AirbyteTracedException( + message="Active stream groups are not empty after all streams completed.", + internal_message=f"Groups {dict(self._active_groups)} still active after all streams were marked done.", + failure_type=FailureType.system_error, + ) if is_done and self._exceptions_per_stream_name: error_message = generate_failed_streams_error_message(self._exceptions_per_stream_name) self._logger.info(error_message) @@ -230,6 +392,32 @@ def is_done(self) -> bool: def _is_stream_done(self, stream_name: str) -> bool: return stream_name in self._streams_done + def _collect_all_parent_stream_names(self, stream_name: str) -> Set[str]: + """Recursively collect all parent stream names for a given stream. + + For example, if we have: epics -> issues -> comments + Then for comments, this returns {issues, epics}. 
+ """ + parent_names: Set[str] = set() + stream = self._stream_name_to_instance.get(stream_name) + + if not stream: + return parent_names + + partition_router = ( + stream.get_partition_router() if isinstance(stream, DefaultStream) else None + ) + if isinstance(partition_router, GroupingPartitionRouter): + partition_router = partition_router.underlying_partition_router + + if isinstance(partition_router, SubstreamPartitionRouter): + for parent_config in partition_router.parent_stream_configs: + parent_name = parent_config.stream.name + parent_names.add(parent_name) + parent_names.update(self._collect_all_parent_stream_names(parent_name)) + + return parent_names + def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]: self._logger.info( f"Read {self._record_counter[stream_name]} records from {stream_name} stream" @@ -246,3 +434,19 @@ def _on_stream_is_done(self, stream_name: str) -> Iterable[AirbyteMessage]: else AirbyteStreamStatus.COMPLETE ) yield stream_status_as_airbyte_message(stream.as_airbyte_stream(), stream_status) + + # Remove only this stream from active set (NOT parents) + if stream_name in self._active_stream_names: + self._active_stream_names.discard(stream_name) + + # Remove from active groups + stream_group = self._stream_block_simultaneous_read.get(stream_name, "") + if stream_group: + if stream_group in self._active_groups: + self._active_groups[stream_group].discard(stream_name) + if not self._active_groups[stream_group]: + del self._active_groups[stream_group] + self._logger.info( + f"Stream '{stream_name}' (group '{stream_group}') is no longer active. " + f"Blocked streams in the queue will be retried on next start_next_partition_generator call." 
+ ) diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 45fe6aa2d..292615692 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -76,12 +76,19 @@ from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( ModelToComponentFactory, ) +from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import ( + GroupingPartitionRouter, +) +from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ( + SubstreamPartitionRouter, +) from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING from airbyte_cdk.sources.declarative.spec.spec import Spec from airbyte_cdk.sources.declarative.types import Config, ConnectionDefinition from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository from airbyte_cdk.sources.message.repository import InMemoryMessageRepository from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream +from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem from airbyte_cdk.sources.utils.slice_logger import ( AlwaysLogSliceLogger, @@ -405,6 +412,8 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i if api_budget_model: self._constructor.set_api_budget(api_budget_model, self._config) + prepared_configs = self._initialize_cache_for_parent_streams(deepcopy(stream_configs)) + source_streams = [ self._constructor.create_component( ( @@ -416,10 +425,70 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i self._config, emit_connector_builder_messages=self._emit_connector_builder_messages, ) - for stream_config in 
self._initialize_cache_for_parent_streams(deepcopy(stream_configs)) + for stream_config in prepared_configs ] + + self._apply_stream_groups(source_streams) + return source_streams + def _apply_stream_groups(self, streams: List[AbstractStream]) -> None: + """Set block_simultaneous_read on streams based on the manifest's stream_groups config. + + Iterates over the resolved manifest's stream_groups and matches group membership + against actual created stream instances by name. Validates that no stream shares a + group with any of its parent streams, which would cause a deadlock. + """ + stream_groups = self._source_config.get("stream_groups", {}) + if not stream_groups: + return + + # Build stream_name -> group_name mapping from the resolved manifest + stream_name_to_group: Dict[str, str] = {} + for group_name, group_config in stream_groups.items(): + for stream_ref in group_config.get("streams", []): + if isinstance(stream_ref, dict): + stream_name = stream_ref.get("name", "") + if stream_name: + stream_name_to_group[stream_name] = group_name + + # Validate no stream shares a group with any of its ancestor streams + stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in streams} + + def _collect_all_ancestor_names(stream_name: str) -> Set[str]: + """Recursively collect all ancestor stream names.""" + ancestors: Set[str] = set() + inst = stream_name_to_instance.get(stream_name) + if not isinstance(inst, DefaultStream): + return ancestors + router = inst.get_partition_router() + if isinstance(router, GroupingPartitionRouter): + router = router.underlying_partition_router + if not isinstance(router, SubstreamPartitionRouter): + return ancestors + for parent_config in router.parent_stream_configs: + parent_name = parent_config.stream.name + ancestors.add(parent_name) + ancestors.update(_collect_all_ancestor_names(parent_name)) + return ancestors + + for stream in streams: + if not isinstance(stream, DefaultStream) or stream.name not in 
stream_name_to_group: + continue + group_name = stream_name_to_group[stream.name] + for ancestor_name in _collect_all_ancestor_names(stream.name): + if stream_name_to_group.get(ancestor_name) == group_name: + raise ValueError( + f"Stream '{stream.name}' and its parent stream '{ancestor_name}' " + f"are both in group '{group_name}'. " + f"A child stream must not share a group with its parent to avoid deadlock." + ) + + # Apply group to matching stream instances + for stream in streams: + if isinstance(stream, DefaultStream) and stream.name in stream_name_to_group: + stream.block_simultaneous_read = stream_name_to_group[stream.name] + @staticmethod def _initialize_cache_for_parent_streams( stream_configs: List[Dict[str, Any]], diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index e04a82c0d..1ef8b5fcf 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -45,6 +45,15 @@ properties: "$ref": "#/definitions/ConcurrencyLevel" api_budget: "$ref": "#/definitions/HTTPAPIBudget" + stream_groups: + title: Stream Groups + description: > + Groups of streams that share a common resource and should not be read simultaneously. + Each group defines a set of stream references and an action that controls how concurrent + reads are managed. Only applies to ConcurrentDeclarativeSource. + type: object + additionalProperties: + "$ref": "#/definitions/StreamGroup" max_concurrent_async_job_count: title: Maximum Concurrent Asynchronous Jobs description: Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). 
This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information. @@ -63,6 +72,43 @@ properties: description: A description of the connector. It will be presented on the Source documentation page. additionalProperties: false definitions: + StreamGroup: + title: Stream Group + description: > + A group of streams that share a common resource and should not be read simultaneously. + Streams in the same group will be blocked from concurrent reads based on the specified action. + type: object + required: + - streams + - action + properties: + streams: + title: Streams + description: > + List of references to streams that belong to this group. + type: array + items: + anyOf: + - "$ref": "#/definitions/DeclarativeStream" + action: + title: Action + description: The action to apply to streams in this group. + "$ref": "#/definitions/BlockSimultaneousSyncsAction" + BlockSimultaneousSyncsAction: + title: Block Simultaneous Syncs Action + description: > + Action that prevents streams in the same group from being read concurrently. + When applied to a stream group, streams with this action will be deferred if + another stream in the same group is currently active. + This is useful for APIs that don't allow concurrent access to the same + endpoint or session. Only applies to ConcurrentDeclarativeSource. + type: object + required: + - type + properties: + type: + type: string + enum: [BlockSimultaneousSyncsAction] AddedFieldDefinition: title: Definition Of Field To Add description: Defines the field to add on a record. 
diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index b78a07021..fabec77e5 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,5 +1,3 @@ -# Copyright (c) 2025 Airbyte, Inc., all rights reserved. - # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -2307,6 +2305,23 @@ class Config: ) +class BlockSimultaneousSyncsAction(BaseModel): + type: Literal["BlockSimultaneousSyncsAction"] + + +class StreamGroup(BaseModel): + streams: List[str] = Field( + ..., + description='List of references to streams that belong to this group. Use JSON references to stream definitions (e.g., "#/definitions/my_stream").', + title="Streams", + ) + action: BlockSimultaneousSyncsAction = Field( + ..., + description="The action to apply to streams in this group.", + title="Action", + ) + + class Spec(BaseModel): type: Literal["Spec"] connection_specification: Dict[str, Any] = Field( @@ -2347,6 +2362,11 @@ class Config: spec: Optional[Spec] = None concurrency_level: Optional[ConcurrencyLevel] = None api_budget: Optional[HTTPAPIBudget] = None + stream_groups: Optional[Dict[str, StreamGroup]] = Field( + None, + description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.", + title="Stream Groups", + ) max_concurrent_async_job_count: Optional[Union[int, str]] = Field( None, description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). 
This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.", @@ -2382,6 +2402,11 @@ class Config: spec: Optional[Spec] = None concurrency_level: Optional[ConcurrencyLevel] = None api_budget: Optional[HTTPAPIBudget] = None + stream_groups: Optional[Dict[str, StreamGroup]] = Field( + None, + description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.", + title="Stream Groups", + ) max_concurrent_async_job_count: Optional[Union[int, str]] = Field( None, description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.", @@ -2741,7 +2766,7 @@ class HttpRequester(BaseModelWithDeprecations): ) use_cache: Optional[bool] = Field( False, - description="Enables stream requests caching. This field is automatically set by the CDK.", + description="Enables stream requests caching. When set to true, repeated requests to the same URL will return cached responses. Parent streams automatically have caching enabled. 
Only set this to false if you are certain that caching should be disabled, as it may negatively impact performance when the same data is needed multiple times (e.g., for scroll-based pagination APIs where caching causes duplicate records).", title="Use Cache", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py index 667d088ab..e7b24f614 100644 --- a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py +++ b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py @@ -85,6 +85,21 @@ def cursor(self) -> Cursor: :return: The cursor associated with this stream. """ + @property + def block_simultaneous_read(self) -> str: + """ + Override to return a non-empty group name if this stream should block simultaneous reads. + When a non-empty string is returned, prevents starting partition generation for this stream if: + - Another stream with the same group name is already active + - Any of its parent streams are in an active group + + This allows grouping multiple streams that share the same resource (e.g., API endpoint or session) + to prevent them from running concurrently, even if they don't have a parent-child relationship. 
+ + :return: Group name for blocking (non-empty string), or "" to allow concurrent reading + """ + return "" # Default: allow concurrent reading + @abstractmethod def check_availability(self) -> StreamAvailability: """ diff --git a/airbyte_cdk/sources/streams/concurrent/adapters.py b/airbyte_cdk/sources/streams/concurrent/adapters.py index 41674bdae..a9bc47e0d 100644 --- a/airbyte_cdk/sources/streams/concurrent/adapters.py +++ b/airbyte_cdk/sources/streams/concurrent/adapters.py @@ -196,6 +196,11 @@ def cursor_field(self) -> Union[str, List[str]]: def cursor(self) -> Optional[Cursor]: # type: ignore[override] # StreamFaced expects to use only airbyte_cdk.sources.streams.concurrent.cursor.Cursor return self._cursor + @property + def block_simultaneous_read(self) -> str: + """Returns the blocking group name from the underlying stream""" + return self._abstract_stream.block_simultaneous_read + # FIXME the lru_cache seems to be mostly there because of typing issue @lru_cache(maxsize=None) def get_json_schema(self) -> Mapping[str, Any]: diff --git a/airbyte_cdk/sources/streams/concurrent/default_stream.py b/airbyte_cdk/sources/streams/concurrent/default_stream.py index f5d4ccf2e..6cc6e44d4 100644 --- a/airbyte_cdk/sources/streams/concurrent/default_stream.py +++ b/airbyte_cdk/sources/streams/concurrent/default_stream.py @@ -6,6 +6,13 @@ from typing import Any, Callable, Iterable, List, Mapping, Optional, Union from airbyte_cdk.models import AirbyteStream, SyncMode +from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import ( + ConcurrentPerPartitionCursor, +) +from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter +from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import ( + StreamSlicerPartitionGenerator, +) from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream from airbyte_cdk.sources.streams.concurrent.availability_strategy import 
StreamAvailability from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField @@ -26,6 +33,7 @@ def __init__( cursor: Cursor, namespace: Optional[str] = None, supports_file_transfer: bool = False, + block_simultaneous_read: str = "", ) -> None: self._stream_partition_generator = partition_generator self._name = name @@ -36,6 +44,7 @@ def __init__( self._cursor = cursor self._namespace = namespace self._supports_file_transfer = supports_file_transfer + self._block_simultaneous_read = block_simultaneous_read def generate_partitions(self) -> Iterable[Partition]: yield from self._stream_partition_generator.generate() @@ -94,6 +103,24 @@ def log_stream_sync_configuration(self) -> None: def cursor(self) -> Cursor: return self._cursor + @property + def block_simultaneous_read(self) -> str: + """Returns the blocking group name for this stream, or empty string if no blocking""" + return self._block_simultaneous_read + + @block_simultaneous_read.setter + def block_simultaneous_read(self, value: str) -> None: + self._block_simultaneous_read = value + + def get_partition_router(self) -> PartitionRouter | None: + """Return the partition router for this stream, or None if not available.""" + if not isinstance(self._stream_partition_generator, StreamSlicerPartitionGenerator): + return None + stream_slicer = self._stream_partition_generator._stream_slicer + if not isinstance(stream_slicer, ConcurrentPerPartitionCursor): + return None + return stream_slicer._partition_router + def check_availability(self) -> StreamAvailability: """ Check stream availability by attempting to read the first record of the stream. 
diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index dcdc2bcff..7b7763cdb 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -5214,6 +5214,62 @@ def test_catalog_defined_cursor_field_stream_missing(): assert stream._cursor_field.supports_catalog_defined_cursor_field == True +def test_block_simultaneous_read_from_stream_groups(): + """Test that factory-created streams default to empty block_simultaneous_read. + + The factory no longer handles stream_groups — that's done by + ConcurrentDeclarativeSource._apply_stream_groups after stream creation. + This test verifies the factory creates streams without group info. + """ + content = """ + definitions: + parent_stream: + type: DeclarativeStream + name: "parent" + primary_key: "id" + retriever: + type: SimpleRetriever + requester: + type: HttpRequester + url_base: "https://api.example.com" + path: "/parent" + http_method: "GET" + authenticator: + type: BearerAuthenticator + api_token: "{{ config['api_key'] }}" + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: [] + schema_loader: + type: InlineSchemaLoader + schema: + type: object + properties: + id: + type: string + """ + + config = {"api_key": "test_key"} + + parsed_manifest = YamlDeclarativeSource._parse(content) + resolved_manifest = resolver.preprocess_manifest(parsed_manifest) + + factory = ModelToComponentFactory() + + parent_manifest = transformer.propagate_types_and_parameters( + "", resolved_manifest["definitions"]["parent_stream"], {} + ) + parent_stream: DefaultStream = factory.create_component( + model_type=DeclarativeStreamModel, component_definition=parent_manifest, config=config + ) + + assert isinstance(parent_stream, DefaultStream) + assert parent_stream.name == "parent" + assert 
parent_stream.block_simultaneous_read == "" + + def get_schema_loader(stream: DefaultStream): assert isinstance( stream._stream_partition_generator._partition_factory._schema_loader, diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index bde6c35b1..bf1f61610 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -61,6 +61,7 @@ from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import ( StreamSlicerPartitionGenerator, ) +from airbyte_cdk.sources.message.repository import InMemoryMessageRepository from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.checkpoint import Cursor from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor @@ -5150,3 +5151,370 @@ def test_given_record_selector_is_filtering_when_read_then_raise_error(): with pytest.raises(ValueError): list(source.read(logger=source.logger, config=input_config, catalog=catalog, state=[])) + + +def _make_default_stream(name: str) -> DefaultStream: + """Create a minimal DefaultStream instance for testing.""" + from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor + + cursor = FinalStateCursor( + stream_name=name, stream_namespace=None, message_repository=InMemoryMessageRepository() + ) + return DefaultStream( + partition_generator=Mock(), + name=name, + json_schema={}, + primary_key=[], + cursor_field=None, + logger=logging.getLogger(f"test.{name}"), + cursor=cursor, + ) + + +def _make_child_stream_with_parent(child_name: str, parent_stream: DefaultStream) -> DefaultStream: + """Create a DefaultStream that has a SubstreamPartitionRouter pointing to parent_stream.""" + from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import ( + ConcurrentCursorFactory, + ConcurrentPerPartitionCursor, + 
) + from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ( + ParentStreamConfig, + SubstreamPartitionRouter, + ) + from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import ( + DeclarativePartitionFactory, + StreamSlicerPartitionGenerator, + ) + from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor + from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import ( + EpochValueConcurrentStreamStateConverter, + ) + + partition_router = SubstreamPartitionRouter( + parent_stream_configs=[ + ParentStreamConfig( + stream=parent_stream, + parent_key="id", + partition_field="parent_id", + config={}, + parameters={}, + ) + ], + config={}, + parameters={}, + ) + + cursor_factory = ConcurrentCursorFactory(lambda *args, **kwargs: Mock()) + message_repository = InMemoryMessageRepository() + state_converter = EpochValueConcurrentStreamStateConverter() + + per_partition_cursor = ConcurrentPerPartitionCursor( + cursor_factory=cursor_factory, + partition_router=partition_router, + stream_name=child_name, + stream_namespace=None, + stream_state={}, + message_repository=message_repository, + connector_state_manager=Mock(), + connector_state_converter=state_converter, + cursor_field=Mock(cursor_field_key="updated_at"), + ) + + partition_factory = Mock(spec=DeclarativePartitionFactory) + partition_generator = StreamSlicerPartitionGenerator( + partition_factory=partition_factory, + stream_slicer=per_partition_cursor, + ) + + cursor = FinalStateCursor( + stream_name=child_name, stream_namespace=None, message_repository=message_repository + ) + return DefaultStream( + partition_generator=partition_generator, + name=child_name, + json_schema={}, + primary_key=[], + cursor_field=None, + logger=logging.getLogger(f"test.{child_name}"), + cursor=cursor, + ) + + +@pytest.mark.parametrize( + "source_config,stream_names,expected_groups", + [ + pytest.param( + {}, + 
["my_stream"], + {"my_stream": ""}, + id="no_stream_groups", + ), + pytest.param( + {"stream_groups": {}}, + ["my_stream"], + {"my_stream": ""}, + id="empty_stream_groups", + ), + pytest.param( + { + "stream_groups": { + "crm_objects": { + "streams": [ + {"name": "deals", "type": "DeclarativeStream"}, + {"name": "companies", "type": "DeclarativeStream"}, + ], + "action": {"type": "BlockSimultaneousSyncsAction"}, + } + } + }, + ["deals", "companies", "no_group"], + {"deals": "crm_objects", "companies": "crm_objects", "no_group": ""}, + id="single_group_with_unmatched_stream", + ), + pytest.param( + { + "stream_groups": { + "group_a": { + "streams": [{"name": "stream1", "type": "DeclarativeStream"}], + "action": {"type": "BlockSimultaneousSyncsAction"}, + }, + "group_b": { + "streams": [ + {"name": "stream2", "type": "DeclarativeStream"}, + {"name": "stream3", "type": "DeclarativeStream"}, + ], + "action": {"type": "BlockSimultaneousSyncsAction"}, + }, + } + }, + ["stream1", "stream2", "stream3"], + {"stream1": "group_a", "stream2": "group_b", "stream3": "group_b"}, + id="multiple_groups", + ), + ], +) +def test_apply_stream_groups(source_config, stream_names, expected_groups): + """Test _apply_stream_groups sets block_simultaneous_read on matching stream instances.""" + streams = [_make_default_stream(name) for name in stream_names] + + source = Mock() + source._source_config = source_config + + ConcurrentDeclarativeSource._apply_stream_groups(source, streams) + + for stream in streams: + assert stream.block_simultaneous_read == expected_groups[stream.name] + + +def test_apply_stream_groups_raises_on_parent_child_in_same_group(): + """Test _apply_stream_groups raises ValueError when a child and its parent are in the same group.""" + parent = _make_default_stream("parent_stream") + child = _make_child_stream_with_parent("child_stream", parent) + + source = Mock() + source._source_config = { + "stream_groups": { + "my_group": { + "streams": [ + {"name": 
"parent_stream", "type": "DeclarativeStream"}, + {"name": "child_stream", "type": "DeclarativeStream"}, + ], + "action": {"type": "BlockSimultaneousSyncsAction"}, + } + } + } + + with pytest.raises(ValueError, match="child stream must not share a group with its parent"): + ConcurrentDeclarativeSource._apply_stream_groups(source, [parent, child]) + + +def test_apply_stream_groups_allows_parent_child_in_different_groups(): + """Test _apply_stream_groups allows a child and its parent in different groups.""" + parent = _make_default_stream("parent_stream") + child = _make_child_stream_with_parent("child_stream", parent) + + source = Mock() + source._source_config = { + "stream_groups": { + "group_a": { + "streams": [{"name": "parent_stream", "type": "DeclarativeStream"}], + "action": {"type": "BlockSimultaneousSyncsAction"}, + }, + "group_b": { + "streams": [{"name": "child_stream", "type": "DeclarativeStream"}], + "action": {"type": "BlockSimultaneousSyncsAction"}, + }, + } + } + + ConcurrentDeclarativeSource._apply_stream_groups(source, [parent, child]) + + assert parent.block_simultaneous_read == "group_a" + assert child.block_simultaneous_read == "group_b" + + +def _make_child_stream_with_grouping_router( + child_name: str, parent_stream: DefaultStream +) -> DefaultStream: + """Create a DefaultStream with GroupingPartitionRouter wrapping SubstreamPartitionRouter.""" + from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import ( + ConcurrentCursorFactory, + ConcurrentPerPartitionCursor, + ) + from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import ( + GroupingPartitionRouter, + ) + from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ( + ParentStreamConfig, + SubstreamPartitionRouter, + ) + from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import ( + DeclarativePartitionFactory, + StreamSlicerPartitionGenerator, + ) + from 
airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor + from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import ( + EpochValueConcurrentStreamStateConverter, + ) + + substream_router = SubstreamPartitionRouter( + parent_stream_configs=[ + ParentStreamConfig( + stream=parent_stream, + parent_key="id", + partition_field="parent_id", + config={}, + parameters={}, + ) + ], + config={}, + parameters={}, + ) + + grouping_router = GroupingPartitionRouter( + group_size=10, + underlying_partition_router=substream_router, + config={}, + ) + + cursor_factory = ConcurrentCursorFactory(lambda *args, **kwargs: Mock()) + message_repository = InMemoryMessageRepository() + state_converter = EpochValueConcurrentStreamStateConverter() + + per_partition_cursor = ConcurrentPerPartitionCursor( + cursor_factory=cursor_factory, + partition_router=grouping_router, + stream_name=child_name, + stream_namespace=None, + stream_state={}, + message_repository=message_repository, + connector_state_manager=Mock(), + connector_state_converter=state_converter, + cursor_field=Mock(cursor_field_key="updated_at"), + ) + + partition_factory = Mock(spec=DeclarativePartitionFactory) + partition_generator = StreamSlicerPartitionGenerator( + partition_factory=partition_factory, + stream_slicer=per_partition_cursor, + ) + + cursor = FinalStateCursor( + stream_name=child_name, stream_namespace=None, message_repository=message_repository + ) + return DefaultStream( + partition_generator=partition_generator, + name=child_name, + json_schema={}, + primary_key=[], + cursor_field=None, + logger=logging.getLogger(f"test.{child_name}"), + cursor=cursor, + ) + + +def test_apply_stream_groups_raises_on_grandparent_child_in_same_group(): + """Test _apply_stream_groups detects deadlock when a grandchild and grandparent share a group.""" + grandparent = _make_default_stream("grandparent_stream") + parent = _make_child_stream_with_parent("parent_stream", 
grandparent) + child = _make_child_stream_with_parent("child_stream", parent) + + source = Mock() + source._source_config = { + "stream_groups": { + "my_group": { + "streams": [ + {"name": "grandparent_stream", "type": "DeclarativeStream"}, + {"name": "child_stream", "type": "DeclarativeStream"}, + ], + "action": {"type": "BlockSimultaneousSyncsAction"}, + } + } + } + + with pytest.raises(ValueError, match="child stream must not share a group with its parent"): + ConcurrentDeclarativeSource._apply_stream_groups(source, [grandparent, parent, child]) + + +def test_apply_stream_groups_raises_on_parent_child_in_same_group_with_grouping_router(): + """Test _apply_stream_groups detects deadlock when GroupingPartitionRouter wraps SubstreamPartitionRouter.""" + parent = _make_default_stream("parent_stream") + child = _make_child_stream_with_grouping_router("child_stream", parent) + + source = Mock() + source._source_config = { + "stream_groups": { + "my_group": { + "streams": [ + {"name": "parent_stream", "type": "DeclarativeStream"}, + {"name": "child_stream", "type": "DeclarativeStream"}, + ], + "action": {"type": "BlockSimultaneousSyncsAction"}, + } + } + } + + with pytest.raises(ValueError, match="child stream must not share a group with its parent"): + ConcurrentDeclarativeSource._apply_stream_groups(source, [parent, child]) + + +@pytest.mark.parametrize( + "stream_factory,expected_type", + [ + pytest.param( + lambda: _make_default_stream("plain_stream"), + type(None), + id="no_partition_router_returns_none", + ), + pytest.param( + lambda: _make_child_stream_with_parent("child", _make_default_stream("parent")), + "SubstreamPartitionRouter", + id="substream_returns_substream_router", + ), + pytest.param( + lambda: _make_child_stream_with_grouping_router( + "child", _make_default_stream("parent") + ), + "GroupingPartitionRouter", + id="grouping_returns_grouping_router", + ), + ], +) +def test_get_partition_router(stream_factory, expected_type): + """Test 
DefaultStream.get_partition_router returns the correct router type.""" + from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import ( + GroupingPartitionRouter, + ) + from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ( + SubstreamPartitionRouter, + ) + + stream = stream_factory() + router = stream.get_partition_router() + + if expected_type is type(None): + assert router is None + elif expected_type == "SubstreamPartitionRouter": + assert isinstance(router, SubstreamPartitionRouter) + elif expected_type == "GroupingPartitionRouter": + assert isinstance(router, GroupingPartitionRouter) diff --git a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py index a681f75eb..acfa03129 100644 --- a/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py +++ b/unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py @@ -28,8 +28,15 @@ ) from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException from airbyte_cdk.sources.concurrent_source.thread_pool_manager import ThreadPoolManager +from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import ( + GroupingPartitionRouter, +) +from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ( + SubstreamPartitionRouter, +) from airbyte_cdk.sources.message import LogMessage, MessageRepository from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream +from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream from airbyte_cdk.sources.streams.concurrent.partition_enqueuer import PartitionEnqueuer from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition @@ -792,3 +799,729 @@ def 
test_start_next_partition_generator(self): self._thread_pool_manager.submit.assert_called_with( self._partition_enqueuer.generate_partitions, self._stream ) + + +class TestBlockSimultaneousRead(unittest.TestCase): + """Tests for block_simultaneous_read functionality""" + + def setUp(self): + self._partition_enqueuer = Mock(spec=PartitionEnqueuer) + self._thread_pool_manager = Mock(spec=ThreadPoolManager) + self._logger = Mock(spec=logging.Logger) + self._slice_logger = Mock(spec=SliceLogger) + self._message_repository = Mock(spec=MessageRepository) + self._message_repository.consume_queue.return_value = [] + self._partition_reader = Mock(spec=PartitionReader) + + def _create_mock_stream(self, name: str, block_simultaneous_read: str = ""): + """Helper to create a mock stream""" + stream = Mock(spec=AbstractStream) + stream.name = name + stream.block_simultaneous_read = block_simultaneous_read + stream.as_airbyte_stream.return_value = AirbyteStream( + name=name, + json_schema={}, + supported_sync_modes=[SyncMode.full_refresh], + ) + stream.cursor.ensure_at_least_one_state_emitted = Mock() + return stream + + def _create_mock_stream_with_parent( + self, name: str, parent_stream, block_simultaneous_read: str = "" + ): + """Helper to create a mock stream with a parent stream.""" + stream = Mock(spec=DefaultStream) + stream.name = name + stream.block_simultaneous_read = block_simultaneous_read + stream.as_airbyte_stream.return_value = AirbyteStream( + name=name, + json_schema={}, + supported_sync_modes=[SyncMode.full_refresh], + ) + stream.cursor.ensure_at_least_one_state_emitted = Mock() + + mock_partition_router = Mock(spec=SubstreamPartitionRouter) + mock_parent_config = Mock() + mock_parent_config.stream = parent_stream + mock_partition_router.parent_stream_configs = [mock_parent_config] + stream.get_partition_router.return_value = mock_partition_router + + return stream + + def test_defer_stream_when_self_active(self): + """Test that a stream is deferred when it's 
already active""" + stream = self._create_mock_stream("stream1", block_simultaneous_read="api_group") + + handler = ConcurrentReadProcessor( + [stream], + self._partition_enqueuer, + self._thread_pool_manager, + self._logger, + self._slice_logger, + self._message_repository, + self._partition_reader, + ) + + # Mark stream as active + handler._active_stream_names.add("stream1") + + # Try to start the stream again + result = handler.start_next_partition_generator() + + # Should return None (no stream started) + assert result is None + + # Stream should be back in the queue + assert len(handler._stream_instances_to_start_partition_generation) == 1 + assert handler._stream_instances_to_start_partition_generation[0] == stream + + # Logger should have been called to log deferral + assert any( + "Deferring stream 'stream1' (group 'api_group') because it's already active" + in str(call) + for call in self._logger.info.call_args_list + ) + + def test_defer_stream_when_parent_active(self): + """Test that a stream is deferred when its parent is active""" + parent_stream = self._create_mock_stream("parent", block_simultaneous_read="api_group") + child_stream = self._create_mock_stream_with_parent( + "child", parent_stream, block_simultaneous_read="api_group" + ) + + handler = ConcurrentReadProcessor( + [parent_stream, child_stream], + self._partition_enqueuer, + self._thread_pool_manager, + self._logger, + self._slice_logger, + self._message_repository, + self._partition_reader, + ) + + # Mark parent as active + handler._active_stream_names.add("parent") + + # Remove parent from queue (simulate it's already started) + handler._stream_instances_to_start_partition_generation = [child_stream] + + # Try to start child + result = handler.start_next_partition_generator() + + # Should return None (child deferred) + assert result is None + + # Child should be back in the queue + assert len(handler._stream_instances_to_start_partition_generation) == 1 + assert 
handler._stream_instances_to_start_partition_generation[0] == child_stream + + # Logger should have been called + assert any( + "Deferring stream 'child' because parent stream(s)" in str(call) + for call in self._logger.info.call_args_list + ) + + def test_defer_stream_when_grandparent_active(self): + """Test that a stream is deferred when its grandparent is active""" + grandparent = self._create_mock_stream("grandparent", block_simultaneous_read="api_group") + parent = self._create_mock_stream_with_parent( + "parent", grandparent, block_simultaneous_read="api_group" + ) + child = self._create_mock_stream_with_parent( + "child", parent, block_simultaneous_read="api_group" + ) + + handler = ConcurrentReadProcessor( + [grandparent, parent, child], + self._partition_enqueuer, + self._thread_pool_manager, + self._logger, + self._slice_logger, + self._message_repository, + self._partition_reader, + ) + + # Mark grandparent as active + handler._active_stream_names.add("grandparent") + + # Only child in queue + handler._stream_instances_to_start_partition_generation = [child] + + # Try to start child + result = handler.start_next_partition_generator() + + # Should return None (child deferred because grandparent is active) + assert result is None + + # Child should be back in the queue + assert len(handler._stream_instances_to_start_partition_generation) == 1 + + def test_different_groups_do_not_block_each_other(self): + """Test that independent streams with different groups don't block each other""" + stream1 = self._create_mock_stream("stream1", block_simultaneous_read="group1") + stream2 = self._create_mock_stream("stream2", block_simultaneous_read="group2") + + handler = ConcurrentReadProcessor( + [stream1, stream2], + self._partition_enqueuer, + self._thread_pool_manager, + self._logger, + self._slice_logger, + self._message_repository, + self._partition_reader, + ) + + # Start stream1 + handler.start_next_partition_generator() + assert "stream1" in 
handler._active_stream_names + + # Stream2 should start successfully even though stream1 is active + # because they're in different groups + result = handler.start_next_partition_generator() + + # Should start stream2 (different group, no blocking) + assert result is not None + assert "stream2" in handler._active_stream_names + assert len(handler._stream_instances_to_start_partition_generation) == 0 + + def test_retry_blocked_stream_after_partition_generation(self): + """Test that blocked stream is retried after partition generation completes""" + parent = self._create_mock_stream("parent", block_simultaneous_read="api_group") + child = self._create_mock_stream_with_parent( + "child", parent, block_simultaneous_read="api_group" + ) + + handler = ConcurrentReadProcessor( + [parent, child], + self._partition_enqueuer, + self._thread_pool_manager, + self._logger, + self._slice_logger, + self._message_repository, + self._partition_reader, + ) + + # Start parent + handler.start_next_partition_generator() + assert "parent" in handler._active_stream_names + + # Complete partition generation for parent (parent has no partitions, so it's done) + sentinel = PartitionGenerationCompletedSentinel(parent) + messages = list(handler.on_partition_generation_completed(sentinel)) + + # Child should have been started automatically by on_partition_generation_completed + # (it calls start_next_partition_generator internally) + assert "child" in handler._active_stream_names + + # Parent should be RE-ACTIVATED because child needs to read from it during partition generation + # This is the correct behavior - prevents simultaneous reads of parent + assert "parent" in handler._active_stream_names + + # Verify the queue is now empty (both streams were started) + assert len(handler._stream_instances_to_start_partition_generation) == 0 + + def test_blocked_stream_added_to_end_of_queue(self): + """Test that blocked streams are added to the end of the queue""" + stream1 = 
self._create_mock_stream("stream1", block_simultaneous_read="api_group") + stream2 = self._create_mock_stream("stream2", block_simultaneous_read="") + stream3 = self._create_mock_stream("stream3", block_simultaneous_read="") + + handler = ConcurrentReadProcessor( + [stream1, stream2, stream3], + self._partition_enqueuer, + self._thread_pool_manager, + self._logger, + self._slice_logger, + self._message_repository, + self._partition_reader, + ) + + # Mark stream1 as active + handler._active_stream_names.add("stream1") + + # Try to start streams in order: stream1, stream2, stream3 + result1 = handler.start_next_partition_generator() + + # stream1 should be deferred, stream2 should start + assert result1 is not None + assert "stream2" in handler._active_stream_names + + # Queue should now be [stream3, stream1] (stream1 moved to end) + assert len(handler._stream_instances_to_start_partition_generation) == 2 + assert handler._stream_instances_to_start_partition_generation[0] == stream3 + assert handler._stream_instances_to_start_partition_generation[1] == stream1 + + def test_no_defer_when_flag_false(self): + """Test that blocking doesn't occur when block_simultaneous_read="" """ + stream = self._create_mock_stream("stream1", block_simultaneous_read="") + + handler = ConcurrentReadProcessor( + [stream], + self._partition_enqueuer, + self._thread_pool_manager, + self._logger, + self._slice_logger, + self._message_repository, + self._partition_reader, + ) + + # Mark stream as active + handler._active_stream_names.add("stream1") + + # Try to start the stream again (should succeed because flag is False) + result = handler.start_next_partition_generator() + + # Should return a status message (stream started) + assert result is not None + assert isinstance(result, AirbyteMessage) + + # Queue should be empty + assert len(handler._stream_instances_to_start_partition_generation) == 0 + + def test_collect_parent_streams_multi_level(self): + """Test that 
_collect_all_parent_stream_names works recursively""" + grandparent = self._create_mock_stream("grandparent") + parent = self._create_mock_stream_with_parent("parent", grandparent) + child = self._create_mock_stream_with_parent("child", parent) + + handler = ConcurrentReadProcessor( + [grandparent, parent, child], + self._partition_enqueuer, + self._thread_pool_manager, + self._logger, + self._slice_logger, + self._message_repository, + self._partition_reader, + ) + + # Collect parents for child + parents = handler._collect_all_parent_stream_names("child") + + # Should include both parent and grandparent + assert "parent" in parents + assert "grandparent" in parents + assert len(parents) == 2 + + def test_deactivate_parents_when_partition_generation_completes(self): + """Test that parent streams are deactivated when partition generation completes""" + parent = self._create_mock_stream("parent", block_simultaneous_read="api_group") + child = self._create_mock_stream_with_parent( + "child", parent, block_simultaneous_read="api_group" + ) + + handler = ConcurrentReadProcessor( + [parent, child], + self._partition_enqueuer, + self._thread_pool_manager, + self._logger, + self._slice_logger, + self._message_repository, + self._partition_reader, + ) + + # Manually mark both as active (simulating partition generation for child) + handler._active_stream_names.add("parent") + handler._active_stream_names.add("child") + handler._streams_currently_generating_partitions.append("child") + + # Ensure child has running partitions (so it doesn't trigger _on_stream_is_done) + mock_partition = Mock(spec=Partition) + mock_partition.stream_name.return_value = "child" + handler._streams_to_running_partitions["child"] = {mock_partition} + + # Remove both streams from the queue so start_next_partition_generator doesn't start them + # This simulates the scenario where both streams have already been started + handler._stream_instances_to_start_partition_generation = [] + + # Complete 
partition generation for child + sentinel = PartitionGenerationCompletedSentinel(child) + + list(handler.on_partition_generation_completed(sentinel)) + + # Parent should be deactivated (it was only needed for partition generation) + assert "parent" not in handler._active_stream_names + + # Child should still be active (it's reading records) + assert "child" in handler._active_stream_names + + def test_deactivate_only_stream_when_done(self): + """Test that only the stream itself is deactivated when done, not parents""" + parent = self._create_mock_stream("parent", block_simultaneous_read="api_group") + child = self._create_mock_stream_with_parent( + "child", parent, block_simultaneous_read="api_group" + ) + + handler = ConcurrentReadProcessor( + [parent, child], + self._partition_enqueuer, + self._thread_pool_manager, + self._logger, + self._slice_logger, + self._message_repository, + self._partition_reader, + ) + + # Mark both as active + handler._active_stream_names.add("parent") + handler._active_stream_names.add("child") + + # Start child and mark it as done + handler._stream_instances_to_start_partition_generation = [] + handler._streams_currently_generating_partitions = [] + handler._streams_to_running_partitions["child"] = set() + + # Call _on_stream_is_done for child + list(handler._on_stream_is_done("child")) + + # Child should be deactivated + assert "child" not in handler._active_stream_names + + # Parent should still be active (not deactivated) + assert "parent" in handler._active_stream_names + + def test_multiple_blocked_streams_retry_in_order(self): + """Test that multiple blocked streams are retried in order""" + parent = self._create_mock_stream("parent", block_simultaneous_read="api_group") + child1 = self._create_mock_stream_with_parent( + "child1", parent, block_simultaneous_read="api_group" + ) + child2 = self._create_mock_stream_with_parent( + "child2", parent, block_simultaneous_read="api_group" + ) + + handler = ConcurrentReadProcessor( + 
[parent, child1, child2], + self._partition_enqueuer, + self._thread_pool_manager, + self._logger, + self._slice_logger, + self._message_repository, + self._partition_reader, + ) + + # Start parent + result = handler.start_next_partition_generator() + assert result is not None + assert "parent" in handler._active_stream_names + assert "api_group" in handler._active_groups + assert "parent" in handler._active_groups["api_group"] + + # Try to start next stream (child1) - should be deferred because parent is active + result = handler.start_next_partition_generator() + assert result is None # child1 was deferred + + # After first deferral, we should still have 2 streams in queue (child1 moved to end) + assert len(handler._stream_instances_to_start_partition_generation) == 2 + # child1 was moved to the back, so the queue has the other child first + queue_streams = handler._stream_instances_to_start_partition_generation + assert child1 in queue_streams + assert child2 in queue_streams + + # Try to start next stream (child2) - should also be deferred + result = handler.start_next_partition_generator() + assert result is None # child2 was deferred + + # Both streams still in queue, but order may have changed + assert len(handler._stream_instances_to_start_partition_generation) == 2 + + # Verify neither child is active yet (both blocked by parent) + assert "child1" not in handler._active_stream_names + assert "child2" not in handler._active_stream_names + + # Verify deferral was logged for both children + logger_calls = [str(call) for call in self._logger.info.call_args_list] + assert any("Deferring stream 'child1'" in call for call in logger_calls) + assert any("Deferring stream 'child2'" in call for call in logger_calls) + + # Simulate parent completing partition generation (parent has no partitions, so it's done) + sentinel = PartitionGenerationCompletedSentinel(parent) + list(handler.on_partition_generation_completed(sentinel)) + + # After parent completes, one of the 
children should start (whichever was first in queue) + # We know at least one child started because the queue shrunk + assert len(handler._stream_instances_to_start_partition_generation) == 1 + + # Verify that exactly one child is now active + children_active = [ + name for name in ["child1", "child2"] if name in handler._active_stream_names + ] + assert len(children_active) == 1, ( + f"Expected exactly one child active, got: {children_active}" + ) + + # Parent should be re-activated because the active child needs to read from it + assert "parent" in handler._active_stream_names + + def test_child_without_flag_blocked_by_parent_with_flag(self): + """Test that a child WITHOUT block_simultaneous_read is blocked by parent WITH the flag""" + # Parent has the flag, child does NOT + parent = self._create_mock_stream("parent", block_simultaneous_read="api_group") + child = self._create_mock_stream_with_parent("child", parent, block_simultaneous_read="") + + handler = ConcurrentReadProcessor( + [parent, child], + self._partition_enqueuer, + self._thread_pool_manager, + self._logger, + self._slice_logger, + self._message_repository, + self._partition_reader, + ) + + # Mark parent as active and already started (remove from queue) + handler._active_stream_names.add("parent") + handler._stream_instances_to_start_partition_generation.remove(parent) + + # Try to start child (should be deferred even though child doesn't have the flag) + result = handler.start_next_partition_generator() + + # Child should be deferred because parent has block_simultaneous_read="api_group" and is active + assert result is None # No stream started + assert "child" not in handler._active_stream_names + # Child should be moved to end of queue (still 1 stream in queue) + assert len(handler._stream_instances_to_start_partition_generation) == 1 + assert handler._stream_instances_to_start_partition_generation[0] == child + + def test_child_with_flag_not_blocked_by_parent_without_flag(self): + """Test that 
a child WITH block_simultaneous_read is NOT blocked by parent WITHOUT the flag""" + # Parent does NOT have the flag, child does + parent = self._create_mock_stream("parent", block_simultaneous_read="") + child = self._create_mock_stream_with_parent( + "child", parent, block_simultaneous_read="api_group" + ) + + handler = ConcurrentReadProcessor( + [parent, child], + self._partition_enqueuer, + self._thread_pool_manager, + self._logger, + self._slice_logger, + self._message_repository, + self._partition_reader, + ) + + # Mark parent as active and already started (remove from queue) + handler._active_stream_names.add("parent") + handler._stream_instances_to_start_partition_generation.remove(parent) + + # Try to start child (should succeed even though parent is active) + result = handler.start_next_partition_generator() + + # Child should start successfully because parent doesn't have block_simultaneous_read + assert result is not None # Stream started + assert "child" in handler._active_stream_names + # Queue should now be empty (both streams started) + assert len(handler._stream_instances_to_start_partition_generation) == 0 + + def test_unrelated_streams_in_same_group_block_each_other(self): + """Test that multiple unrelated streams with the same group name block each other""" + # Create three unrelated streams (no parent-child relationship) in the same group + stream1 = self._create_mock_stream("stream1", block_simultaneous_read="shared_endpoint") + stream2 = self._create_mock_stream("stream2", block_simultaneous_read="shared_endpoint") + stream3 = self._create_mock_stream("stream3", block_simultaneous_read="shared_endpoint") + + handler = ConcurrentReadProcessor( + [stream1, stream2, stream3], + self._partition_enqueuer, + self._thread_pool_manager, + self._logger, + self._slice_logger, + self._message_repository, + self._partition_reader, + ) + + # Start stream1 + result = handler.start_next_partition_generator() + assert result is not None + assert "stream1" in 
handler._active_stream_names + assert "shared_endpoint" in handler._active_groups + assert "stream1" in handler._active_groups["shared_endpoint"] + + # Try to start stream2 (should be deferred because it's in the same group) + result = handler.start_next_partition_generator() + # stream2 should be deferred, stream3 should also be deferred + # All three are in same group, only stream1 is active + assert result is None # No stream started + + # Both stream2 and stream3 should be in the queue + assert len(handler._stream_instances_to_start_partition_generation) == 2 + + # Verify logger was called with deferral message + assert any( + "Deferring stream 'stream2'" in str(call) and "shared_endpoint" in str(call) + for call in self._logger.info.call_args_list + ) + + def test_child_starts_after_parent_completes_via_partition_complete_sentinel(self): + """Test that child stream starts after parent completes via on_partition_complete_sentinel""" + parent = self._create_mock_stream("parent", block_simultaneous_read="api_group") + child = self._create_mock_stream_with_parent( + "child", parent, block_simultaneous_read="api_group" + ) + + handler = ConcurrentReadProcessor( + [parent, child], + self._partition_enqueuer, + self._thread_pool_manager, + self._logger, + self._slice_logger, + self._message_repository, + self._partition_reader, + ) + + # Start parent + handler.start_next_partition_generator() + assert "parent" in handler._active_stream_names + + # Try to start child (should be deferred) + result = handler.start_next_partition_generator() + assert result is None + assert "child" not in handler._active_stream_names + assert len(handler._stream_instances_to_start_partition_generation) == 1 + + # Create a partition for parent and add it to running partitions + # (parent is already in _streams_currently_generating_partitions from start_next_partition_generator) + mock_partition = Mock(spec=Partition) + mock_partition.stream_name.return_value = "parent" + 
def test_is_done_raises_when_partition_generation_queue_not_empty():
    """is_done() must fail loudly when a stream marked done never left the generation queue."""
    message_repository = Mock(spec=MessageRepository)
    message_repository.consume_queue.return_value = []

    stuck = Mock(spec=AbstractStream)
    stuck.name = "stuck_stream"
    stuck.block_simultaneous_read = ""
    stuck.as_airbyte_stream.return_value = AirbyteStream(
        name="stuck_stream",
        json_schema={},
        supported_sync_modes=[SyncMode.full_refresh],
    )

    processor = ConcurrentReadProcessor(
        [stuck],
        Mock(spec=PartitionEnqueuer),
        Mock(spec=ThreadPoolManager),
        Mock(spec=logging.Logger),
        Mock(spec=SliceLogger),
        message_repository,
        Mock(spec=PartitionReader),
    )

    # Simulate the inconsistent state: the stream is "done" yet still queued
    # for partition generation (it was never removed from the queue).
    processor._streams_done.add("stuck_stream")

    with pytest.raises(AirbyteTracedException, match="remained in the partition generation queue"):
        processor.is_done()
def test_is_done_raises_when_active_groups_not_empty():
    """is_done() must fail loudly when a blocking group stays active after all streams finish."""
    message_repository = Mock(spec=MessageRepository)
    message_repository.consume_queue.return_value = []

    grouped_stream = Mock(spec=AbstractStream)
    grouped_stream.name = "stuck_stream"
    grouped_stream.block_simultaneous_read = "my_group"
    grouped_stream.as_airbyte_stream.return_value = AirbyteStream(
        name="stuck_stream",
        json_schema={},
        supported_sync_modes=[SyncMode.full_refresh],
    )

    processor = ConcurrentReadProcessor(
        [grouped_stream],
        Mock(spec=PartitionEnqueuer),
        Mock(spec=ThreadPoolManager),
        Mock(spec=logging.Logger),
        Mock(spec=SliceLogger),
        message_repository,
        Mock(spec=PartitionReader),
    )

    # Simulate the buggy state: the stream is done and dequeued, but its group
    # was never deactivated.
    processor._streams_done.add("stuck_stream")
    processor._stream_instances_to_start_partition_generation.clear()
    processor._active_groups["my_group"] = {"stuck_stream"}

    with pytest.raises(
        AirbyteTracedException, match="still active after all streams were marked done"
    ):
        processor.is_done()
def test_collect_parent_stream_names_unwraps_grouping_partition_router():
    """_collect_all_parent_stream_names must look through a GroupingPartitionRouter wrapper."""
    message_repository = Mock(spec=MessageRepository)
    message_repository.consume_queue.return_value = []

    parent_stream = Mock(spec=AbstractStream)
    parent_stream.name = "parent"
    parent_stream.block_simultaneous_read = ""

    # The child exposes a GroupingPartitionRouter wrapping a SubstreamPartitionRouter
    # whose parent config points back at the parent stream.
    mock_substream_router = Mock(spec=SubstreamPartitionRouter)
    mock_parent_config = Mock()
    mock_parent_config.stream = parent_stream
    mock_substream_router.parent_stream_configs = [mock_parent_config]

    mock_grouping_router = Mock(spec=GroupingPartitionRouter)
    mock_grouping_router.underlying_partition_router = mock_substream_router

    child_stream = Mock(spec=DefaultStream)
    child_stream.name = "child"
    child_stream.block_simultaneous_read = ""
    child_stream.get_partition_router.return_value = mock_grouping_router

    handler = ConcurrentReadProcessor(
        [parent_stream, child_stream],
        Mock(spec=PartitionEnqueuer),
        Mock(spec=ThreadPoolManager),
        Mock(spec=logging.Logger),
        Mock(spec=SliceLogger),
        message_repository,
        Mock(spec=PartitionReader),
    )

    # The grouping wrapper must be unwrapped to discover the substream parent.
    assert handler._collect_all_parent_stream_names("child") == {"parent"}