diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index e68318cd4..de14d55e5 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -3752,6 +3752,22 @@ definitions: title: Incremental Stream description: Component used to coordinate how records are extracted across stream slices and request pages when the state provided. "$ref": "#/definitions/DeclarativeStream" + api_retention_period: + title: API Retention Period + description: | + The data retention period of the incremental API (ISO8601 duration). If the cursor value is older than this retention period, the connector will automatically fall back to a full refresh to avoid data loss. + This is useful for APIs like Stripe Events API which only retain data for 30 days. + * **PT1H**: 1 hour + * **P1D**: 1 day + * **P1W**: 1 week + * **P1M**: 1 month + * **P1Y**: 1 year + * **P30D**: 30 days + type: string + examples: + - "P30D" + - "P90D" + - "P1Y" $parameters: type: object additionalProperties: true diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index f0379368d..d1f2ca41e 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -3,6 +3,7 @@ # import copy +import datetime import logging import threading import time @@ -658,3 +659,21 @@ def get_global_state( if stream_state and "state" in stream_state else None ) + + def get_cursor_datetime_from_state( + self, stream_state: Mapping[str, Any] + ) -> datetime.datetime | None: + """Extract and parse the cursor datetime from the global cursor in per-partition state. + + For per-partition cursors, the global cursor is stored under the "state" key. + This method delegates to the underlying cursor factory to parse the datetime. + + Returns None if the global cursor is not present or cannot be parsed. + """ + global_state = stream_state.get(self._GLOBAL_STATE_KEY) + if not global_state or not isinstance(global_state, dict): + return None + + # Create a cursor to delegate the parsing + cursor = self._cursor_factory.create(stream_state={}, runtime_lookback_window=None) + return cursor.get_cursor_datetime_from_state(global_state) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 5d2f0521f..04dffcaff 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,5 +1,3 @@ -# Copyright (c) 2025 Airbyte, Inc., all rights reserved. - # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -2885,6 +2883,12 @@ class StateDelegatingStream(BaseModel): description="Component used to coordinate how records are extracted across stream slices and request pages when the state provided.", title="Incremental Stream", ) + api_retention_period: Optional[str] = Field( + None, + description="The data retention period of the incremental API (ISO8601 duration). If the cursor value is older than this retention period, the connector will automatically fall back to a full refresh to avoid data loss.\nThis is useful for APIs like Stripe Events API which only retain data for 30 days.\n * **PT1H**: 1 hour\n * **P1D**: 1 day\n * **P1W**: 1 week\n * **P1M**: 1 month\n * **P1Y**: 1 year\n * **P30D**: 30 days\n", + examples=["P30D", "P90D", "P1Y"], + title="API Retention Period", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 2bd7d268d..bb3c1e653 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -11,6 +11,7 @@ import re from functools import partial from typing import ( + TYPE_CHECKING, Any, Callable, Dict, @@ -27,6 +28,11 @@ get_type_hints, ) +if TYPE_CHECKING: + from airbyte_cdk.legacy.sources.declarative.incremental.datetime_based_cursor import ( + DatetimeBasedCursor, + ) + from airbyte_protocol_dataclasses.models import ConfiguredAirbyteStream from isodate import parse_duration from pydantic.v1 import BaseModel @@ -3548,7 +3554,6 @@ def create_state_delegating_stream( self, model: StateDelegatingStreamModel, config: Config, - has_parent_state: Optional[bool] = None, **kwargs: Any, ) -> DefaultStream: if ( @@ -3559,18 +3564,105 @@ def create_state_delegating_stream( f"state_delegating_stream, full_refresh_stream name and incremental_stream must have equal names. Instead has {model.name}, {model.full_refresh_stream.name} and {model.incremental_stream.name}." ) - stream_model = self._get_state_delegating_stream_model( - False if has_parent_state is None else has_parent_state, model - ) + if model.api_retention_period: + for stream_model in (model.full_refresh_stream, model.incremental_stream): + if isinstance(stream_model.incremental_sync, IncrementingCountCursorModel): + raise ValueError( + f"Stream '{model.name}' uses IncrementingCountCursor which is not supported " + f"with api_retention_period. IncrementingCountCursor does not use datetime-based " + f"cursors, so cursor age validation cannot be performed." + ) + + stream_state = self._connector_state_manager.get_stream_state(model.name, None) + + if not stream_state: + return self._create_component_from_model( # type: ignore[no-any-return] + model.full_refresh_stream, config=config, **kwargs + ) + + incremental_stream: DefaultStream = self._create_component_from_model( + model.incremental_stream, config=config, **kwargs + ) # type: ignore[assignment] + + # Only run cursor age validation for streams that are in the configured + # catalog (or when no catalog was provided, e.g. during discover / connector + # builder). Streams not selected by the user but instantiated as parent-stream + # dependencies must not go through this path because it emits state messages + # that the destination does not know about, causing "Stream not found" crashes. + stream_is_in_catalog = ( + not self._stream_name_to_configured_stream # no catalog → validate by default + or model.name in self._stream_name_to_configured_stream + ) + if model.api_retention_period and stream_is_in_catalog: + full_refresh_stream: DefaultStream = self._create_component_from_model( + model.full_refresh_stream, config=config, **kwargs + ) # type: ignore[assignment] + if self._is_cursor_older_than_retention_period( + stream_state, + full_refresh_stream.cursor, + incremental_stream.cursor, + model.api_retention_period, + model.name, + ): + self._connector_state_manager.update_state_for_stream(model.name, None, {}) + state_message = self._connector_state_manager.create_state_message(model.name, None) + self._message_repository.emit_message(state_message) + return full_refresh_stream + + return incremental_stream + + @staticmethod + def _is_cursor_older_than_retention_period( + stream_state: Mapping[str, Any], + full_refresh_cursor: Cursor, + incremental_cursor: Cursor, + api_retention_period: str, + stream_name: str, + ) -> bool: + """Check if the cursor value in the state is older than the API's retention period. + + Checks cursors in sequence: full refresh cursor first, then incremental cursor. + FinalStateCursor returns now() for completed full refresh state (NO_CURSOR_STATE_KEY), + which is always within retention, so we use incremental. For other states, it returns + None and we fall back to checking the incremental cursor. + + Returns True if the cursor is older than the retention period (should use full refresh). + Returns False if the cursor is within the retention period (safe to use incremental). + """ + retention_duration = parse_duration(api_retention_period) + retention_cutoff = datetime.datetime.now(datetime.timezone.utc) - retention_duration + + # Check full refresh cursor first + cursor_datetime = full_refresh_cursor.get_cursor_datetime_from_state(stream_state) + + # If full refresh cursor returns None, check incremental cursor + if cursor_datetime is None: + cursor_datetime = incremental_cursor.get_cursor_datetime_from_state(stream_state) + + if cursor_datetime is None: + # Neither cursor could parse the state - fall back to full refresh to be safe + return True - return self._create_component_from_model(stream_model, config=config, **kwargs) # type: ignore[no-any-return] # DeclarativeStream will be created as stream_model is alwyas DeclarativeStreamModel + if cursor_datetime < retention_cutoff: + logging.warning( + f"Stream '{stream_name}' has a cursor value older than " + f"the API's retention period of {api_retention_period} " + f"(cutoff: {retention_cutoff.isoformat()}). " + f"Falling back to full refresh to avoid data loss." + ) + return True + + return False def _get_state_delegating_stream_model( - self, has_parent_state: bool, model: StateDelegatingStreamModel + self, + model: StateDelegatingStreamModel, + parent_state: Optional[Mapping[str, Any]] = None, ) -> DeclarativeStreamModel: + """Return the appropriate underlying stream model based on state.""" return ( model.incremental_stream - if self._connector_state_manager.get_stream_state(model.name, None) or has_parent_state + if self._connector_state_manager.get_stream_state(model.name, None) or parent_state else model.full_refresh_stream ) @@ -3901,17 +3993,13 @@ def create_substream_partition_router( def create_parent_stream_config_with_substream_wrapper( self, model: ParentStreamConfigModel, config: Config, *, stream_name: str, **kwargs: Any ) -> Any: - # getting the parent state child_state = self._connector_state_manager.get_stream_state(stream_name, None) - # This flag will be used exclusively for StateDelegatingStream when a parent stream is created - has_parent_state = bool( - self._connector_state_manager.get_stream_state(stream_name, None) - if model.incremental_dependency - else False + parent_state: Optional[Mapping[str, Any]] = ( + child_state if model.incremental_dependency and child_state else None ) connector_state_manager = self._instantiate_parent_stream_state_manager( - child_state, config, model, has_parent_state + child_state, config, model, parent_state ) substream_factory = ModelToComponentFactory( @@ -3943,7 +4031,7 @@ def _instantiate_parent_stream_state_manager( child_state: MutableMapping[str, Any], config: Config, model: ParentStreamConfigModel, - has_parent_state: bool, + parent_state: Optional[Mapping[str, Any]] = None, ) -> ConnectorStateManager: """ With DefaultStream, the state needs to be provided during __init__ of the cursor as opposed to the @@ -3955,21 +4043,18 @@ def _instantiate_parent_stream_state_manager( """ if model.incremental_dependency and child_state: parent_stream_name = model.stream.name or "" - parent_state = ConcurrentPerPartitionCursor.get_parent_state( + extracted_parent_state = ConcurrentPerPartitionCursor.get_parent_state( child_state, parent_stream_name ) - if not parent_state: - # there are two migration cases: state value from child stream or from global state - parent_state = ConcurrentPerPartitionCursor.get_global_state( + if not extracted_parent_state: + extracted_parent_state = ConcurrentPerPartitionCursor.get_global_state( child_state, parent_stream_name ) - if not parent_state and not isinstance(parent_state, dict): + if not extracted_parent_state and not isinstance(extracted_parent_state, dict): cursor_values = child_state.values() if cursor_values and len(cursor_values) == 1: - # We assume the child state is a pair `{: }` and we will use the - # cursor value as a parent state. incremental_sync_model: Union[ DatetimeBasedCursorModel, IncrementingCountCursorModel, @@ -3977,14 +4062,14 @@ def _instantiate_parent_stream_state_manager( model.stream.incremental_sync # type: ignore # if we are there, it is because there is incremental_dependency and therefore there is an incremental_sync on the parent stream if isinstance(model.stream, DeclarativeStreamModel) else self._get_state_delegating_stream_model( - has_parent_state, model.stream + model.stream, parent_state=parent_state ).incremental_sync ) cursor_field = InterpolatedString.create( incremental_sync_model.cursor_field, parameters=incremental_sync_model.parameters or {}, ).eval(config) - parent_state = AirbyteStateMessage( + extracted_parent_state = AirbyteStateMessage( type=AirbyteStateType.STREAM, stream=AirbyteStreamState( stream_descriptor=StreamDescriptor( @@ -3995,7 +4080,7 @@ def _instantiate_parent_stream_state_manager( ), ), ) - return ConnectorStateManager([parent_state] if parent_state else []) + return ConnectorStateManager([extracted_parent_state] if extracted_parent_state else []) return ConnectorStateManager([]) diff --git a/airbyte_cdk/sources/streams/concurrent/cursor.py b/airbyte_cdk/sources/streams/concurrent/cursor.py index e3a487183..11eaad235 100644 --- a/airbyte_cdk/sources/streams/concurrent/cursor.py +++ b/airbyte_cdk/sources/streams/concurrent/cursor.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import datetime import functools import logging import threading @@ -89,6 +90,27 @@ def stream_slices(self) -> Iterable[StreamSlice]: """ yield StreamSlice(partition={}, cursor_slice={}) + def get_cursor_datetime_from_state( + self, stream_state: Mapping[str, Any] + ) -> datetime.datetime | None: + """Extract and parse the cursor datetime from the given stream state. + + This method is used by StateDelegatingStream to validate cursor age against + an API's data retention period. Subclasses should implement this method to + extract the cursor value from their specific state structure and parse it + into a datetime object. + + Returns None if the cursor cannot be extracted or parsed, which will cause + StateDelegatingStream to fall back to full refresh (safe default). + + Raises NotImplementedError by default - subclasses must implement this method + if they want to support cursor age validation with api_retention_period. + """ + raise NotImplementedError( + f"{self.__class__.__name__} does not implement get_cursor_datetime_from_state. " + f"Cursor age validation with api_retention_period is not supported for this cursor type." + ) + class FinalStateCursor(Cursor): """Cursor that is used to guarantee at least one state message is emitted for a concurrent stream.""" @@ -134,6 +156,22 @@ def ensure_at_least_one_state_emitted(self) -> None: def should_be_synced(self, record: Record) -> bool: return True + def get_cursor_datetime_from_state( + self, stream_state: Mapping[str, Any] + ) -> datetime.datetime | None: + """Return now() if state indicates a completed full refresh, else None. + + When the state has NO_CURSOR_STATE_KEY: True, it means the previous sync was a + completed full refresh. Returning now() indicates the cursor is "current" and + within any retention period, so we should use incremental sync. + + For any other state format, return None to indicate this cursor cannot parse it, + allowing the incremental cursor to handle the state instead. + """ + if stream_state.get(NO_CURSOR_STATE_KEY): + return datetime.datetime.now(datetime.timezone.utc) + return None + class ConcurrentCursor(Cursor): _START_BOUNDARY = 0 @@ -568,3 +606,47 @@ def reduce_slice_range(self, stream_slice: StreamSlice) -> StreamSlice: ) else: return stream_slice + + def get_cursor_datetime_from_state( + self, stream_state: Mapping[str, Any] + ) -> datetime.datetime | None: + """Extract and parse the cursor datetime from the given stream state. + + For concurrent cursors, the state can be in two formats: + 1. Sequential/legacy format: {cursor_field: cursor_value} + 2. Concurrent format: {state_type: "date-range", slices: [...]} + + Returns the cursor datetime if present and parseable, otherwise returns None. + """ + # Check if state is in concurrent format (need to convert to dict for type compatibility) + mutable_state: MutableMapping[str, Any] = dict(stream_state) + if self._connector_state_converter.is_state_message_compatible(mutable_state): + slices = stream_state.get("slices", []) + if not slices: + return None + # Get the most recent cursor value from the first slice (after merging) + first_slice = slices[0] + cursor_value = first_slice.get( + self._connector_state_converter.MOST_RECENT_RECORD_KEY + ) or first_slice.get(self._connector_state_converter.END_KEY) + if not cursor_value: + return None + try: + parsed_value = self._connector_state_converter.parse_value(cursor_value) + if isinstance(parsed_value, datetime.datetime): + return parsed_value + return None + except (ValueError, TypeError): + return None + + # Sequential/legacy format: {cursor_field: cursor_value} + cursor_value = stream_state.get(self._cursor_field.cursor_field_key) + if not cursor_value: + return None + try: + parsed_value = self._connector_state_converter.parse_value(cursor_value) + if isinstance(parsed_value, datetime.datetime): + return parsed_value + return None + except (ValueError, TypeError): + return None diff --git a/unit_tests/sources/declarative/test_state_delegating_stream.py b/unit_tests/sources/declarative/test_state_delegating_stream.py index 1239fe653..7cb717e42 100644 --- a/unit_tests/sources/declarative/test_state_delegating_stream.py +++ b/unit_tests/sources/declarative/test_state_delegating_stream.py @@ -2,10 +2,13 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. # +import copy import json +import logging from unittest.mock import MagicMock import freezegun +import pytest from airbyte_cdk.models import ( AirbyteStateBlob, @@ -253,3 +256,667 @@ def test_incremental_retriever(): {"id": 4, "name": "item_4", "updated_at": "2024-02-01"}, ] assert expected_incremental == incremental_records + + +def _create_manifest_with_retention_period(api_retention_period: str) -> dict: + """Create a manifest with api_retention_period set on the StateDelegatingStream.""" + manifest = copy.deepcopy(_MANIFEST) + manifest["definitions"]["TestStream"]["api_retention_period"] = api_retention_period + return manifest + + +@freezegun.freeze_time("2024-07-15") +def test_cursor_age_validation_falls_back_to_full_refresh_when_cursor_too_old(): + """Test that when cursor is older than retention period, full refresh is used.""" + manifest = _create_manifest_with_retention_period("P7D") + + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/items"), + HttpResponse( + body=json.dumps( + [ + {"id": 1, "name": "item_1", "updated_at": "2024-07-13"}, + {"id": 2, "name": "item_2", "updated_at": "2024-07-14"}, + ] + ) + ), + ) + + state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="TestStream", namespace=None), + stream_state=AirbyteStateBlob(updated_at="2024-07-01"), + ), + ) + ] + source = ConcurrentDeclarativeSource( + source_config=manifest, config=_CONFIG, catalog=None, state=state + ) + configured_catalog = create_configured_catalog(source, _CONFIG) + + records = get_records(source, _CONFIG, configured_catalog, state) + expected = [ + {"id": 1, "name": "item_1", "updated_at": "2024-07-13"}, + {"id": 2, "name": "item_2", "updated_at": "2024-07-14"}, + ] + assert expected == records + + +@freezegun.freeze_time("2024-07-15") +def test_cursor_age_validation_clears_state_when_falling_back_to_full_refresh(): + """Test that state is cleared when cursor is older than retention period.""" + manifest = _create_manifest_with_retention_period("P7D") + + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/items"), + HttpResponse( + body=json.dumps( + [ + {"id": 1, "name": "item_1", "updated_at": "2024-07-13"}, + {"id": 2, "name": "item_2", "updated_at": "2024-07-14"}, + ] + ) + ), + ) + + state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="TestStream", namespace=None), + stream_state=AirbyteStateBlob(updated_at="2024-07-01"), + ), + ) + ] + source = ConcurrentDeclarativeSource( + source_config=manifest, config=_CONFIG, catalog=None, state=state + ) + configured_catalog = create_configured_catalog(source, _CONFIG) + + all_messages = list( + source.read(logger=MagicMock(), config=_CONFIG, catalog=configured_catalog, state=state) + ) + + state_messages = [msg for msg in all_messages if msg.type == Type.STATE] + assert len(state_messages) > 0, "Expected at least one state message" + first_state = state_messages[0].state.stream.stream_state + assert first_state == AirbyteStateBlob(), ( + f"Expected first state message to be empty (clearing stale state), got: {first_state}" + ) + + +@freezegun.freeze_time("2024-07-15") +def test_cursor_age_validation_uses_incremental_when_cursor_within_retention(): + """Test that when cursor is within retention period, incremental sync is used.""" + manifest = _create_manifest_with_retention_period("P30D") + + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest( + url="https://api.test.com/items_with_filtration?start=2024-07-13&end=2024-07-15" + ), + HttpResponse( + body=json.dumps( + [ + {"id": 3, "name": "item_3", "updated_at": "2024-07-14"}, + ] + ) + ), + ) + + state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="TestStream", namespace=None), + stream_state=AirbyteStateBlob(updated_at="2024-07-13"), + ), + ) + ] + source = ConcurrentDeclarativeSource( + source_config=manifest, config=_CONFIG, catalog=None, state=state + ) + configured_catalog = create_configured_catalog(source, _CONFIG) + + records = get_records(source, _CONFIG, configured_catalog, state) + expected = [ + {"id": 3, "name": "item_3", "updated_at": "2024-07-14"}, + ] + assert expected == records + + +@freezegun.freeze_time("2024-07-15") +def test_cursor_age_validation_with_1_day_retention_falls_back(): + """Test cursor age validation with P1D retention period falls back to full refresh.""" + manifest = _create_manifest_with_retention_period("P1D") + + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/items"), + HttpResponse(body=json.dumps([{"id": 1, "updated_at": "2024-07-14"}])), + ) + + state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="TestStream", namespace=None), + stream_state=AirbyteStateBlob(updated_at="2024-07-13"), + ), + ) + ] + source = ConcurrentDeclarativeSource( + source_config=manifest, config=_CONFIG, catalog=None, state=state + ) + configured_catalog = create_configured_catalog(source, _CONFIG) + + records = get_records(source, _CONFIG, configured_catalog, state) + assert len(records) == 1 + + +@freezegun.freeze_time("2024-07-15") +def test_cursor_age_validation_emits_warning_when_falling_back(caplog): + """Test that a warning is emitted when cursor is older than retention period.""" + manifest = _create_manifest_with_retention_period("P7D") + + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/items"), + HttpResponse(body=json.dumps([{"id": 1, "updated_at": "2024-07-14"}])), + ) + + state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="TestStream", namespace=None), + stream_state=AirbyteStateBlob(updated_at="2024-07-01"), + ), + ) + ] + + with caplog.at_level(logging.WARNING): + source = ConcurrentDeclarativeSource( + source_config=manifest, config=_CONFIG, catalog=None, state=state + ) + configured_catalog = create_configured_catalog(source, _CONFIG) + get_records(source, _CONFIG, configured_catalog, state) + + warning_messages = [r.message for r in caplog.records if r.levelno == logging.WARNING] + assert any( + "TestStream" in msg and "older than" in msg and "P7D" in msg for msg in warning_messages + ), f"Expected warning about stale cursor not found. Warnings: {warning_messages}" + + +@freezegun.freeze_time("2024-07-15") +def test_cursor_age_validation_with_per_partition_state_uses_global_cursor(): + """Test that per-partition state structure uses global cursor for age validation.""" + manifest = _create_manifest_with_retention_period("P7D") + + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/items"), + HttpResponse( + body=json.dumps( + [ + {"id": 1, "name": "item_1", "updated_at": "2024-07-13"}, + ] + ) + ), + ) + + state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="TestStream", namespace=None), + stream_state=AirbyteStateBlob( + state={"updated_at": "2024-07-01"}, + states=[ + { + "partition": {"parent_id": "1"}, + "cursor": {"updated_at": "2024-07-10"}, + }, + { + "partition": {"parent_id": "2"}, + "cursor": {"updated_at": "2024-07-05"}, + }, + ], + use_global_cursor=False, + ), + ), + ) + ] + source = ConcurrentDeclarativeSource( + source_config=manifest, config=_CONFIG, catalog=None, state=state + ) + configured_catalog = create_configured_catalog(source, _CONFIG) + + records = get_records(source, _CONFIG, configured_catalog, state) + assert len(records) == 1 + + +@freezegun.freeze_time("2024-07-15") +def test_cursor_age_validation_with_per_partition_state_falls_back_to_full_refresh(): + """Test that per-partition state falls back to full refresh. + + When per-partition state is provided but the stream uses a ConcurrentCursor (not + ConcurrentPerPartitionCursor), the cursor cannot extract a datetime from the + per-partition format and returns None, causing a full refresh fallback. + """ + manifest = _create_manifest_with_retention_period("P30D") + + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/items"), + HttpResponse( + body=json.dumps( + [ + {"id": 3, "name": "item_3", "updated_at": "2024-07-14"}, + ] + ) + ), + ) + + state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="TestStream", namespace=None), + stream_state=AirbyteStateBlob( + state={"updated_at": "2024-07-10"}, + states=[ + { + "partition": {"parent_id": "1"}, + "cursor": {"updated_at": "2024-07-10"}, + }, + ], + use_global_cursor=False, + ), + ), + ) + ] + source = ConcurrentDeclarativeSource( + source_config=manifest, config=_CONFIG, catalog=None, state=state + ) + configured_catalog = create_configured_catalog(source, _CONFIG) + + records = get_records(source, _CONFIG, configured_catalog, state) + assert len(records) == 1 + + +def _create_manifest_with_incrementing_count_cursor(api_retention_period: str) -> dict: + """Create a manifest with IncrementingCountCursor and api_retention_period.""" + manifest = copy.deepcopy(_MANIFEST) + manifest["definitions"]["TestStream"]["api_retention_period"] = api_retention_period + + incrementing_cursor = { + "type": "IncrementingCountCursor", + "cursor_field": "id", + "start_value": 0, + } + manifest["definitions"]["TestStream"]["full_refresh_stream"]["incremental_sync"] = ( + incrementing_cursor + ) + manifest["definitions"]["TestStream"]["incremental_stream"]["incremental_sync"] = ( + incrementing_cursor + ) + return manifest + + +def test_cursor_age_validation_raises_error_for_incrementing_count_cursor(): + """Test that IncrementingCountCursor with api_retention_period raises ValueError.""" + manifest = _create_manifest_with_incrementing_count_cursor("P7D") + + state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="TestStream", namespace=None), + stream_state=AirbyteStateBlob(id=100), + ), + ) + ] + + source = ConcurrentDeclarativeSource( + source_config=manifest, config=_CONFIG, catalog=None, state=state + ) + + with pytest.raises(ValueError, match="IncrementingCountCursor"): + source.discover(logger=MagicMock(), config=_CONFIG) + + +def test_cursor_age_validation_raises_error_for_unparseable_cursor(): + """Test that unparseable cursor datetime raises ValueError when api_retention_period is set.""" + manifest = _create_manifest_with_retention_period("P7D") + + state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="TestStream", namespace=None), + stream_state=AirbyteStateBlob(updated_at="not-a-date"), + ), + ) + ] + + source = ConcurrentDeclarativeSource( + source_config=manifest, config=_CONFIG, catalog=None, state=state + ) + + with pytest.raises(ValueError, match="not-a-date"): + source.discover(logger=MagicMock(), config=_CONFIG) + + +@freezegun.freeze_time("2024-07-15") +def test_final_state_cursor_falls_back_to_full_refresh_when_state_unparseable(): + """When state is a final state (NO_CURSOR_STATE_KEY), ConcurrentCursor cannot parse it, + so both cursors return None and the implementation falls back to full refresh as the safe default.""" + manifest = _create_manifest_with_retention_period("P7D") + + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/items"), + HttpResponse( + body=json.dumps( + [ + {"id": 1, "name": "item_1", "updated_at": "2024-07-14"}, + ] + ) + ), + ) + + state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="TestStream", namespace=None), + stream_state=AirbyteStateBlob(__ab_no_cursor_state_message=True), + ), + ) + ] + source = ConcurrentDeclarativeSource( + source_config=manifest, config=_CONFIG, catalog=None, state=state + ) + configured_catalog = create_configured_catalog(source, _CONFIG) + + records = get_records(source, _CONFIG, configured_catalog, state) + assert len(records) == 1 + + +_PARENT_CHILD_MANIFEST: dict = { + "version": "6.0.0", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["ChildStream"]}, + "definitions": { + "ParentStream": { + "type": "StateDelegatingStream", + "name": "ParentStream", + "full_refresh_stream": { + "type": "DeclarativeStream", + "name": "ParentStream", + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": {}, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "/parents", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "incremental_sync": { + "type": "DatetimeBasedCursor", + "start_datetime": { + "datetime": "{{ format_datetime(config['start_date'], '%Y-%m-%d') }}" + }, + "end_datetime": {"datetime": "{{ now_utc().strftime('%Y-%m-%d') }}"}, + "datetime_format": "%Y-%m-%d", + "cursor_datetime_formats": ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"], + "cursor_field": "updated_at", + }, + }, + "incremental_stream": { + "type": "DeclarativeStream", + "name": "ParentStream", + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": {}, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "/parents_incremental", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + "incremental_sync": { + "type": "DatetimeBasedCursor", + "start_datetime": { + "datetime": "{{ format_datetime(config['start_date'], '%Y-%m-%d') }}" + }, + "end_datetime": {"datetime": "{{ now_utc().strftime('%Y-%m-%d') }}"}, + "datetime_format": "%Y-%m-%d", + "cursor_datetime_formats": ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"], + "cursor_granularity": "P1D", + "step": "P15D", + "cursor_field": "updated_at", + "start_time_option": { + "type": "RequestOption", + "field_name": "start", + "inject_into": "request_parameter", + }, + "end_time_option": { + "type": "RequestOption", + "field_name": "end", + "inject_into": "request_parameter", + }, + }, + }, + }, + "ChildStream": { + "type": "DeclarativeStream", + "name": "ChildStream", + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": {}, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "/children/{{ stream_slice.parent_id }}", + "http_method": "GET", + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "partition_router": { + "type": "SubstreamPartitionRouter", + "parent_stream_configs": [ + { + "stream": "#/definitions/ParentStream", + "parent_key": "id", + "partition_field": "parent_id", + "incremental_dependency": True, + } + ], + }, + }, + "incremental_sync": { + "type": "DatetimeBasedCursor", + "start_datetime": { + "datetime": "{{ format_datetime(config['start_date'], '%Y-%m-%d') }}" + }, + "end_datetime": {"datetime": "{{ now_utc().strftime('%Y-%m-%d') }}"}, + "datetime_format": "%Y-%m-%d", + "cursor_datetime_formats": ["%Y-%m-%d"], + "cursor_field": "updated_at", + }, + }, + }, + "streams": [{"$ref": "#/definitions/ChildStream"}], + "spec": { + "connection_specification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": [], + "properties": {}, + "additionalProperties": True, + }, + "documentation_url": "https://example.org", + "type": "Spec", + }, +} + + +def _create_parent_child_manifest_with_retention_period( + api_retention_period: str, +) -> dict: + manifest = copy.deepcopy(_PARENT_CHILD_MANIFEST) + manifest["definitions"]["ParentStream"]["api_retention_period"] = api_retention_period + return manifest + + +@freezegun.freeze_time("2024-07-15") +def test_parent_state_delegating_stream_retention_falls_back_to_full_refresh(): + """When parent StateDelegatingStream has old cursor in child state, retention triggers full refresh for parent.""" + manifest = _create_parent_child_manifest_with_retention_period("P7D") + + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/parents"), + HttpResponse( + body=json.dumps([{"id": 1, "name": "parent_1", "updated_at": "2024-07-14"}]) + ), + ) + http_mocker.get( + HttpRequest(url="https://api.test.com/children/1"), + HttpResponse( + body=json.dumps([{"id": 10, "name": "child_1", "updated_at": "2024-07-14"}]) + ), + ) + + state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="ChildStream", namespace=None), + stream_state=AirbyteStateBlob( + use_global_cursor=False, + state={"updated_at": "2024-07-14"}, + states=[], + parent_state={"ParentStream": {"updated_at": "2024-06-01"}}, + lookback_window=0, + ), + ), + ) + ] + source = ConcurrentDeclarativeSource( + source_config=manifest, config=_CONFIG, catalog=None, state=state + ) + configured_catalog = create_configured_catalog(source, _CONFIG) + records = get_records(source, _CONFIG, configured_catalog, state) + assert len(records) == 1 + + +@freezegun.freeze_time("2024-07-15") +def test_unconfigured_parent_stream_does_not_emit_state_on_retention_fallback(): + """When a parent StateDelegatingStream has stale cursor state but is NOT in the + configured catalog (only the child is selected), no state message should be emitted + for the parent. Previously this would emit a state message for the parent stream, + causing the destination to crash with 'Stream not found'.""" + manifest = _create_parent_child_manifest_with_retention_period("P7D") + + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/parents"), + HttpResponse( + body=json.dumps([{"id": 1, "name": "parent_1", "updated_at": "2024-07-14"}]) + ), + ) + http_mocker.get( + HttpRequest(url="https://api.test.com/children/1"), + HttpResponse( + body=json.dumps([{"id": 10, "name": "child_1", "updated_at": "2024-07-14"}]) + ), + ) + + # ParentStream has stale state (older than 7 days) but ParentStream is NOT + # in the configured catalog — only ChildStream is selected. + state = [ + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="ParentStream", namespace=None), + stream_state=AirbyteStateBlob(updated_at="2024-06-01"), + ), + ), + AirbyteStateMessage( + type=AirbyteStateType.STREAM, + stream=AirbyteStreamState( + stream_descriptor=StreamDescriptor(name="ChildStream", namespace=None), + stream_state=AirbyteStateBlob( + use_global_cursor=False, + state={"updated_at": "2024-07-14"}, + states=[], + parent_state={"ParentStream": {"updated_at": "2024-06-01"}}, + lookback_window=0, + ), + ), + ), + ] + source = ConcurrentDeclarativeSource( + source_config=manifest, config=_CONFIG, catalog=None, state=state + ) + configured_catalog = create_configured_catalog(source, _CONFIG) + + all_messages = list( + source.read(logger=MagicMock(), config=_CONFIG, catalog=configured_catalog, state=state) + ) + + # No state message should reference ParentStream since it's not in the catalog + state_messages = [msg for msg in all_messages if msg.type == Type.STATE] + parent_state_messages = [ + msg + for msg in state_messages + if msg.state.stream.stream_descriptor.name == "ParentStream" + ] + assert len(parent_state_messages) == 0, ( + f"Expected no state messages for unconfigured ParentStream, " + f"but got {len(parent_state_messages)}: {parent_state_messages}" + ) diff --git a/unit_tests/sources/streams/concurrent/test_cursor.py b/unit_tests/sources/streams/concurrent/test_cursor.py index 34c92800d..13fe1df87 100644 --- a/unit_tests/sources/streams/concurrent/test_cursor.py +++ b/unit_tests/sources/streams/concurrent/test_cursor.py @@ -13,6 +13,7 @@ from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.message import MessageRepository +from airbyte_cdk.sources.streams import NO_CURSOR_STATE_KEY from airbyte_cdk.sources.streams.concurrent.clamping import ( ClampingEndProvider, ClampingStrategy, @@ -24,6 +25,7 @@ ConcurrentCursor, CursorField, CursorValueType, + FinalStateCursor, ) from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import ( @@ -1387,3 +1389,25 @@ def test_given_partitioned_state_with_multiple_slices_when_should_be_synced_then ) == True ) + + +@freezegun.freeze_time("2024-07-15") +def test_final_state_cursor_get_cursor_datetime_from_state_returns_now_for_no_cursor_state(): + """FinalStateCursor returns now() for NO_CURSOR_STATE_KEY state, else None. + + When state has NO_CURSOR_STATE_KEY: True, it means the previous sync was a completed + full refresh. Returning now() indicates the cursor is "current" and within any + retention period, so we should use incremental sync. + """ + cursor = FinalStateCursor("test_stream", None, Mock(spec=MessageRepository)) + + result_with_no_cursor_key = cursor.get_cursor_datetime_from_state({NO_CURSOR_STATE_KEY: True}) + assert result_with_no_cursor_key == datetime(2024, 7, 15, tzinfo=timezone.utc) + + result_without_no_cursor_key = cursor.get_cursor_datetime_from_state( + {"some_other_key": "value"} + ) + assert result_without_no_cursor_key is None + + result_with_empty_state = cursor.get_cursor_datetime_from_state({}) + assert result_with_empty_state is None