Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,10 @@ definitions:
title: Inject Start Value Into Outgoing HTTP Request
description: Optionally configures how the start value will be sent in requests to the source API.
"$ref": "#/definitions/RequestOption"
pages_per_checkpoint_interval:
title: Pages Per Checkpoint Interval
description: The number of pages to fetch before emitting an intermediate state checkpoint during pagination. This enables resuming long-running syncs closer to where they failed rather than from the beginning of the current slice. Only effective when records are returned in ascending cursor order. Defaults to disabled.
type: integer
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -1160,6 +1164,10 @@ definitions:
- "P1W"
- "P1M"
- "P1Y"
pages_per_checkpoint_interval:
title: Pages Per Checkpoint Interval
description: The number of pages to fetch before emitting an intermediate state checkpoint during pagination. This enables resuming long-running syncs closer to where they failed rather than from the beginning of the current slice. Only effective when records are returned in ascending cursor order. Defaults to disabled.
type: integer
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1604,6 +1604,11 @@ class IncrementingCountCursor(BaseModel):
description="Optionally configures how the start value will be sent in requests to the source API.",
title="Inject Start Value Into Outgoing HTTP Request",
)
pages_per_checkpoint_interval: Optional[int] = Field(
None,
description="The number of pages to fetch before emitting an intermediate state checkpoint during pagination. This enables resuming long-running syncs closer to where they failed rather than from the beginning of the current slice. Only effective when records are returned in ascending cursor order. Defaults to disabled.",
title="Pages Per Checkpoint Interval",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down Expand Up @@ -1720,6 +1725,11 @@ class DatetimeBasedCursor(BaseModel):
examples=["P1W", "{{ config['step_increment'] }}"],
title="Step",
)
pages_per_checkpoint_interval: Optional[int] = Field(
None,
description="The number of pages to fetch before emitting an intermediate state checkpoint during pagination. This enables resuming long-running syncs closer to where they failed rather than from the beginning of the current slice. Only effective when records are returned in ascending cursor order. Defaults to disabled.",
title="Pages Per Checkpoint Interval",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,9 @@
LocalFileSystemFileWriter,
NoopFileWriter,
)
from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import PaginationTracker
from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import (
PaginationTracker,
)
from airbyte_cdk.sources.declarative.schema import (
ComplexFieldType,
DefaultSchemaLoader,
Expand Down Expand Up @@ -3463,16 +3465,29 @@ def _get_url(req: Requester) -> str:
additional_query_properties=query_properties,
log_formatter=self._get_log_formatter(log_formatter, name),
pagination_tracker_factory=self._create_pagination_tracker_factory(
model.pagination_reset, cursor
model.pagination_reset,
cursor,
incremental_sync.pages_per_checkpoint_interval if incremental_sync else None,
),
parameters=model.parameters or {},
)

def _create_pagination_tracker_factory(
self, model: Optional[PaginationResetModel], cursor: Cursor
self,
model: Optional[PaginationResetModel],
cursor: Cursor,
pages_per_checkpoint_interval: int | None = None,
) -> Callable[[], PaginationTracker]:
checkpoint_cursor: Optional[ConcurrentCursor] = (
cursor if isinstance(cursor, ConcurrentCursor) else None
)
effective_interval = pages_per_checkpoint_interval if checkpoint_cursor else None

if model is None:
return lambda: PaginationTracker()
return lambda: PaginationTracker(
checkpoint_cursor=checkpoint_cursor,
pages_per_checkpoint_interval=effective_interval,
)

