From 1878d1be5548ac8d3ab5f07e750e1d24c5bb5a37 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 5 Feb 2026 20:40:32 +0000 Subject: [PATCH 1/5] feat(cdk): Add RecordExpander component for nested array extraction Co-Authored-By: sophie.cui@airbyte.io --- .../declarative_component_schema.yaml | 84 +++ .../sources/declarative/expanders/__init__.py | 10 + .../declarative/expanders/record_expander.py | 153 ++++++ .../declarative/extractors/dpath_extractor.py | 35 +- .../models/declarative_component_schema.py | 274 ++++++---- .../parsers/manifest_component_transformer.py | 1 + .../parsers/model_to_component_factory.py | 59 +++ .../extractors/test_dpath_extractor.py | 487 ++++++++++++++++++ 8 files changed, 1005 insertions(+), 98 deletions(-) create mode 100644 airbyte_cdk/sources/declarative/expanders/__init__.py create mode 100644 airbyte_cdk/sources/declarative/expanders/record_expander.py diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index e68318cd4..ce338ac03 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1927,6 +1927,10 @@ definitions: - ["data", "records"] - ["data", "{{ parameters.name }}"] - ["data", "*", "record"] + record_expander: + title: Record Expander + description: Optional component to expand records by extracting items from nested array fields. + "$ref": "#/definitions/RecordExpander" $parameters: type: object additionalProperties: true @@ -1943,6 +1947,86 @@ definitions: $parameters: type: object additionalProperties: true + RecordExpander: + title: Record Expander + description: Expands records by extracting items from a nested array field. When configured, this component extracts items from a specified nested array path within each record and emits each item as a separate record. 
Optionally, the original parent record can be embedded in each expanded item for context preservation. Supports wildcards (*) for matching multiple arrays. + type: object + required: + - type + - expand_records_from_field + properties: + type: + type: string + enum: [RecordExpander] + expand_records_from_field: + title: Expand Records From Field + description: Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays. + type: array + items: + type: string + interpolation_context: + - config + examples: + - ["lines", "data"] + - ["items"] + - ["nested", "array"] + - ["sections", "*", "items"] + remain_original_record: + title: Remain Original Record + description: If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false. + type: boolean + default: false + on_no_records: + title: On No Records + description: Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged. + type: string + enum: + - skip + - emit_parent + default: skip + parent_fields_to_copy: + title: Parent Fields To Copy + description: List of parent field mappings to copy onto each expanded child record. Each mapping specifies a source path in the parent record and a target field name in the child record. + type: array + items: + "$ref": "#/definitions/ParentFieldMapping" + $parameters: + type: object + additionalProperties: true + ParentFieldMapping: + title: Parent Field Mapping + description: Defines a mapping from a parent record field to a child record field. + type: object + required: + - type + - source_field_path + - target_field + properties: + type: + type: string + enum: [ParentFieldMapping] + source_field_path: + title: Source Field Path + description: Path to the field in the parent record to copy. 
+ type: array + items: + type: string + interpolation_context: + - config + examples: + - ["id"] + - ["created"] + - ["metadata", "timestamp"] + target_field: + title: Target Field + description: Name of the field in the child record where the value will be copied. + type: string + examples: + - "parent_id" + - "subscription_updated" + $parameters: + type: object + additionalProperties: true ExponentialBackoffStrategy: title: Exponential Backoff description: Backoff strategy with an exponential backoff interval. The interval is defined as factor * 2^attempt_count. diff --git a/airbyte_cdk/sources/declarative/expanders/__init__.py b/airbyte_cdk/sources/declarative/expanders/__init__.py new file mode 100644 index 000000000..335d30df1 --- /dev/null +++ b/airbyte_cdk/sources/declarative/expanders/__init__.py @@ -0,0 +1,10 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# + +from airbyte_cdk.sources.declarative.expanders.record_expander import ( + ParentFieldMapping, + RecordExpander, +) + +__all__ = ["ParentFieldMapping", "RecordExpander"] diff --git a/airbyte_cdk/sources/declarative/expanders/record_expander.py b/airbyte_cdk/sources/declarative/expanders/record_expander.py new file mode 100644 index 000000000..c46e1b75e --- /dev/null +++ b/airbyte_cdk/sources/declarative/expanders/record_expander.py @@ -0,0 +1,153 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, Mapping, MutableMapping

import dpath

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.types import Config


@dataclass
class ParentFieldMapping:
    """Defines a mapping from a parent record field to a child record field.

    Attributes:
        source_field_path: Path (list of keys, each interpolable against config) to the
            value in the parent record.
        target_field: Field name under which the value is stored on the child record.
        config: The user-provided configuration as specified by the source's spec.
    """

    source_field_path: list[str | InterpolatedString]
    target_field: str
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Build the interpolated segments once; they are still evaluated per call
        # because interpolation may reference config values.
        self._source_path = [
            InterpolatedString.create(path, parameters=parameters)
            for path in self.source_field_path
        ]

    def copy_field(
        self, parent_record: Mapping[str, Any], child_record: MutableMapping[str, Any]
    ) -> None:
        """Copy the configured parent field onto ``child_record``.

        A missing source path is ignored. dpath raises ``ValueError`` (not ``KeyError``)
        when a glob in the path matches more than one value; that case is likewise
        treated as "nothing to copy" rather than failing the whole sync.
        """
        source_path = [path.eval(self.config) for path in self._source_path]
        try:
            child_record[self.target_field] = dpath.get(dict(parent_record), source_path)
        except (KeyError, ValueError):
            # KeyError: path absent. ValueError: glob matched multiple values.
            pass


