From 92d4fadda18004428dddefbc458523c87235b620 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 20 Feb 2026 15:50:44 +0000 Subject: [PATCH 1/3] feat: add intermediate state checkpointing during pagination When records are sorted in ascending order by cursor field, the CDK will now emit state checkpoints every N pages (default: 5) during pagination within a partition. This prevents loss of all progress when a sync fails mid-pagination due to rate limits or errors. Changes: - Add emit_intermediate_state() to ConcurrentCursor - Extend PaginationTracker with page counting and checkpoint triggering - Call on_page_complete() in SimpleRetriever._read_pages() - Wire up checkpoint cursor in model_to_component_factory Co-Authored-By: gl_anatolii.yatsuk@airbyte.io --- .../parsers/model_to_component_factory.py | 21 +++++++++-- .../retrievers/pagination_tracker.py | 17 ++++++++- .../retrievers/simple_retriever.py | 1 + .../sources/streams/concurrent/cursor.py | 36 +++++++++++++++++++ 4 files changed, 71 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 2bd7d268d..0c0dbc4a6 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -545,7 +545,10 @@ LocalFileSystemFileWriter, NoopFileWriter, ) -from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import PaginationTracker +from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import ( + DEFAULT_PAGES_PER_CHECKPOINT_INTERVAL, + PaginationTracker, +) from airbyte_cdk.sources.declarative.schema import ( ComplexFieldType, DefaultSchemaLoader, @@ -3471,8 +3474,15 @@ def _get_url(req: Requester) -> str: def _create_pagination_tracker_factory( self, model: Optional[PaginationResetModel], cursor: Cursor ) -> Callable[[], PaginationTracker]: + checkpoint_cursor: Optional[ConcurrentCursor] = ( + cursor if isinstance(cursor, ConcurrentCursor) else None + ) + if model is None: - return lambda: PaginationTracker() + return lambda: PaginationTracker( + checkpoint_cursor=checkpoint_cursor, + pages_per_checkpoint_interval=DEFAULT_PAGES_PER_CHECKPOINT_INTERVAL if checkpoint_cursor else None, + ) # Until we figure out a way to use any cursor for PaginationTracker, we will have to have this cursor selector logic cursor_factory: Callable[[], Optional[ConcurrentCursor]] = lambda: None @@ -3494,7 +3504,12 @@ def _create_pagination_tracker_factory( raise ValueError(f"Unknown PaginationReset action: {model.action}") limit = model.limits.number_of_records if model and model.limits else None - return lambda: PaginationTracker(cursor_factory(), limit) + return lambda: PaginationTracker( + cursor_factory(), + limit, + checkpoint_cursor=checkpoint_cursor, + pages_per_checkpoint_interval=DEFAULT_PAGES_PER_CHECKPOINT_INTERVAL if checkpoint_cursor else None, + ) def _get_log_formatter( self, log_formatter: Callable[[Response], Any] | None, name: str diff --git a/airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py b/airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py index 4987ea38c..e541a8b0a 100644 --- a/airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py +++ b/airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py @@ -7,13 +7,19 @@ from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor from airbyte_cdk.utils.traced_exception import AirbyteTracedException +DEFAULT_PAGES_PER_CHECKPOINT_INTERVAL = 5 + class PaginationTracker: _record_count: int _number_of_attempt_with_same_slice: int def __init__( - self, cursor: Optional[ConcurrentCursor] = None, max_number_of_records: Optional[int] = None + self, + cursor: Optional[ConcurrentCursor] = None, + max_number_of_records: Optional[int] = None, + checkpoint_cursor: Optional[ConcurrentCursor] = None, + pages_per_checkpoint_interval: Optional[int] = None, ) -> None: """ Ideally, we would have passed the `Cursor` interface here instead of `ConcurrentCursor` but not all @@ -24,6 +30,9 @@ def __init__( """ self._cursor = cursor self._limit = max_number_of_records + self._checkpoint_cursor = checkpoint_cursor + self._pages_per_checkpoint_interval = pages_per_checkpoint_interval + self._page_count = 0 self._reset() """ @@ -40,6 +49,12 @@ def observe(self, record: Record) -> None: if self._cursor: self._cursor.observe(record) + def on_page_complete(self, stream_slice: StreamSlice) -> None: + if self._checkpoint_cursor and self._pages_per_checkpoint_interval: + self._page_count += 1 + if self._page_count % self._pages_per_checkpoint_interval == 0: + self._checkpoint_cursor.emit_intermediate_state(stream_slice) + def has_reached_limit(self) -> bool: return self._limit is not None and self._record_count >= self._limit diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 1f2eb1c66..d3cada1d9 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -418,6 +418,7 @@ def _read_pages( ) reset_pagination = False else: + pagination_tracker.on_page_complete(stream_slice) last_page_token_value = ( next_page_token.get("next_page_token") if next_page_token else None ) diff --git a/airbyte_cdk/sources/streams/concurrent/cursor.py b/airbyte_cdk/sources/streams/concurrent/cursor.py index e3a487183..19c4611dc 100644 --- a/airbyte_cdk/sources/streams/concurrent/cursor.py +++ b/airbyte_cdk/sources/streams/concurrent/cursor.py @@ -286,6 +286,42 @@ def close_partition(self, partition: Partition) -> None: self._emit_state_message() self._has_closed_at_least_one_slice = True + def emit_intermediate_state(self, stream_slice: StreamSlice) -> None: + if not self._is_ascending_order: + return + + with self._lock: + most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get( + stream_slice + ) + if most_recent_cursor_value is None: + return + + if self._slice_boundary_fields: + if "slices" not in self._concurrent_state: + return + start_value = self._connector_state_converter.parse_value( + stream_slice[self._slice_boundary_fields[self._START_BOUNDARY]] + ) + self._concurrent_state["slices"].append( + { + self._connector_state_converter.START_KEY: start_value, + self._connector_state_converter.END_KEY: most_recent_cursor_value, + self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value, + } + ) + else: + self._concurrent_state["slices"].append( + { + self._connector_state_converter.START_KEY: self.start, + self._connector_state_converter.END_KEY: most_recent_cursor_value, + self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value, + } + ) + + self._merge_partitions() + self._emit_state_message() + def _add_slice_to_state(self, partition: Partition) -> None: most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get( partition.to_slice() From fed610031d1faf889bb4ecf1085800d21f118086 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 20 Feb 2026 15:55:08 +0000 Subject: [PATCH 2/3] fix: ruff format and add unit tests for intermediate checkpointing Co-Authored-By: gl_anatolii.yatsuk@airbyte.io --- .../parsers/model_to_component_factory.py | 8 +- .../retrievers/test_pagination_tracker.py | 38 +++++ .../sources/streams/concurrent/test_cursor.py | 135 ++++++++++++++++++ 3 files changed, 179 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 0c0dbc4a6..014fda589 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3481,7 +3481,9 @@ def _create_pagination_tracker_factory( if model is None: return lambda: PaginationTracker( checkpoint_cursor=checkpoint_cursor, - pages_per_checkpoint_interval=DEFAULT_PAGES_PER_CHECKPOINT_INTERVAL if checkpoint_cursor else None, + pages_per_checkpoint_interval=DEFAULT_PAGES_PER_CHECKPOINT_INTERVAL + if checkpoint_cursor + else None, ) # Until we figure out a way to use any cursor for PaginationTracker, we will have to have this cursor selector logic @@ -3508,7 +3510,9 @@ def _create_pagination_tracker_factory( cursor_factory(), limit, checkpoint_cursor=checkpoint_cursor, - pages_per_checkpoint_interval=DEFAULT_PAGES_PER_CHECKPOINT_INTERVAL if checkpoint_cursor else None, + pages_per_checkpoint_interval=DEFAULT_PAGES_PER_CHECKPOINT_INTERVAL + if checkpoint_cursor + else None, ) def _get_log_formatter( diff --git a/unit_tests/sources/declarative/retrievers/test_pagination_tracker.py b/unit_tests/sources/declarative/retrievers/test_pagination_tracker.py index 6b8207cc1..4757866a0 100644 --- a/unit_tests/sources/declarative/retrievers/test_pagination_tracker.py +++ b/unit_tests/sources/declarative/retrievers/test_pagination_tracker.py @@ -20,6 +20,44 @@ _A_STREAM_SLICE = StreamSlice(cursor_slice={"stream slice": "slice value"}, partition={}) +@pytest.mark.parametrize( + "pages_per_interval, total_pages, expected_checkpoint_calls", + [ + pytest.param(5, 4, 0, id="below_interval_no_checkpoint"), + pytest.param(5, 5, 1, id="exactly_one_interval"), + pytest.param(5, 10, 2, id="two_intervals"), + pytest.param(5, 12, 2, id="past_second_interval_but_not_third"), + pytest.param(3, 9, 3, id="three_intervals_with_smaller_page_size"), + pytest.param(1, 3, 3, id="checkpoint_every_page"), + ], +) +def test_on_page_complete_triggers_checkpoint_at_interval( + pages_per_interval: int, total_pages: int, expected_checkpoint_calls: int +) -> None: + checkpoint_cursor = Mock(spec=ConcurrentCursor) + tracker = PaginationTracker( + checkpoint_cursor=checkpoint_cursor, + pages_per_checkpoint_interval=pages_per_interval, + ) + + for _ in range(total_pages): + tracker.on_page_complete(_A_STREAM_SLICE) + + assert checkpoint_cursor.emit_intermediate_state.call_count == expected_checkpoint_calls + + +def test_on_page_complete_without_checkpoint_cursor_is_noop() -> None: + tracker = PaginationTracker() + tracker.on_page_complete(_A_STREAM_SLICE) + + +def test_on_page_complete_without_interval_is_noop() -> None: + checkpoint_cursor = Mock(spec=ConcurrentCursor) + tracker = PaginationTracker(checkpoint_cursor=checkpoint_cursor) + tracker.on_page_complete(_A_STREAM_SLICE) + checkpoint_cursor.emit_intermediate_state.assert_not_called() + + class TestPaginationTracker(TestCase): def setUp(self) -> None: self._cursor = Mock(spec=ConcurrentCursor) diff --git a/unit_tests/sources/streams/concurrent/test_cursor.py b/unit_tests/sources/streams/concurrent/test_cursor.py index 34c92800d..13b61a264 100644 --- a/unit_tests/sources/streams/concurrent/test_cursor.py +++ b/unit_tests/sources/streams/concurrent/test_cursor.py @@ -1387,3 +1387,138 @@ def test_given_partitioned_state_with_multiple_slices_when_should_be_synced_then ) == True ) + + +def test_emit_intermediate_state_with_boundary_fields_emits_state(): + message_repository = Mock(spec=MessageRepository) + state_manager = Mock(spec=ConnectorStateManager) + cursor = ConcurrentCursor( + _A_STREAM_NAME, + _A_STREAM_NAMESPACE, + deepcopy(_NO_STATE), + message_repository, + state_manager, + EpochValueConcurrentStreamStateConverter(is_sequential_state=True), + CursorField(_A_CURSOR_FIELD_KEY), + _SLICE_BOUNDARY_FIELDS, + None, + EpochValueConcurrentStreamStateConverter.get_end_provider(), + _NO_LOOKBACK_WINDOW, + ) + + stream_slice = StreamSlice( + partition={_LOWER_SLICE_BOUNDARY_FIELD: 10, _UPPER_SLICE_BOUNDARY_FIELD: 100}, + cursor_slice={}, + ) + partition = _partition(stream_slice) + cursor.observe( + Record( + data={_A_CURSOR_FIELD_KEY: 50}, + associated_slice=partition.to_slice(), + stream_name=_A_STREAM_NAME, + ) + ) + + cursor.emit_intermediate_state(stream_slice) + + message_repository.emit_message.assert_called_once() + + +def test_emit_intermediate_state_without_boundary_fields_emits_state(): + message_repository = Mock(spec=MessageRepository) + state_manager = Mock(spec=ConnectorStateManager) + cursor = ConcurrentCursor( + _A_STREAM_NAME, + _A_STREAM_NAMESPACE, + deepcopy(_NO_STATE), + message_repository, + state_manager, + EpochValueConcurrentStreamStateConverter(is_sequential_state=True), + CursorField(_A_CURSOR_FIELD_KEY), + None, + None, + EpochValueConcurrentStreamStateConverter.get_end_provider(), + _NO_LOOKBACK_WINDOW, + ) + + partition = _partition(_NO_SLICE) + cursor.observe( + Record( + data={_A_CURSOR_FIELD_KEY: 50}, + associated_slice=partition.to_slice(), + stream_name=_A_STREAM_NAME, + ) + ) + + cursor.emit_intermediate_state(partition.to_slice()) + + message_repository.emit_message.assert_called_once() + + +def test_emit_intermediate_state_when_not_ascending_order_does_not_emit(): + message_repository = Mock(spec=MessageRepository) + state_manager = Mock(spec=ConnectorStateManager) + cursor = ConcurrentCursor( + _A_STREAM_NAME, + _A_STREAM_NAMESPACE, + deepcopy(_NO_STATE), + message_repository, + state_manager, + EpochValueConcurrentStreamStateConverter(is_sequential_state=True), + CursorField(_A_CURSOR_FIELD_KEY), + _SLICE_BOUNDARY_FIELDS, + None, + EpochValueConcurrentStreamStateConverter.get_end_provider(), + _NO_LOOKBACK_WINDOW, + ) + + stream_slice = StreamSlice( + partition={_LOWER_SLICE_BOUNDARY_FIELD: 10, _UPPER_SLICE_BOUNDARY_FIELD: 100}, + cursor_slice={}, + ) + partition = _partition(stream_slice) + cursor.observe( + Record( + data={_A_CURSOR_FIELD_KEY: 50}, + associated_slice=partition.to_slice(), + stream_name=_A_STREAM_NAME, + ) + ) + cursor.observe( + Record( + data={_A_CURSOR_FIELD_KEY: 30}, + associated_slice=partition.to_slice(), + stream_name=_A_STREAM_NAME, + ) + ) + + cursor.emit_intermediate_state(stream_slice) + + message_repository.emit_message.assert_not_called() + + +def test_emit_intermediate_state_when_no_records_observed_does_not_emit(): + message_repository = Mock(spec=MessageRepository) + state_manager = Mock(spec=ConnectorStateManager) + cursor = ConcurrentCursor( + _A_STREAM_NAME, + _A_STREAM_NAMESPACE, + deepcopy(_NO_STATE), + message_repository, + state_manager, + EpochValueConcurrentStreamStateConverter(is_sequential_state=True), + CursorField(_A_CURSOR_FIELD_KEY), + _SLICE_BOUNDARY_FIELDS, + None, + EpochValueConcurrentStreamStateConverter.get_end_provider(), + _NO_LOOKBACK_WINDOW, + ) + + stream_slice = StreamSlice( + partition={_LOWER_SLICE_BOUNDARY_FIELD: 10, _UPPER_SLICE_BOUNDARY_FIELD: 100}, + cursor_slice={}, + ) + + cursor.emit_intermediate_state(stream_slice) + + message_repository.emit_message.assert_not_called() From e0ef3eb3c8f5e3ca0c7628fdb931e3f620f4852a Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 20 Feb 2026 16:04:35 +0000 Subject: [PATCH 3/3] feat: add pages_per_checkpoint_interval to declarative schema for incremental sync cursors Co-Authored-By: gl_anatolii.yatsuk@airbyte.io --- .../declarative_component_schema.yaml | 8 ++++++++ .../models/declarative_component_schema.py | 10 ++++++++++ .../parsers/model_to_component_factory.py | 19 ++++++++++--------- 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 84aaa6c53..056926a7a 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -876,6 +876,10 @@ definitions: title: Inject Start Value Into Outgoing HTTP Request description: Optionally configures how the start value will be sent in requests to the source API. "$ref": "#/definitions/RequestOption" + pages_per_checkpoint_interval: + title: Pages Per Checkpoint Interval + description: The number of pages to fetch before emitting an intermediate state checkpoint during pagination. This enables resuming long-running syncs closer to where they failed rather than from the beginning of the current slice. Only effective when records are returned in ascending cursor order. Defaults to disabled. + type: integer $parameters: type: object additionalProperties: true @@ -1160,6 +1164,10 @@ definitions: - "P1W" - "P1M" - "P1Y" + pages_per_checkpoint_interval: + title: Pages Per Checkpoint Interval + description: The number of pages to fetch before emitting an intermediate state checkpoint during pagination. This enables resuming long-running syncs closer to where they failed rather than from the beginning of the current slice. Only effective when records are returned in ascending cursor order. Defaults to disabled. + type: integer $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 5d2f0521f..af0c52979 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1604,6 +1604,11 @@ class IncrementingCountCursor(BaseModel): description="Optionally configures how the start value will be sent in requests to the source API.", title="Inject Start Value Into Outgoing HTTP Request", ) + pages_per_checkpoint_interval: Optional[int] = Field( + None, + description="The number of pages to fetch before emitting an intermediate state checkpoint during pagination. This enables resuming long-running syncs closer to where they failed rather than from the beginning of the current slice. Only effective when records are returned in ascending cursor order. Defaults to disabled.", + title="Pages Per Checkpoint Interval", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -1720,6 +1725,11 @@ class DatetimeBasedCursor(BaseModel): examples=["P1W", "{{ config['step_increment'] }}"], title="Step", ) + pages_per_checkpoint_interval: Optional[int] = Field( + None, + description="The number of pages to fetch before emitting an intermediate state checkpoint during pagination. This enables resuming long-running syncs closer to where they failed rather than from the beginning of the current slice. Only effective when records are returned in ascending cursor order. Defaults to disabled.", + title="Pages Per Checkpoint Interval", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 014fda589..630283e9b 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -546,7 +546,6 @@ NoopFileWriter, ) from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import ( - DEFAULT_PAGES_PER_CHECKPOINT_INTERVAL, PaginationTracker, ) from airbyte_cdk.sources.declarative.schema import ( @@ -3466,24 +3465,28 @@ def _get_url(req: Requester) -> str: additional_query_properties=query_properties, log_formatter=self._get_log_formatter(log_formatter, name), pagination_tracker_factory=self._create_pagination_tracker_factory( - model.pagination_reset, cursor + model.pagination_reset, + cursor, + incremental_sync.pages_per_checkpoint_interval if incremental_sync else None, ), parameters=model.parameters or {}, ) def _create_pagination_tracker_factory( - self, model: Optional[PaginationResetModel], cursor: Cursor + self, + model: Optional[PaginationResetModel], + cursor: Cursor, + pages_per_checkpoint_interval: int | None = None, ) -> Callable[[], PaginationTracker]: checkpoint_cursor: Optional[ConcurrentCursor] = ( cursor if isinstance(cursor, ConcurrentCursor) else None ) + effective_interval = pages_per_checkpoint_interval if checkpoint_cursor else None if model is None: return lambda: PaginationTracker( checkpoint_cursor=checkpoint_cursor, - pages_per_checkpoint_interval=DEFAULT_PAGES_PER_CHECKPOINT_INTERVAL - if checkpoint_cursor - else None, + pages_per_checkpoint_interval=effective_interval, ) # Until we figure out a way to use any cursor for PaginationTracker, we will have to have this cursor selector logic @@ -3510,9 +3513,7 @@ def _create_pagination_tracker_factory( cursor_factory(), limit, checkpoint_cursor=checkpoint_cursor, - pages_per_checkpoint_interval=DEFAULT_PAGES_PER_CHECKPOINT_INTERVAL - if checkpoint_cursor - else None, + pages_per_checkpoint_interval=effective_interval, ) def _get_log_formatter(