# Until we figure out a way to use any cursor for PaginationTracker, we will have to have this cursor selector logic
cursor_factory: Callable[[], Optional[ConcurrentCursor]] = lambda: None
Expand All @@ -3494,7 +3509,12 @@ def _create_pagination_tracker_factory(
raise ValueError(f"Unknown PaginationReset action: {model.action}")

limit = model.limits.number_of_records if model and model.limits else None
return lambda: PaginationTracker(cursor_factory(), limit)
return lambda: PaginationTracker(
cursor_factory(),
limit,
checkpoint_cursor=checkpoint_cursor,
pages_per_checkpoint_interval=effective_interval,
)

def _get_log_formatter(
self, log_formatter: Callable[[Response], Any] | None, name: str
Expand Down
17 changes: 16 additions & 1 deletion airbyte_cdk/sources/declarative/retrievers/pagination_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@
from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

DEFAULT_PAGES_PER_CHECKPOINT_INTERVAL = 5


class PaginationTracker:
_record_count: int
_number_of_attempt_with_same_slice: int

def __init__(
self, cursor: Optional[ConcurrentCursor] = None, max_number_of_records: Optional[int] = None
self,
cursor: Optional[ConcurrentCursor] = None,
max_number_of_records: Optional[int] = None,
checkpoint_cursor: Optional[ConcurrentCursor] = None,
pages_per_checkpoint_interval: Optional[int] = None,
) -> None:
"""
Ideally, we would have passed the `Cursor` interface here instead of `ConcurrentCursor` but not all
Expand All @@ -24,6 +30,9 @@ def __init__(
"""
self._cursor = cursor
self._limit = max_number_of_records
self._checkpoint_cursor = checkpoint_cursor
self._pages_per_checkpoint_interval = pages_per_checkpoint_interval
self._page_count = 0
self._reset()

"""
Expand All @@ -40,6 +49,12 @@ def observe(self, record: Record) -> None:
if self._cursor:
self._cursor.observe(record)

def on_page_complete(self, stream_slice: StreamSlice) -> None:
if self._checkpoint_cursor and self._pages_per_checkpoint_interval:
self._page_count += 1
if self._page_count % self._pages_per_checkpoint_interval == 0:
self._checkpoint_cursor.emit_intermediate_state(stream_slice)

def has_reached_limit(self) -> bool:
return self._limit is not None and self._record_count >= self._limit

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ def _read_pages(
)
reset_pagination = False
else:
pagination_tracker.on_page_complete(stream_slice)
last_page_token_value = (
next_page_token.get("next_page_token") if next_page_token else None
)
Expand Down
36 changes: 36 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,42 @@ def close_partition(self, partition: Partition) -> None:
self._emit_state_message()
self._has_closed_at_least_one_slice = True

def emit_intermediate_state(self, stream_slice: StreamSlice) -> None:
if not self._is_ascending_order:
return

with self._lock:
most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
stream_slice
)
if most_recent_cursor_value is None:
return

if self._slice_boundary_fields:
if "slices" not in self._concurrent_state:
return
start_value = self._connector_state_converter.parse_value(
stream_slice[self._slice_boundary_fields[self._START_BOUNDARY]]
)
self._concurrent_state["slices"].append(
{
self._connector_state_converter.START_KEY: start_value,
self._connector_state_converter.END_KEY: most_recent_cursor_value,
self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
}
)
else:
self._concurrent_state["slices"].append(
{
self._connector_state_converter.START_KEY: self.start,
self._connector_state_converter.END_KEY: most_recent_cursor_value,
self._connector_state_converter.MOST_RECENT_RECORD_KEY: most_recent_cursor_value,
}
)

self._merge_partitions()
self._emit_state_message()

def _add_slice_to_state(self, partition: Partition) -> None:
most_recent_cursor_value = self._most_recent_cursor_value_per_partition.get(
partition.to_slice()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,44 @@
_A_STREAM_SLICE = StreamSlice(cursor_slice={"stream slice": "slice value"}, partition={})


@pytest.mark.parametrize(
    "pages_per_interval, total_pages, expected_checkpoint_calls",
    [
        pytest.param(5, 4, 0, id="below_interval_no_checkpoint"),
        pytest.param(5, 5, 1, id="exactly_one_interval"),
        pytest.param(5, 10, 2, id="two_intervals"),
        pytest.param(5, 12, 2, id="past_second_interval_but_not_third"),
        pytest.param(3, 9, 3, id="three_intervals_with_smaller_page_size"),
        pytest.param(1, 3, 3, id="checkpoint_every_page"),
    ],
)
def test_on_page_complete_triggers_checkpoint_at_interval(
    pages_per_interval: int, total_pages: int, expected_checkpoint_calls: int
) -> None:
    # A mocked cursor lets us count checkpoint emissions without real state handling.
    mock_checkpoint_cursor = Mock(spec=ConcurrentCursor)
    pagination_tracker = PaginationTracker(
        checkpoint_cursor=mock_checkpoint_cursor,
        pages_per_checkpoint_interval=pages_per_interval,
    )

    for _page in range(total_pages):
        pagination_tracker.on_page_complete(_A_STREAM_SLICE)

    # One checkpoint per completed interval, none for a trailing partial interval.
    assert (
        mock_checkpoint_cursor.emit_intermediate_state.call_count
        == expected_checkpoint_calls
    )


def test_on_page_complete_without_checkpoint_cursor_is_noop() -> None:
    # With no checkpoint cursor configured, completing a page must simply not raise.
    PaginationTracker().on_page_complete(_A_STREAM_SLICE)


def test_on_page_complete_without_interval_is_noop() -> None:
    # A checkpoint cursor without an interval means checkpointing stays disabled.
    mock_cursor = Mock(spec=ConcurrentCursor)
    tracker_without_interval = PaginationTracker(checkpoint_cursor=mock_cursor)

    tracker_without_interval.on_page_complete(_A_STREAM_SLICE)

    mock_cursor.emit_intermediate_state.assert_not_called()


class TestPaginationTracker(TestCase):
def setUp(self) -> None:
self._cursor = Mock(spec=ConcurrentCursor)
Expand Down
135 changes: 135 additions & 0 deletions unit_tests/sources/streams/concurrent/test_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1387,3 +1387,138 @@ def test_given_partitioned_state_with_multiple_slices_when_should_be_synced_then
)
== True
)


def test_emit_intermediate_state_with_boundary_fields_emits_state():
    # A cursor configured with slice boundary fields should emit one state
    # message when an intermediate checkpoint is requested for a slice that has
    # observed at least one record.
    message_repository = Mock(spec=MessageRepository)
    state_manager = Mock(spec=ConnectorStateManager)
    # NOTE(review): ConcurrentCursor is constructed positionally; argument roles
    # are inferred from the module-level fixture names — confirm against the
    # ConcurrentCursor signature.
    cursor = ConcurrentCursor(
        _A_STREAM_NAME,
        _A_STREAM_NAMESPACE,
        deepcopy(_NO_STATE),
        message_repository,
        state_manager,
        EpochValueConcurrentStreamStateConverter(is_sequential_state=True),
        CursorField(_A_CURSOR_FIELD_KEY),
        _SLICE_BOUNDARY_FIELDS,
        None,
        EpochValueConcurrentStreamStateConverter.get_end_provider(),
        _NO_LOOKBACK_WINDOW,
    )

    # Slice bounded by [10, 100] with a single record observed at cursor value 50.
    stream_slice = StreamSlice(
        partition={_LOWER_SLICE_BOUNDARY_FIELD: 10, _UPPER_SLICE_BOUNDARY_FIELD: 100},
        cursor_slice={},
    )
    partition = _partition(stream_slice)
    cursor.observe(
        Record(
            data={_A_CURSOR_FIELD_KEY: 50},
            associated_slice=partition.to_slice(),
            stream_name=_A_STREAM_NAME,
        )
    )

    cursor.emit_intermediate_state(stream_slice)

    # Exactly one intermediate state message is expected.
    message_repository.emit_message.assert_called_once()


def test_emit_intermediate_state_without_boundary_fields_emits_state():
    # Without slice boundary fields the cursor falls back to its own start value
    # when building the checkpointed interval; a state message should still be
    # emitted after a record has been observed.
    message_repository = Mock(spec=MessageRepository)
    state_manager = Mock(spec=ConnectorStateManager)
    # NOTE(review): positional ConcurrentCursor construction — the two `None`
    # arguments replace the slice boundary fields and the following parameter;
    # confirm against the ConcurrentCursor signature.
    cursor = ConcurrentCursor(
        _A_STREAM_NAME,
        _A_STREAM_NAMESPACE,
        deepcopy(_NO_STATE),
        message_repository,
        state_manager,
        EpochValueConcurrentStreamStateConverter(is_sequential_state=True),
        CursorField(_A_CURSOR_FIELD_KEY),
        None,
        None,
        EpochValueConcurrentStreamStateConverter.get_end_provider(),
        _NO_LOOKBACK_WINDOW,
    )

    # One record observed at cursor value 50 on the no-boundaries slice.
    partition = _partition(_NO_SLICE)
    cursor.observe(
        Record(
            data={_A_CURSOR_FIELD_KEY: 50},
            associated_slice=partition.to_slice(),
            stream_name=_A_STREAM_NAME,
        )
    )

    cursor.emit_intermediate_state(partition.to_slice())

    # Exactly one intermediate state message is expected.
    message_repository.emit_message.assert_called_once()


def test_emit_intermediate_state_when_not_ascending_order_does_not_emit():
    # Intermediate checkpoints are only valid when records arrive in ascending
    # cursor order; observing a record with a LOWER cursor value than the
    # previous one must suppress the emission.
    message_repository = Mock(spec=MessageRepository)
    state_manager = Mock(spec=ConnectorStateManager)
    cursor = ConcurrentCursor(
        _A_STREAM_NAME,
        _A_STREAM_NAMESPACE,
        deepcopy(_NO_STATE),
        message_repository,
        state_manager,
        EpochValueConcurrentStreamStateConverter(is_sequential_state=True),
        CursorField(_A_CURSOR_FIELD_KEY),
        _SLICE_BOUNDARY_FIELDS,
        None,
        EpochValueConcurrentStreamStateConverter.get_end_provider(),
        _NO_LOOKBACK_WINDOW,
    )

    stream_slice = StreamSlice(
        partition={_LOWER_SLICE_BOUNDARY_FIELD: 10, _UPPER_SLICE_BOUNDARY_FIELD: 100},
        cursor_slice={},
    )
    partition = _partition(stream_slice)
    # First record at cursor value 50 ...
    cursor.observe(
        Record(
            data={_A_CURSOR_FIELD_KEY: 50},
            associated_slice=partition.to_slice(),
            stream_name=_A_STREAM_NAME,
        )
    )
    # ... then a record at 30: out of ascending order, so checkpointing must be refused.
    cursor.observe(
        Record(
            data={_A_CURSOR_FIELD_KEY: 30},
            associated_slice=partition.to_slice(),
            stream_name=_A_STREAM_NAME,
        )
    )

    cursor.emit_intermediate_state(stream_slice)

    message_repository.emit_message.assert_not_called()


def test_emit_intermediate_state_when_no_records_observed_does_not_emit():
    # If no record has been observed for the slice there is no cursor value to
    # checkpoint, so no state message should be emitted.
    message_repository = Mock(spec=MessageRepository)
    state_manager = Mock(spec=ConnectorStateManager)
    cursor = ConcurrentCursor(
        _A_STREAM_NAME,
        _A_STREAM_NAMESPACE,
        deepcopy(_NO_STATE),
        message_repository,
        state_manager,
        EpochValueConcurrentStreamStateConverter(is_sequential_state=True),
        CursorField(_A_CURSOR_FIELD_KEY),
        _SLICE_BOUNDARY_FIELDS,
        None,
        EpochValueConcurrentStreamStateConverter.get_end_provider(),
        _NO_LOOKBACK_WINDOW,
    )

    # Slice exists but no cursor.observe(...) call precedes the checkpoint request.
    stream_slice = StreamSlice(
        partition={_LOWER_SLICE_BOUNDARY_FIELD: 10, _UPPER_SLICE_BOUNDARY_FIELD: 100},
        cursor_slice={},
    )

    cursor.emit_intermediate_state(stream_slice)

    message_repository.emit_message.assert_not_called()