@dataclass
class RecordExpander:
    """Expands records by extracting items from a nested array field.

    When configured, this component extracts items from a specified nested array path
    within each record and emits each item as a separate record. Optionally, the original
    parent record can be embedded in each expanded item for context preservation.

    The expand_records_from_field path supports wildcards (*) for matching multiple arrays.
    When wildcards are used, items from all matched arrays are extracted and emitted.

    Examples of instantiating this component:
    ```
    record_expander:
      type: RecordExpander
      expand_records_from_field:
        - "lines"
        - "data"
      remain_original_record: true
    ```

    ```
    record_expander:
      type: RecordExpander
      expand_records_from_field:
        - "sections"
        - "*"
        - "items"
      on_no_records: emit_parent
      parent_fields_to_copy:
        - type: ParentFieldMapping
          source_field_path: ["id"]
          target_field: "parent_id"
    ```

    Attributes:
        expand_records_from_field: Path to a nested array field within each record.
            Items from this array will be extracted and emitted as separate records.
            Supports wildcards (*).
        remain_original_record: If True, each expanded record will include the original
            parent record in an "original_record" field. Defaults to False.
        on_no_records: Behavior when expansion produces no records. "skip" (default)
            emits nothing. "emit_parent" emits the original parent record unchanged.
        parent_fields_to_copy: List of field mappings to copy from parent to each
            expanded child record.
        config: The user-provided configuration as specified by the source's spec.
    """

    expand_records_from_field: list[str | InterpolatedString]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    remain_original_record: bool = False
    on_no_records: str = "skip"
    parent_fields_to_copy: list[ParentFieldMapping] = field(default_factory=list)

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Plain list, never None: an *empty* expand path is what disables expansion
        # (checked in expand_record), so there is no separate None state.
        self._expand_path: list[InterpolatedString] = [
            InterpolatedString.create(path, parameters=parameters)
            for path in self.expand_records_from_field
        ]

    def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]:
        """Expand a record by extracting items from a nested array field.

        Yields one record per item of the matched array(s). If nothing is matched
        (path missing, not a list, or empty), behavior follows ``on_no_records``.
        """
        if not self._expand_path:
            # Expansion not configured: pass the record through unchanged.
            yield record
            return

        expand_path = [path.eval(self.config) for path in self._expand_path]
        expanded_any = False

        if "*" in expand_path:
            # dpath.values returns every value matched by the glob. Each match is
            # expected to be a list of items; non-list matches are skipped.
            for matched in dpath.values(record, expand_path):
                if isinstance(matched, list):
                    for child in self._expand_items(record, matched):
                        expanded_any = True
                        yield child
        else:
            try:
                extracted = dpath.get(record, expand_path)
            except (KeyError, ValueError):
                # KeyError: path absent. ValueError: a non-"*" glob character in the
                # path matched multiple values — treat as "no records" as well.
                extracted = None
            if isinstance(extracted, list):
                for child in self._expand_items(record, extracted):
                    expanded_any = True
                    yield child

        if not expanded_any and self.on_no_records == "emit_parent":
            yield record

    def _expand_items(
        self, parent_record: MutableMapping[Any, Any], items: list[Any]
    ) -> Iterable[Any]:
        """Yield each item of ``items``, applying parent context to a copy of dict items."""
        for item in items:
            if isinstance(item, dict):
                # Shallow copy so the source payload is never mutated in place.
                child = dict(item)
                self._apply_parent_context(parent_record, child)
                yield child
            else:
                # Scalar array items are emitted as-is; parent context cannot be attached.
                yield item

    def _apply_parent_context(
        self, parent_record: Mapping[str, Any], child_record: MutableMapping[str, Any]
    ) -> None:
        """Apply parent context (original record embedding and field copies) to a child record."""
        if self.remain_original_record:
            child_record["original_record"] = parent_record

        for field_mapping in self.parent_fields_to_copy:
            field_mapping.copy_field(parent_record, child_record)
