-
Notifications
You must be signed in to change notification settings - Fork 40
feat(cdk): Add cursor age validation to StateDelegatingStream #890
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
cf01a75
67bc5c8
45772f4
1edeedd
61d8d5d
21da112
0e33418
324344f
da8a5a5
37e046e
dceb70d
c14f963
86d5ea6
567ca7a
be72c5c
2b54cc5
f199583
fbda39f
a2d4b56
1defe9e
a017dff
d3e76d4
d31c26b
653022b
43dc47e
1531b39
b4c24c6
67f9e60
8608b5f
8faa0ae
ea7a757
714c667
16a895e
bddc671
8828eea
6b65b7a
17f857a
1163395
acd7156
8afe8e1
2a4f385
e4f71ff
9340d3c
e021f58
1dcc8ab
6d95923
a3a2073
020d2f5
21bb2a9
2a2459d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,6 +11,7 @@ | |
| import re | ||
| from functools import partial | ||
| from typing import ( | ||
| TYPE_CHECKING, | ||
| Any, | ||
| Callable, | ||
| Dict, | ||
|
|
@@ -27,6 +28,11 @@ | |
| get_type_hints, | ||
| ) | ||
|
|
||
| if TYPE_CHECKING: | ||
| from airbyte_cdk.legacy.sources.declarative.incremental.datetime_based_cursor import ( | ||
| DatetimeBasedCursor, | ||
| ) | ||
|
|
||
| from airbyte_protocol_dataclasses.models import ConfiguredAirbyteStream | ||
| from isodate import parse_duration | ||
| from pydantic.v1 import BaseModel | ||
|
|
@@ -3548,7 +3554,6 @@ def create_state_delegating_stream( | |
| self, | ||
| model: StateDelegatingStreamModel, | ||
| config: Config, | ||
| has_parent_state: Optional[bool] = None, | ||
| **kwargs: Any, | ||
| ) -> DefaultStream: | ||
| if ( | ||
|
|
@@ -3559,18 +3564,105 @@ def create_state_delegating_stream( | |
| f"state_delegating_stream, full_refresh_stream name and incremental_stream must have equal names. Instead has {model.name}, {model.full_refresh_stream.name} and {model.incremental_stream.name}." | ||
| ) | ||
|
|
||
| stream_model = self._get_state_delegating_stream_model( | ||
| False if has_parent_state is None else has_parent_state, model | ||
| ) | ||
| if model.api_retention_period: | ||
| for stream_model in (model.full_refresh_stream, model.incremental_stream): | ||
| if isinstance(stream_model.incremental_sync, IncrementingCountCursorModel): | ||
| raise ValueError( | ||
| f"Stream '{model.name}' uses IncrementingCountCursor which is not supported " | ||
| f"with api_retention_period. IncrementingCountCursor does not use datetime-based " | ||
| f"cursors, so cursor age validation cannot be performed." | ||
| ) | ||
|
|
||
| stream_state = self._connector_state_manager.get_stream_state(model.name, None) | ||
|
|
||
| if not stream_state: | ||
| return self._create_component_from_model( # type: ignore[no-any-return] | ||
| model.full_refresh_stream, config=config, **kwargs | ||
| ) | ||
|
|
||
| incremental_stream: DefaultStream = self._create_component_from_model( | ||
| model.incremental_stream, config=config, **kwargs | ||
| ) # type: ignore[assignment] | ||
|
|
||
| # Only run cursor age validation for streams that are in the configured | ||
| # catalog (or when no catalog was provided, e.g. during discover / connector | ||
| # builder). Streams not selected by the user but instantiated as parent-stream | ||
| # dependencies must not go through this path because it emits state messages | ||
| # that the destination does not know about, causing "Stream not found" crashes. | ||
| stream_is_in_catalog = ( | ||
| not self._stream_name_to_configured_stream # no catalog → validate by default | ||
| or model.name in self._stream_name_to_configured_stream | ||
| ) | ||
| if model.api_retention_period and stream_is_in_catalog: | ||
| full_refresh_stream: DefaultStream = self._create_component_from_model( | ||
| model.full_refresh_stream, config=config, **kwargs | ||
| ) # type: ignore[assignment] | ||
| if self._is_cursor_older_than_retention_period( | ||
| stream_state, | ||
| full_refresh_stream.cursor, | ||
| incremental_stream.cursor, | ||
| model.api_retention_period, | ||
| model.name, | ||
| ): | ||
| self._connector_state_manager.update_state_for_stream(model.name, None, {}) | ||
| state_message = self._connector_state_manager.create_state_message(model.name, None) | ||
| self._message_repository.emit_message(state_message) | ||
| return full_refresh_stream | ||
|
|
||
| return incremental_stream | ||
|
|
||
| @staticmethod | ||
| def _is_cursor_older_than_retention_period( | ||
| stream_state: Mapping[str, Any], | ||
| full_refresh_cursor: Cursor, | ||
| incremental_cursor: Cursor, | ||
| api_retention_period: str, | ||
| stream_name: str, | ||
| ) -> bool: | ||
| """Check if the cursor value in the state is older than the API's retention period. | ||
|
|
||
| Checks cursors in sequence: full refresh cursor first, then incremental cursor. | ||
| FinalStateCursor returns now() for completed full refresh state (NO_CURSOR_STATE_KEY), | ||
| which is always within retention, so we use incremental. For other states, it returns | ||
| None and we fall back to checking the incremental cursor. | ||
|
|
||
| Returns True if the cursor is older than the retention period (should use full refresh). | ||
| Returns False if the cursor is within the retention period (safe to use incremental). | ||
| """ | ||
| retention_duration = parse_duration(api_retention_period) | ||
| retention_cutoff = datetime.datetime.now(datetime.timezone.utc) - retention_duration | ||
|
|
||
| # Check full refresh cursor first | ||
| cursor_datetime = full_refresh_cursor.get_cursor_datetime_from_state(stream_state) | ||
|
|
||
| # If full refresh cursor returns None, check incremental cursor | ||
| if cursor_datetime is None: | ||
| cursor_datetime = incremental_cursor.get_cursor_datetime_from_state(stream_state) | ||
|
|
||
| if cursor_datetime is None: | ||
| # Neither cursor could parse the state - fall back to full refresh to be safe | ||
| return True | ||
|
|
||
| return self._create_component_from_model(stream_model, config=config, **kwargs) # type: ignore[no-any-return] # DeclarativeStream will be created as stream_model is alwyas DeclarativeStreamModel | ||
| if cursor_datetime < retention_cutoff: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So for the case where we have a state delegating stream where the full refresh implementation has no cursor and therefore has a And if that is the case, because of how the code is written, my worry is that because we only ever evaluate the FinalStateCursor and short circuit before we check incremental, we will always use the Anatolii Yatsuk (@tolik0) I might not be understand this flow right because i think it is written in a fairly overcomplicated way, but just want to check my understanding against this condition
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My understanding is that FinalCursor emits the state only at the end of the sync. So, if we have a state from a full refresh, we switch to incremental. If the incremental state is outdated, we switch back to the FinalCursor to retrieve all records.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, your understanding is correct! The flow is:
The current implementation handles this via:
Note: This comment is on an older version of the code. The latest version (per brianjlai's feedback) simplified the logic to only check the incremental cursor directly, since FinalStateCursor doesn't have a meaningful cursor datetime anyway. |
||
| logging.warning( | ||
| f"Stream '{stream_name}' has a cursor value older than " | ||
| f"the API's retention period of {api_retention_period} " | ||
| f"(cutoff: {retention_cutoff.isoformat()}). " | ||
| f"Falling back to full refresh to avoid data loss." | ||
| ) | ||
| return True | ||
|
|
||
| return False | ||
|
|
||
| def _get_state_delegating_stream_model( | ||
| self, has_parent_state: bool, model: StateDelegatingStreamModel | ||
| self, | ||
| model: StateDelegatingStreamModel, | ||
| parent_state: Optional[Mapping[str, Any]] = None, | ||
| ) -> DeclarativeStreamModel: | ||
| """Return the appropriate underlying stream model based on state.""" | ||
| return ( | ||
| model.incremental_stream | ||
| if self._connector_state_manager.get_stream_state(model.name, None) or has_parent_state | ||
| if self._connector_state_manager.get_stream_state(model.name, None) or parent_state | ||
| else model.full_refresh_stream | ||
| ) | ||
|
|
||
|
|
@@ -3901,17 +3993,13 @@ def create_substream_partition_router( | |
| def create_parent_stream_config_with_substream_wrapper( | ||
| self, model: ParentStreamConfigModel, config: Config, *, stream_name: str, **kwargs: Any | ||
| ) -> Any: | ||
| # getting the parent state | ||
| child_state = self._connector_state_manager.get_stream_state(stream_name, None) | ||
|
|
||
| # This flag will be used exclusively for StateDelegatingStream when a parent stream is created | ||
| has_parent_state = bool( | ||
| self._connector_state_manager.get_stream_state(stream_name, None) | ||
| if model.incremental_dependency | ||
| else False | ||
| parent_state: Optional[Mapping[str, Any]] = ( | ||
| child_state if model.incremental_dependency and child_state else None | ||
| ) | ||
| connector_state_manager = self._instantiate_parent_stream_state_manager( | ||
| child_state, config, model, has_parent_state | ||
| child_state, config, model, parent_state | ||
| ) | ||
|
|
||
| substream_factory = ModelToComponentFactory( | ||
|
|
@@ -3943,7 +4031,7 @@ def _instantiate_parent_stream_state_manager( | |
| child_state: MutableMapping[str, Any], | ||
| config: Config, | ||
| model: ParentStreamConfigModel, | ||
| has_parent_state: bool, | ||
| parent_state: Optional[Mapping[str, Any]] = None, | ||
| ) -> ConnectorStateManager: | ||
| """ | ||
| With DefaultStream, the state needs to be provided during __init__ of the cursor as opposed to the | ||
|
|
@@ -3955,36 +4043,33 @@ def _instantiate_parent_stream_state_manager( | |
| """ | ||
| if model.incremental_dependency and child_state: | ||
| parent_stream_name = model.stream.name or "" | ||
| parent_state = ConcurrentPerPartitionCursor.get_parent_state( | ||
| extracted_parent_state = ConcurrentPerPartitionCursor.get_parent_state( | ||
| child_state, parent_stream_name | ||
| ) | ||
|
|
||
| if not parent_state: | ||
| # there are two migration cases: state value from child stream or from global state | ||
| parent_state = ConcurrentPerPartitionCursor.get_global_state( | ||
| if not extracted_parent_state: | ||
| extracted_parent_state = ConcurrentPerPartitionCursor.get_global_state( | ||
| child_state, parent_stream_name | ||
| ) | ||
|
|
||
| if not parent_state and not isinstance(parent_state, dict): | ||
| if not extracted_parent_state and not isinstance(extracted_parent_state, dict): | ||
| cursor_values = child_state.values() | ||
| if cursor_values and len(cursor_values) == 1: | ||
| # We assume the child state is a pair `{<cursor_field>: <cursor_value>}` and we will use the | ||
| # cursor value as a parent state. | ||
| incremental_sync_model: Union[ | ||
| DatetimeBasedCursorModel, | ||
| IncrementingCountCursorModel, | ||
| ] = ( | ||
| model.stream.incremental_sync # type: ignore # if we are there, it is because there is incremental_dependency and therefore there is an incremental_sync on the parent stream | ||
| if isinstance(model.stream, DeclarativeStreamModel) | ||
| else self._get_state_delegating_stream_model( | ||
| has_parent_state, model.stream | ||
| model.stream, parent_state=parent_state | ||
| ).incremental_sync | ||
| ) | ||
| cursor_field = InterpolatedString.create( | ||
| incremental_sync_model.cursor_field, | ||
| parameters=incremental_sync_model.parameters or {}, | ||
| ).eval(config) | ||
| parent_state = AirbyteStateMessage( | ||
| extracted_parent_state = AirbyteStateMessage( | ||
| type=AirbyteStateType.STREAM, | ||
| stream=AirbyteStreamState( | ||
| stream_descriptor=StreamDescriptor( | ||
|
|
@@ -3995,7 +4080,7 @@ def _instantiate_parent_stream_state_manager( | |
| ), | ||
| ), | ||
| ) | ||
| return ConnectorStateManager([parent_state] if parent_state else []) | ||
| return ConnectorStateManager([extracted_parent_state] if extracted_parent_state else []) | ||
|
|
||
| return ConnectorStateManager([]) | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The flow should be like this:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the detailed flow! This is on an older version of the code. The current implementation (per brianjlai's feedback) simplified to only check the incremental cursor, with
NO_CURSOR_STATE_KEYhandled as an early return.Your suggested flow would require checking both cursors in sequence:
This is a different approach than brianjlai's simplification. I've replied to your other comment asking for alignment between you and brianjlai on the preferred approach.
Devin session
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The final cursor is fixed; now implement this flow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for confirming the FinalStateCursor changes are good!
I believe I've already implemented the flow you described in commit e4f71ff:
now()forNO_CURSOR_STATE_KEY: Truestate, otherwiseNoneNoneHowever, I kept an early return for
NO_CURSOR_STATE_KEYin_is_cursor_older_than_retention_periodbecause thefull_refresh_stream.cursoris actually aDatetimeBasedCursor(orConcurrentCursor), not aFinalStateCursor. This is because the manifest defines aDatetimeBasedCursorin theincremental_syncfield of thefull_refresh_stream.Could you clarify if you want me to:
NO_CURSOR_STATE_KEY+ sequential cursor checking)FinalStateCursordirectly for the retention check (would require creating aFinalStateCursorinstance explicitly rather than usingfull_refresh_stream.cursor)All 13 tests pass locally with the current implementation.
Devin session