+ Examples of instantiating this transform: ``` extractor: @@ -47,16 +53,32 @@ class DpathExtractor(RecordExtractor): field_path: [] ``` + ``` + extractor: + type: DpathExtractor + field_path: + - "data" + - "object" + record_expander: + type: RecordExpander + expand_records_from_field: + - "lines" + - "data" + remain_original_record: true + ``` + Attributes: field_path (Union[InterpolatedString, str]): Path to the field that should be extracted config (Config): The user-provided configuration as specified by the source's spec decoder (Decoder): The decoder responsible to transfom the response in a Mapping + record_expander (Optional[RecordExpander]): Optional component to expand records by extracting items from nested array fields """ field_path: List[Union[InterpolatedString, str]] config: Config parameters: InitVar[Mapping[str, Any]] decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={})) + record_expander: Optional[RecordExpander] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._field_path = [ @@ -79,8 +101,15 @@ def extract_records(self, response: requests.Response) -> Iterable[MutableMappin else: extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure if isinstance(extracted, list): - yield from extracted + if not self.record_expander: + yield from extracted + else: + for record in extracted: + yield from self.record_expander.expand_record(record) elif extracted: - yield extracted + if self.record_expander: + yield from self.record_expander.expand_record(extracted) + else: + yield extracted else: yield from [] diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 5d2f0521f..b1b785ad4 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ 
b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,5 +1,3 @@ -# Copyright (c) 2025 Airbyte, Inc., all rights reserved. - # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -484,24 +482,30 @@ class Config: ) -class DpathExtractor(BaseModel): - type: Literal["DpathExtractor"] - field_path: List[str] = Field( - ..., - description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).', - examples=[ - ["data"], - ["data", "records"], - ["data", "{{ parameters.name }}"], - ["data", "*", "record"], - ], - title="Field Path", - ) +class ResponseToFileExtractor(BaseModel): + type: Literal["ResponseToFileExtractor"] parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class ResponseToFileExtractor(BaseModel): - type: Literal["ResponseToFileExtractor"] +class OnNoRecords(Enum): + skip = "skip" + emit_parent = "emit_parent" + + +class ParentFieldMapping(BaseModel): + type: Literal["ParentFieldMapping"] + source_field_path: List[str] = Field( + ..., + description="Path to the field in the parent record to copy.", + examples=[["id"], ["created"], ["metadata", "timestamp"]], + title="Source Field Path", + ) + target_field: str = Field( + ..., + description="Name of the field in the child record where the value will be copied.", + examples=["parent_id", "subscription_updated"], + title="Target Field", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -930,24 +934,28 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( - None, - description="OAuth specific blob. 
This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( + Field( + None, + description="OAuth specific blob. 
This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", + ) ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ 
https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( + Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to 
URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", + ) ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -965,7 +973,9 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. 
This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], + examples=[ + {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} + ], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1469,7 +1479,9 @@ class CustomConfigTransformation(BaseModel): class_name: str = Field( ..., description="Fully-qualified name of the class that will be implementing the custom config transformation. The format is `source_..`.", - examples=["source_declarative_manifest.components.MyCustomConfigTransformation"], + examples=[ + "source_declarative_manifest.components.MyCustomConfigTransformation" + ], ) parameters: Optional[Dict[str, Any]] = Field( None, @@ -1897,7 +1909,9 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], + examples=[ + ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] + ], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -2048,6 +2062,37 @@ class DefaultPaginator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class RecordExpander(BaseModel): + type: Literal["RecordExpander"] + expand_records_from_field: List[str] = Field( + ..., + description="Path to a nested array field within each record. 
Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.", + examples=[ + ["lines", "data"], + ["items"], + ["nested", "array"], + ["sections", "*", "items"], + ], + title="Expand Records From Field", + ) + remain_original_record: Optional[bool] = Field( + False, + description='If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.', + title="Remain Original Record", + ) + on_no_records: Optional[OnNoRecords] = Field( + OnNoRecords.skip, + description='Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged.', + title="On No Records", + ) + parent_fields_to_copy: Optional[List[ParentFieldMapping]] = Field( + None, + description="List of parent field mappings to copy onto each expanded child record. Each mapping specifies a source path in the parent record and a target field name in the child record.", + title="Parent Fields To Copy", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class SessionTokenRequestApiKeyAuthenticator(BaseModel): type: Literal["ApiKey"] inject_into: RequestOption = Field( @@ -2116,27 +2161,6 @@ class ListPartitionRouter(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class RecordSelector(BaseModel): - type: Literal["RecordSelector"] - extractor: Union[DpathExtractor, CustomRecordExtractor] - record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field( - None, - description="Responsible for filtering records to be emitted by the Source.", - title="Record Filter", - ) - schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( - None, - description="Responsible for normalization according to the schema.", - title="Schema Normalization", - ) - transform_before_filtering: Optional[bool] = 
Field( - None, - description="If true, transformation will be applied before record filtering.", - title="Transform Before Filtering", - ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - - class PaginationReset(BaseModel): type: Literal["PaginationReset"] action: Action1 @@ -2166,10 +2190,12 @@ class DpathValidator(BaseModel): ], title="Field Path", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( - ..., - description="The condition that the specified config value will be evaluated against", - title="Validation Strategy", + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( + Field( + ..., + description="The condition that the specified config value will be evaluated against", + title="Validation Strategy", + ) ) @@ -2186,10 +2212,12 @@ class PredicateValidator(BaseModel): ], title="Value", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( - ..., - description="The validation strategy to apply to the value.", - title="Validation Strategy", + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( + Field( + ..., + description="The validation strategy to apply to the value.", + title="Validation Strategy", + ) ) @@ -2214,12 +2242,12 @@ class ConfigAddFields(BaseModel): class CompositeErrorHandler(BaseModel): type: Literal["CompositeErrorHandler"] - error_handlers: List[Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler]] = ( - Field( - ..., - description="List of error handlers to iterate on to determine how to handle a failed response.", - title="Error Handlers", - ) + error_handlers: List[ + Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler] + ] = Field( + ..., + description="List of error handlers to iterate on to determine how to handle a failed response.", + title="Error Handlers", ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ 
-2257,6 +2285,27 @@ class Config: ) +class DpathExtractor(BaseModel): + type: Literal["DpathExtractor"] + field_path: List[str] = Field( + ..., + description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).', + examples=[ + ["data"], + ["data", "records"], + ["data", "{{ parameters.name }}"], + ["data", "*", "record"], + ], + title="Field Path", + ) + record_expander: Optional[RecordExpander] = Field( + None, + description="Optional component to expand records by extracting items from nested array fields.", + title="Record Expander", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class ZipfileDecoder(BaseModel): class Config: extra = Extra.allow @@ -2269,6 +2318,29 @@ class Config: ) +class RecordSelector(BaseModel): + type: Literal["RecordSelector"] + extractor: Union[DpathExtractor, CustomRecordExtractor] + record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field( + None, + description="Responsible for filtering records to be emitted by the Source.", + title="Record Filter", + ) + schema_normalization: Optional[ + Union[SchemaNormalization, CustomSchemaNormalization] + ] = Field( + None, + description="Responsible for normalization according to the schema.", + title="Schema Normalization", + ) + transform_before_filtering: Optional[bool] = Field( + None, + description="If true, transformation will be applied before record filtering.", + title="Transform Before Filtering", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class ConfigMigration(BaseModel): type: Literal["ConfigMigration"] description: Optional[str] = Field( @@ -2381,9 +2453,9 @@ class Config: type: Literal["DeclarativeSource"] check: Union[CheckStream, CheckDynamicStream] - streams: 
Optional[List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]] = ( - None - ) + streams: Optional[ + List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]] + ] = None dynamic_streams: List[DynamicDeclarativeStream] version: str = Field( ..., @@ -2508,16 +2580,20 @@ class Config: extra = Extra.allow type: Literal["DeclarativeStream"] - name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") + name: Optional[str] = Field( + "", description="The stream name.", example=["Users"], title="Name" + ) retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", + incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = ( + Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", + ) ) primary_key: Optional[PrimaryKey] = Field("", title="Primary Key") schema_loader: Optional[ @@ -2691,18 +2767,20 @@ class HttpRequester(BaseModelWithDeprecations): description="For APIs that require explicit specification of the properties to query for, this component will take a static or dynamic set of properties (which can be optionally split into chunks) and allow them to be injected into an outbound request by accessing stream_partition.extra_fields.", title="Query Properties", ) - request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field( - None, - description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", - examples=[ - {"unit": "day"}, - { - "query": 
'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' - }, - {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, - {"sort_by[asc]": "updated_at"}, - ], - title="Query Parameters", + request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = ( + Field( + None, + description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", + examples=[ + {"unit": "day"}, + { + "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' + }, + {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, + {"sort_by[asc]": "updated_at"}, + ], + title="Query Parameters", + ) ) request_headers: Optional[Union[Dict[str, str], str]] = Field( None, @@ -2874,7 +2952,9 @@ class QueryProperties(BaseModel): class StateDelegatingStream(BaseModel): type: Literal["StateDelegatingStream"] - name: str = Field(..., description="The stream name.", example=["Users"], title="Name") + name: str = Field( + ..., description="The stream name.", example=["Users"], title="Name" + ) full_refresh_stream: DeclarativeStream = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.", @@ -2961,13 +3041,17 @@ class AsyncRetriever(BaseModel): status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field( ..., description="Responsible for fetching the actual status of the async job." 
) - download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field( + download_target_extractor: Optional[ + Union[DpathExtractor, CustomRecordExtractor] + ] = Field( None, description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.", ) download_extractor: Optional[ Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor] - ] = Field(None, description="Responsible for fetching the records from provided urls.") + ] = Field( + None, description="Responsible for fetching the records from provided urls." + ) creation_requester: Union[HttpRequester, CustomRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index 3ed86bf06..00b1a18ff 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -42,6 +42,7 @@ "DefaultPaginator.page_size_option": "RequestOption", # DpathExtractor "DpathExtractor.decoder": "JsonDecoder", + "DpathExtractor.record_expander": "RecordExpander", # HttpRequester "HttpRequester.error_handler": "DefaultErrorHandler", # ListPartitionRouter diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 2bd7d268d..53540f994 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -371,6 +371,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ParametrizedComponentsResolver as ParametrizedComponentsResolverModel, ) +from 
airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + ParentFieldMapping as ParentFieldMappingModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ParentStreamConfig as ParentStreamConfigModel, ) @@ -392,6 +395,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( Rate as RateModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + RecordExpander as RecordExpanderModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( RecordFilter as RecordFilterModel, ) @@ -742,6 +748,8 @@ def _init_mappings(self) -> None: DefaultPaginatorModel: self.create_default_paginator, DpathExtractorModel: self.create_dpath_extractor, DpathValidatorModel: self.create_dpath_validator, + RecordExpanderModel: self.create_record_expander, + ParentFieldMappingModel: self.create_parent_field_mapping, ResponseToFileExtractorModel: self.create_response_to_file_extractor, ExponentialBackoffStrategyModel: self.create_exponential_backoff_strategy, SessionTokenAuthenticatorModel: self.create_session_token_authenticator, @@ -2371,11 +2379,62 @@ def create_dpath_extractor( else: decoder_to_use = JsonDecoder(parameters={}) model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path] + + record_expander = None + if hasattr(model, "record_expander") and model.record_expander: + record_expander = self._create_component_from_model( + model=model.record_expander, + config=config, + ) + return DpathExtractor( decoder=decoder_to_use, field_path=model_field_path, config=config, parameters=model.parameters or {}, + record_expander=record_expander, + ) + + def create_record_expander( + self, + model: RecordExpanderModel, + config: Config, + **kwargs: Any, + ) -> "RecordExpander": + from airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander + + parent_fields_to_copy = [] + if 
model.parent_fields_to_copy: + for field_mapping_model in model.parent_fields_to_copy: + parent_fields_to_copy.append( + self._create_component_from_model( + model=field_mapping_model, + config=config, + ) + ) + + return RecordExpander( + expand_records_from_field=list(model.expand_records_from_field), + config=config, + parameters=model.parameters or {}, + remain_original_record=model.remain_original_record or False, + on_no_records=model.on_no_records.value if model.on_no_records else "skip", + parent_fields_to_copy=parent_fields_to_copy, + ) + + @staticmethod + def create_parent_field_mapping( + model: ParentFieldMappingModel, + config: Config, + **kwargs: Any, + ) -> "ParentFieldMapping": + from airbyte_cdk.sources.declarative.expanders.record_expander import ParentFieldMapping + + return ParentFieldMapping( + source_field_path=list(model.source_field_path), + target_field=model.target_field, + config=config, + parameters=model.parameters or {}, ) @staticmethod diff --git a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py index 05e586592..982f3cc49 100644 --- a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py +++ b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py @@ -14,6 +14,10 @@ IterableDecoder, JsonDecoder, ) +from airbyte_cdk.sources.declarative.expanders.record_expander import ( + ParentFieldMapping, + RecordExpander, +) from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor config = {"field": "record_array"} @@ -121,3 +125,486 @@ def test_dpath_extractor(field_path: List, decoder: Decoder, body, expected_reco actual_records = list(extractor.extract_records(response)) assert actual_records == expected_records + + +@pytest.mark.parametrize( + "field_path, expand_records_from_field, remain_original_record, body, expected_records", + [ + ( + ["data", "object"], + ["lines", "data"], + False, + { + "data": { + 
"object": { + "id": "in_123", + "created": 1234567890, + "lines": { + "data": [ + {"id": "il_1", "amount": 100}, + {"id": "il_2", "amount": 200}, + ] + }, + } + } + }, + [ + {"id": "il_1", "amount": 100}, + {"id": "il_2", "amount": 200}, + ], + ), + ( + ["data", "object"], + ["lines", "data"], + True, + { + "data": { + "object": { + "id": "in_123", + "created": 1234567890, + "lines": { + "data": [ + {"id": "il_1", "amount": 100}, + ] + }, + } + } + }, + [ + { + "id": "il_1", + "amount": 100, + "original_record": { + "id": "in_123", + "created": 1234567890, + "lines": {"data": [{"id": "il_1", "amount": 100}]}, + }, + }, + ], + ), + ( + ["data"], + ["items"], + False, + {"data": {"id": "parent_1", "items": []}}, + [], + ), + ( + ["data"], + ["items"], + False, + {"data": {"id": "parent_1"}}, + [], + ), + ( + ["data"], + ["items"], + False, + {"data": {"id": "parent_1", "items": "not_an_array"}}, + [], + ), + ( + ["data"], + ["nested", "array"], + False, + { + "data": { + "id": "parent_1", + "nested": {"array": [{"id": "child_1"}, {"id": "child_2"}]}, + } + }, + [{"id": "child_1"}, {"id": "child_2"}], + ), + ( + ["data"], + ["items"], + False, + {"data": {"id": "parent_1", "items": [1, 2, "string", {"id": "dict_item"}]}}, + [1, 2, "string", {"id": "dict_item"}], + ), + ( + [], + ["items"], + False, + [ + {"id": "parent_1", "items": [{"id": "child_1"}]}, + {"id": "parent_2", "items": [{"id": "child_2"}, {"id": "child_3"}]}, + ], + [{"id": "child_1"}, {"id": "child_2"}, {"id": "child_3"}], + ), + ( + ["data"], + ["sections", "*", "items"], + False, + { + "data": { + "sections": [ + {"name": "section1", "items": [{"id": "item_1"}, {"id": "item_2"}]}, + {"name": "section2", "items": [{"id": "item_3"}]}, + ] + } + }, + [{"id": "item_1"}, {"id": "item_2"}, {"id": "item_3"}], + ), + ( + ["data"], + ["sections", "*", "items"], + True, + { + "data": { + "sections": [ + {"name": "section1", "items": [{"id": "item_1"}]}, + ] + } + }, + [ + { + "id": "item_1", + 
"original_record": { + "sections": [ + {"name": "section1", "items": [{"id": "item_1"}]}, + ] + }, + } + ], + ), + ( + ["data"], + ["sections", "*", "items"], + False, + { + "data": { + "sections": [ + {"name": "section1", "items": []}, + {"name": "section2", "items": []}, + ] + } + }, + [], + ), + ( + ["data"], + ["sections", "*", "items"], + False, + { + "data": { + "sections": [ + {"name": "section1"}, + {"name": "section2", "items": "not_an_array"}, + ] + } + }, + [], + ), + ( + ["data"], + ["*", "items"], + False, + { + "data": { + "group1": {"items": [{"id": "item_1"}]}, + "group2": {"items": [{"id": "item_2"}, {"id": "item_3"}]}, + } + }, + [{"id": "item_1"}, {"id": "item_2"}, {"id": "item_3"}], + ), + ], + ids=[ + "test_expand_nested_array", + "test_expand_with_original_record", + "test_expand_empty_array_yields_nothing", + "test_expand_missing_path_yields_nothing", + "test_expand_non_array_yields_nothing", + "test_expand_deeply_nested_path", + "test_expand_mixed_types_in_array", + "test_expand_multiple_parent_records", + "test_expand_wildcard_multiple_lists", + "test_expand_wildcard_with_original_record", + "test_expand_wildcard_all_empty_arrays", + "test_expand_wildcard_no_list_matches", + "test_expand_wildcard_dict_values", + ], +) +def test_dpath_extractor_with_expansion( + field_path: List, + expand_records_from_field: List, + remain_original_record: bool, + body, + expected_records: List, +): + record_expander = RecordExpander( + expand_records_from_field=expand_records_from_field, + config=config, + parameters=parameters, + remain_original_record=remain_original_record, + ) + extractor = DpathExtractor( + field_path=field_path, + config=config, + decoder=decoder_json, + parameters=parameters, + record_expander=record_expander, + ) + + response = create_response(body) + actual_records = list(extractor.extract_records(response)) + + assert actual_records == expected_records + + +@pytest.mark.parametrize( + "field_path, expand_records_from_field, 
on_no_records, body, expected_records", + [ + pytest.param( + ["data"], + ["items"], + "skip", + {"data": {"id": "parent_1"}}, + [], + id="on_no_records_skip_missing_path", + ), + pytest.param( + ["data"], + ["items"], + "skip", + {"data": {"id": "parent_1", "items": []}}, + [], + id="on_no_records_skip_empty_array", + ), + pytest.param( + ["data"], + ["items"], + "emit_parent", + {"data": {"id": "parent_1"}}, + [{"id": "parent_1"}], + id="on_no_records_emit_parent_missing_path", + ), + pytest.param( + ["data"], + ["items"], + "emit_parent", + {"data": {"id": "parent_1", "items": []}}, + [{"id": "parent_1", "items": []}], + id="on_no_records_emit_parent_empty_array", + ), + pytest.param( + ["data"], + ["items"], + "emit_parent", + {"data": {"id": "parent_1", "items": "not_an_array"}}, + [{"id": "parent_1", "items": "not_an_array"}], + id="on_no_records_emit_parent_non_array", + ), + pytest.param( + ["data"], + ["items"], + "emit_parent", + {"data": {"id": "parent_1", "items": [{"id": "child_1"}]}}, + [{"id": "child_1"}], + id="on_no_records_emit_parent_has_items_extracts_normally", + ), + ], +) +def test_dpath_extractor_on_no_records( + field_path: List, + expand_records_from_field: List, + on_no_records: str, + body, + expected_records: List, +): + record_expander = RecordExpander( + expand_records_from_field=expand_records_from_field, + config=config, + parameters=parameters, + on_no_records=on_no_records, + ) + extractor = DpathExtractor( + field_path=field_path, + config=config, + decoder=decoder_json, + parameters=parameters, + record_expander=record_expander, + ) + + response = create_response(body) + actual_records = list(extractor.extract_records(response)) + + assert actual_records == expected_records + + +@pytest.mark.parametrize( + "field_path, expand_records_from_field, parent_fields_to_copy, body, expected_records", + [ + pytest.param( + ["data"], + ["items", "data"], + [ + ParentFieldMapping( + source_field_path=["id"], + target_field="parent_id", + 
config=config, + parameters=parameters, + ), + ], + { + "data": { + "id": "sub_123", + "created": 1234567890, + "items": {"data": [{"id": "si_1"}, {"id": "si_2"}]}, + } + }, + [ + {"id": "si_1", "parent_id": "sub_123"}, + {"id": "si_2", "parent_id": "sub_123"}, + ], + id="copy_single_parent_field", + ), + pytest.param( + ["data"], + ["items", "data"], + [ + ParentFieldMapping( + source_field_path=["id"], + target_field="subscription_id", + config=config, + parameters=parameters, + ), + ParentFieldMapping( + source_field_path=["created"], + target_field="subscription_updated", + config=config, + parameters=parameters, + ), + ], + { + "data": { + "id": "sub_123", + "created": 1234567890, + "items": {"data": [{"id": "si_1"}]}, + } + }, + [{"id": "si_1", "subscription_id": "sub_123", "subscription_updated": 1234567890}], + id="copy_multiple_parent_fields", + ), + pytest.param( + ["data"], + ["items", "data"], + [ + ParentFieldMapping( + source_field_path=["metadata", "timestamp"], + target_field="parent_timestamp", + config=config, + parameters=parameters, + ), + ], + { + "data": { + "id": "parent_1", + "metadata": {"timestamp": 9999}, + "items": {"data": [{"id": "child_1"}]}, + } + }, + [{"id": "child_1", "parent_timestamp": 9999}], + id="copy_nested_parent_field", + ), + pytest.param( + ["data"], + ["items", "data"], + [ + ParentFieldMapping( + source_field_path=["missing_field"], + target_field="should_not_exist", + config=config, + parameters=parameters, + ), + ], + {"data": {"id": "parent_1", "items": {"data": [{"id": "child_1"}]}}}, + [{"id": "child_1"}], + id="copy_missing_parent_field_ignored", + ), + ], +) +def test_dpath_extractor_parent_fields_to_copy( + field_path: List, + expand_records_from_field: List, + parent_fields_to_copy: List[ParentFieldMapping], + body, + expected_records: List, +): + record_expander = RecordExpander( + expand_records_from_field=expand_records_from_field, + config=config, + parameters=parameters, + 
parent_fields_to_copy=parent_fields_to_copy, + ) + extractor = DpathExtractor( + field_path=field_path, + config=config, + decoder=decoder_json, + parameters=parameters, + record_expander=record_expander, + ) + + response = create_response(body) + actual_records = list(extractor.extract_records(response)) + + assert actual_records == expected_records + + +@pytest.mark.parametrize( + "field_path, expand_records_from_field, remain_original_record, parent_fields_to_copy, body, expected_records", + [ + pytest.param( + ["data"], + ["items", "data"], + True, + [ + ParentFieldMapping( + source_field_path=["id"], + target_field="parent_id", + config=config, + parameters=parameters, + ), + ], + {"data": {"id": "parent_1", "items": {"data": [{"id": "child_1"}]}}}, + [ + { + "id": "child_1", + "parent_id": "parent_1", + "original_record": { + "id": "parent_1", + "items": {"data": [{"id": "child_1"}]}, + }, + } + ], + id="combine_remain_original_record_and_parent_fields_to_copy", + ), + ], +) +def test_dpath_extractor_combined_features( + field_path: List, + expand_records_from_field: List, + remain_original_record: bool, + parent_fields_to_copy: List[ParentFieldMapping], + body, + expected_records: List, +): + record_expander = RecordExpander( + expand_records_from_field=expand_records_from_field, + config=config, + parameters=parameters, + remain_original_record=remain_original_record, + parent_fields_to_copy=parent_fields_to_copy, + ) + extractor = DpathExtractor( + field_path=field_path, + config=config, + decoder=decoder_json, + parameters=parameters, + record_expander=record_expander, + ) + + response = create_response(body) + actual_records = list(extractor.extract_records(response)) + + assert actual_records == expected_records From 2fb489f573497288ef2aedc33fdb7246c5c4a65a Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 5 Feb 2026 20:42:41 +0000 Subject: [PATCH 2/5] style: fix ruff format on 
auto-generated model file Co-Authored-By: sophie.cui@airbyte.io --- .../models/declarative_component_schema.py | 144 +++++++----------- 1 file changed, 58 insertions(+), 86 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index b1b785ad4..89d349bb4 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -934,28 +934,24 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - 
title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( - Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from 
`base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", - ) + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + 
base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -973,9 +969,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. 
This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1479,9 +1473,7 @@ class CustomConfigTransformation(BaseModel): class_name: str = Field( ..., description="Fully-qualified name of the class that will be implementing the custom config transformation. The format is `source_..`.", - examples=[ - "source_declarative_manifest.components.MyCustomConfigTransformation" - ], + examples=["source_declarative_manifest.components.MyCustomConfigTransformation"], ) parameters: Optional[Dict[str, Any]] = Field( None, @@ -1909,9 +1901,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -2190,12 +2180,10 @@ class DpathValidator(BaseModel): ], title="Field Path", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( - Field( - ..., - description="The condition that the specified config value will be evaluated against", - title="Validation Strategy", - ) + validation_strategy: Union[ValidateAdheresToSchema, 
CustomValidationStrategy] = Field( + ..., + description="The condition that the specified config value will be evaluated against", + title="Validation Strategy", ) @@ -2212,12 +2200,10 @@ class PredicateValidator(BaseModel): ], title="Value", ) - validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = ( - Field( - ..., - description="The validation strategy to apply to the value.", - title="Validation Strategy", - ) + validation_strategy: Union[ValidateAdheresToSchema, CustomValidationStrategy] = Field( + ..., + description="The validation strategy to apply to the value.", + title="Validation Strategy", ) @@ -2242,12 +2228,12 @@ class ConfigAddFields(BaseModel): class CompositeErrorHandler(BaseModel): type: Literal["CompositeErrorHandler"] - error_handlers: List[ - Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler] - ] = Field( - ..., - description="List of error handlers to iterate on to determine how to handle a failed response.", - title="Error Handlers", + error_handlers: List[Union[CompositeErrorHandler, DefaultErrorHandler, CustomErrorHandler]] = ( + Field( + ..., + description="List of error handlers to iterate on to determine how to handle a failed response.", + title="Error Handlers", + ) ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2326,9 +2312,7 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[ - Union[SchemaNormalization, CustomSchemaNormalization] - ] = Field( + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( None, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -2453,9 +2437,9 @@ class Config: type: Literal["DeclarativeSource"] check: Union[CheckStream, CheckDynamicStream] - streams: Optional[ - List[Union[ConditionalStreams, DeclarativeStream, 
StateDelegatingStream]] - ] = None + streams: Optional[List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]]] = ( + None + ) dynamic_streams: List[DynamicDeclarativeStream] version: str = Field( ..., @@ -2580,20 +2564,16 @@ class Config: extra = Extra.allow type: Literal["DeclarativeStream"] - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" - ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = ( - Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", - ) + incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", ) primary_key: Optional[PrimaryKey] = Field("", title="Primary Key") schema_loader: Optional[ @@ -2767,20 +2747,18 @@ class HttpRequester(BaseModelWithDeprecations): description="For APIs that require explicit specification of the properties to query for, this component will take a static or dynamic set of properties (which can be optionally split into chunks) and allow them to be injected into an outbound request by accessing stream_partition.extra_fields.", title="Query Properties", ) - request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = ( - Field( - None, - description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", - examples=[ - {"unit": "day"}, - { - "query": 'last_event_time BETWEEN TIMESTAMP "{{ 
stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' - }, - {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, - {"sort_by[asc]": "updated_at"}, - ], - title="Query Parameters", - ) + request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field( + None, + description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.", + examples=[ + {"unit": "day"}, + { + "query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"' + }, + {"searchIn": "{{ ','.join(config.get('search_in', [])) }}"}, + {"sort_by[asc]": "updated_at"}, + ], + title="Query Parameters", ) request_headers: Optional[Union[Dict[str, str], str]] = Field( None, @@ -2952,9 +2930,7 @@ class QueryProperties(BaseModel): class StateDelegatingStream(BaseModel): type: Literal["StateDelegatingStream"] - name: str = Field( - ..., description="The stream name.", example=["Users"], title="Name" - ) + name: str = Field(..., description="The stream name.", example=["Users"], title="Name") full_refresh_stream: DeclarativeStream = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.", @@ -3041,17 +3017,13 @@ class AsyncRetriever(BaseModel): status_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field( ..., description="Responsible for fetching the actual status of the async job." 
) - download_target_extractor: Optional[ - Union[DpathExtractor, CustomRecordExtractor] - ] = Field( + download_target_extractor: Optional[Union[DpathExtractor, CustomRecordExtractor]] = Field( None, description="Responsible for fetching the final result `urls` provided by the completed / finished / ready async job.", ) download_extractor: Optional[ Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor] - ] = Field( - None, description="Responsible for fetching the records from provided urls." - ) + ] = Field(None, description="Responsible for fetching the records from provided urls.") creation_requester: Union[HttpRequester, CustomRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", From d5b06ee21a402573624afede6cf65c092ff91fbb Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 5 Feb 2026 20:47:26 +0000 Subject: [PATCH 3/5] fix: move imports to top-level for mypy compliance Co-Authored-By: sophie.cui@airbyte.io --- .../parsers/model_to_component_factory.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 53540f994..3605e7944 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -95,6 +95,10 @@ JsonParser, Parser, ) +from airbyte_cdk.sources.declarative.expanders.record_expander import ( + ParentFieldMapping, + RecordExpander, +) from airbyte_cdk.sources.declarative.extractors import ( DpathExtractor, RecordFilter, @@ -2400,10 +2404,8 @@ def create_record_expander( model: RecordExpanderModel, config: Config, **kwargs: Any, - ) -> "RecordExpander": - from 
airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander - - parent_fields_to_copy = [] + ) -> RecordExpander: + parent_fields_to_copy: list[ParentFieldMapping] = [] if model.parent_fields_to_copy: for field_mapping_model in model.parent_fields_to_copy: parent_fields_to_copy.append( @@ -2427,9 +2429,7 @@ def create_parent_field_mapping( model: ParentFieldMappingModel, config: Config, **kwargs: Any, - ) -> "ParentFieldMapping": - from airbyte_cdk.sources.declarative.expanders.record_expander import ParentFieldMapping - + ) -> ParentFieldMapping: return ParentFieldMapping( source_field_path=list(model.source_field_path), target_field=model.target_field, From 0e324d9d4b89e67ec0ee1d9139baa08f2058b6ab Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Thu, 5 Feb 2026 20:54:09 +0000 Subject: [PATCH 4/5] fix: add explanatory comment for empty except clause Co-Authored-By: sophie.cui@airbyte.io --- airbyte_cdk/sources/declarative/expanders/record_expander.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/expanders/record_expander.py b/airbyte_cdk/sources/declarative/expanders/record_expander.py index c46e1b75e..92e4b3c0b 100644 --- a/airbyte_cdk/sources/declarative/expanders/record_expander.py +++ b/airbyte_cdk/sources/declarative/expanders/record_expander.py @@ -35,7 +35,7 @@ def copy_field( value = dpath.get(dict(parent_record), source_path) child_record[self.target_field] = value except KeyError: - pass + pass # Missing source fields in parent record are expected and intentionally skipped @dataclass From 87808fdffbe10253fef27e57179f39668a4fc26b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Fri, 13 Feb 2026 21:36:49 +0000 Subject: [PATCH 5/5] refactor: rename loop variable to avoid shadowing method parameter Co-Authored-By: sophie.cui@airbyte.io --- 
.../sources/declarative/expanders/record_expander.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/expanders/record_expander.py b/airbyte_cdk/sources/declarative/expanders/record_expander.py index 92e4b3c0b..0caea9a34 100644 --- a/airbyte_cdk/sources/declarative/expanders/record_expander.py +++ b/airbyte_cdk/sources/declarative/expanders/record_expander.py @@ -111,9 +111,9 @@ def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMap if "*" in expand_path: extracted: Any = dpath.values(parent_record, expand_path) - for record in extracted: - if isinstance(record, list): - for item in record: + for extracted_value in extracted: + if isinstance(extracted_value, list): + for item in extracted_value: if isinstance(item, dict): expanded_record = dict(item) self._apply_parent_context(parent_record, expanded_record)