diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index e68318cd4..ce338ac03 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1927,6 +1927,10 @@ definitions: - ["data", "records"] - ["data", "{{ parameters.name }}"] - ["data", "*", "record"] + record_expander: + title: Record Expander + description: Optional component to expand records by extracting items from nested array fields. + "$ref": "#/definitions/RecordExpander" $parameters: type: object additionalProperties: true @@ -1943,6 +1947,86 @@ definitions: $parameters: type: object additionalProperties: true + RecordExpander: + title: Record Expander + description: Expands records by extracting items from a nested array field. When configured, this component extracts items from a specified nested array path within each record and emits each item as a separate record. Optionally, the original parent record can be embedded in each expanded item for context preservation. Supports wildcards (*) for matching multiple arrays. + type: object + required: + - type + - expand_records_from_field + properties: + type: + type: string + enum: [RecordExpander] + expand_records_from_field: + title: Expand Records From Field + description: Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays. + type: array + items: + type: string + interpolation_context: + - config + examples: + - ["lines", "data"] + - ["items"] + - ["nested", "array"] + - ["sections", "*", "items"] + remain_original_record: + title: Remain Original Record + description: If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false. 
+ type: boolean + default: false + on_no_records: + title: On No Records + description: Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged. + type: string + enum: + - skip + - emit_parent + default: skip + parent_fields_to_copy: + title: Parent Fields To Copy + description: List of parent field mappings to copy onto each expanded child record. Each mapping specifies a source path in the parent record and a target field name in the child record. + type: array + items: + "$ref": "#/definitions/ParentFieldMapping" + $parameters: + type: object + additionalProperties: true + ParentFieldMapping: + title: Parent Field Mapping + description: Defines a mapping from a parent record field to a child record field. + type: object + required: + - type + - source_field_path + - target_field + properties: + type: + type: string + enum: [ParentFieldMapping] + source_field_path: + title: Source Field Path + description: Path to the field in the parent record to copy. + type: array + items: + type: string + interpolation_context: + - config + examples: + - ["id"] + - ["created"] + - ["metadata", "timestamp"] + target_field: + title: Target Field + description: Name of the field in the child record where the value will be copied. + type: string + examples: + - "parent_id" + - "subscription_updated" + $parameters: + type: object + additionalProperties: true ExponentialBackoffStrategy: title: Exponential Backoff description: Backoff strategy with an exponential backoff interval. The interval is defined as factor * 2^attempt_count. diff --git a/airbyte_cdk/sources/declarative/expanders/__init__.py b/airbyte_cdk/sources/declarative/expanders/__init__.py new file mode 100644 index 000000000..335d30df1 --- /dev/null +++ b/airbyte_cdk/sources/declarative/expanders/__init__.py @@ -0,0 +1,10 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, Mapping, MutableMapping

import dpath

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.types import Config


@dataclass
class ParentFieldMapping:
    """Defines a mapping from a parent record field to a child record field.

    Attributes:
        source_field_path: Path segments (interpolable) locating the value in the parent record.
        target_field: Key under which the value is stored on the child record.
        config: The user-provided configuration as specified by the source's spec.
    """

    source_field_path: list[str | InterpolatedString]
    target_field: str
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Build the interpolated path segments once; they are re-evaluated against
        # config for every record in copy_field.
        self._source_path = [
            InterpolatedString.create(segment, parameters=parameters)
            for segment in self.source_field_path
        ]

    def copy_field(
        self, parent_record: Mapping[str, Any], child_record: MutableMapping[str, Any]
    ) -> None:
        """Copy the configured field from parent_record onto child_record.

        Missing source fields in the parent record are expected and intentionally
        skipped: parents are not required to carry every mapped field.
        """
        resolved_path = [segment.eval(self.config) for segment in self._source_path]
        try:
            child_record[self.target_field] = dpath.get(dict(parent_record), resolved_path)
        except KeyError:
            pass


@dataclass
class RecordExpander:
    """Expands records by extracting items from a nested array field.

    When configured, this component extracts items from a specified nested array path
    within each record and emits each item as a separate record. Optionally, the original
    parent record can be embedded in each expanded item for context preservation.

    The expand_records_from_field path supports wildcards (*) for matching multiple arrays.
    When wildcards are used, items from all matched arrays are extracted and emitted.

    Examples of instantiating this component:
    ```
    record_expander:
      type: RecordExpander
      expand_records_from_field:
        - "lines"
        - "data"
      remain_original_record: true
    ```

    ```
    record_expander:
      type: RecordExpander
      expand_records_from_field:
        - "sections"
        - "*"
        - "items"
      on_no_records: emit_parent
      parent_fields_to_copy:
        - type: ParentFieldMapping
          source_field_path: ["id"]
          target_field: "parent_id"
    ```

    Attributes:
        expand_records_from_field: Path to a nested array field within each record.
            Items from this array will be extracted and emitted as separate records.
            Supports wildcards (*).
        remain_original_record: If True, each expanded record will include the original
            parent record in an "original_record" field. Defaults to False.
        on_no_records: Behavior when expansion produces no records. "skip" (default)
            emits nothing. "emit_parent" emits the original parent record unchanged.
        parent_fields_to_copy: List of field mappings to copy from parent to each
            expanded child record.
        config: The user-provided configuration as specified by the source's spec.
    """

    expand_records_from_field: list[str | InterpolatedString]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    remain_original_record: bool = False
    on_no_records: str = "skip"
    parent_fields_to_copy: list[ParentFieldMapping] = field(default_factory=list)

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Always a list here (possibly empty); expand_record guards on emptiness.
        self._expand_path: list[InterpolatedString] = [
            InterpolatedString.create(segment, parameters=parameters)
            for segment in self.expand_records_from_field
        ]

    def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]:
        """Expand a record by extracting items from a nested array field.

        Yields one record per item of the matched array(s). Non-dict items are
        yielded as-is (they cannot carry parent context). If nothing was expanded
        and on_no_records == "emit_parent", the parent record is yielded unchanged.
        """
        if not self._expand_path:
            # No expansion configured: pass the record through untouched.
            yield record
            return

        expand_path = [segment.eval(self.config) for segment in self._expand_path]
        expanded_any = False

        if "*" in expand_path:
            # Wildcard path: dpath.values returns every value matched by the glob;
            # only values that are lists contribute expanded records.
            for matched_value in dpath.values(record, expand_path):
                for expanded in self._emit_items(record, matched_value):
                    yield expanded
                    expanded_any = True
        else:
            try:
                extracted = dpath.get(record, expand_path)
            except KeyError:
                extracted = None  # Missing path is handled by on_no_records below.
            for expanded in self._emit_items(record, extracted):
                yield expanded
                expanded_any = True

        if not expanded_any and self.on_no_records == "emit_parent":
            yield record

    def _emit_items(
        self, parent_record: MutableMapping[Any, Any], extracted: Any
    ) -> Iterable[Any]:
        """Yield each item of `extracted` if it is a list; non-lists yield nothing.

        Dict items are shallow-copied and enriched with parent context; other item
        types (ints, strings, ...) are yielded unchanged.
        """
        if not isinstance(extracted, list):
            return
        for item in extracted:
            if isinstance(item, dict):
                expanded_record = dict(item)
                self._apply_parent_context(parent_record, expanded_record)
                yield expanded_record
            else:
                yield item

    def _apply_parent_context(
        self, parent_record: Mapping[str, Any], child_record: MutableMapping[str, Any]
    ) -> None:
        """Apply parent context (original_record embed and/or copied fields) to a child record."""
        if self.remain_original_record:
            child_record["original_record"] = parent_record

        for field_mapping in self.parent_fields_to_copy:
            field_mapping.copy_field(parent_record, child_record)
+ Examples of instantiating this transform: ``` extractor: @@ -47,16 +53,32 @@ class DpathExtractor(RecordExtractor): field_path: [] ``` + ``` + extractor: + type: DpathExtractor + field_path: + - "data" + - "object" + record_expander: + type: RecordExpander + expand_records_from_field: + - "lines" + - "data" + remain_original_record: true + ``` + Attributes: field_path (Union[InterpolatedString, str]): Path to the field that should be extracted config (Config): The user-provided configuration as specified by the source's spec decoder (Decoder): The decoder responsible to transfom the response in a Mapping + record_expander (Optional[RecordExpander]): Optional component to expand records by extracting items from nested array fields """ field_path: List[Union[InterpolatedString, str]] config: Config parameters: InitVar[Mapping[str, Any]] decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={})) + record_expander: Optional[RecordExpander] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._field_path = [ @@ -79,8 +101,15 @@ def extract_records(self, response: requests.Response) -> Iterable[MutableMappin else: extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure if isinstance(extracted, list): - yield from extracted + if not self.record_expander: + yield from extracted + else: + for record in extracted: + yield from self.record_expander.expand_record(record) elif extracted: - yield extracted + if self.record_expander: + yield from self.record_expander.expand_record(extracted) + else: + yield extracted else: yield from [] diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 5d2f0521f..89d349bb4 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ 
b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,5 +1,3 @@ -# Copyright (c) 2025 Airbyte, Inc., all rights reserved. - # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -484,24 +482,30 @@ class Config: ) -class DpathExtractor(BaseModel): - type: Literal["DpathExtractor"] - field_path: List[str] = Field( - ..., - description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).', - examples=[ - ["data"], - ["data", "records"], - ["data", "{{ parameters.name }}"], - ["data", "*", "record"], - ], - title="Field Path", - ) +class ResponseToFileExtractor(BaseModel): + type: Literal["ResponseToFileExtractor"] parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class ResponseToFileExtractor(BaseModel): - type: Literal["ResponseToFileExtractor"] +class OnNoRecords(Enum): + skip = "skip" + emit_parent = "emit_parent" + + +class ParentFieldMapping(BaseModel): + type: Literal["ParentFieldMapping"] + source_field_path: List[str] = Field( + ..., + description="Path to the field in the parent record to copy.", + examples=[["id"], ["created"], ["metadata", "timestamp"]], + title="Source Field Path", + ) + target_field: str = Field( + ..., + description="Name of the field in the child record where the value will be copied.", + examples=["parent_id", "subscription_updated"], + title="Target Field", + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2048,6 +2052,37 @@ class DefaultPaginator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class RecordExpander(BaseModel): + type: Literal["RecordExpander"] + expand_records_from_field: List[str] = Field( + ..., + description="Path to a nested array field within each record. 
Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.", + examples=[ + ["lines", "data"], + ["items"], + ["nested", "array"], + ["sections", "*", "items"], + ], + title="Expand Records From Field", + ) + remain_original_record: Optional[bool] = Field( + False, + description='If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.', + title="Remain Original Record", + ) + on_no_records: Optional[OnNoRecords] = Field( + OnNoRecords.skip, + description='Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged.', + title="On No Records", + ) + parent_fields_to_copy: Optional[List[ParentFieldMapping]] = Field( + None, + description="List of parent field mappings to copy onto each expanded child record. Each mapping specifies a source path in the parent record and a target field name in the child record.", + title="Parent Fields To Copy", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class SessionTokenRequestApiKeyAuthenticator(BaseModel): type: Literal["ApiKey"] inject_into: RequestOption = Field( @@ -2116,27 +2151,6 @@ class ListPartitionRouter(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class RecordSelector(BaseModel): - type: Literal["RecordSelector"] - extractor: Union[DpathExtractor, CustomRecordExtractor] - record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field( - None, - description="Responsible for filtering records to be emitted by the Source.", - title="Record Filter", - ) - schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( - None, - description="Responsible for normalization according to the schema.", - title="Schema Normalization", - ) - transform_before_filtering: Optional[bool] = 
Field( - None, - description="If true, transformation will be applied before record filtering.", - title="Transform Before Filtering", - ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - - class PaginationReset(BaseModel): type: Literal["PaginationReset"] action: Action1 @@ -2257,6 +2271,27 @@ class Config: ) +class DpathExtractor(BaseModel): + type: Literal["DpathExtractor"] + field_path: List[str] = Field( + ..., + description='List of potentially nested fields describing the full path of the field to extract. Use "*" to extract all values from an array. See more info in the [docs](https://docs.airbyte.com/connector-development/config-based/understanding-the-yaml-file/record-selector).', + examples=[ + ["data"], + ["data", "records"], + ["data", "{{ parameters.name }}"], + ["data", "*", "record"], + ], + title="Field Path", + ) + record_expander: Optional[RecordExpander] = Field( + None, + description="Optional component to expand records by extracting items from nested array fields.", + title="Record Expander", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class ZipfileDecoder(BaseModel): class Config: extra = Extra.allow @@ -2269,6 +2304,27 @@ class Config: ) +class RecordSelector(BaseModel): + type: Literal["RecordSelector"] + extractor: Union[DpathExtractor, CustomRecordExtractor] + record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field( + None, + description="Responsible for filtering records to be emitted by the Source.", + title="Record Filter", + ) + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( + None, + description="Responsible for normalization according to the schema.", + title="Schema Normalization", + ) + transform_before_filtering: Optional[bool] = Field( + None, + description="If true, transformation will be applied before record filtering.", + title="Transform Before Filtering", + ) + parameters: Optional[Dict[str, 
Any]] = Field(None, alias="$parameters") + + class ConfigMigration(BaseModel): type: Literal["ConfigMigration"] description: Optional[str] = Field( diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index 3ed86bf06..00b1a18ff 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -42,6 +42,7 @@ "DefaultPaginator.page_size_option": "RequestOption", # DpathExtractor "DpathExtractor.decoder": "JsonDecoder", + "DpathExtractor.record_expander": "RecordExpander", # HttpRequester "HttpRequester.error_handler": "DefaultErrorHandler", # ListPartitionRouter diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 2bd7d268d..3605e7944 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -95,6 +95,10 @@ JsonParser, Parser, ) +from airbyte_cdk.sources.declarative.expanders.record_expander import ( + ParentFieldMapping, + RecordExpander, +) from airbyte_cdk.sources.declarative.extractors import ( DpathExtractor, RecordFilter, @@ -371,6 +375,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ParametrizedComponentsResolver as ParametrizedComponentsResolverModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + ParentFieldMapping as ParentFieldMappingModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ParentStreamConfig as ParentStreamConfigModel, ) @@ -392,6 +399,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( Rate as RateModel, ) +from 
airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + RecordExpander as RecordExpanderModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( RecordFilter as RecordFilterModel, ) @@ -742,6 +752,8 @@ def _init_mappings(self) -> None: DefaultPaginatorModel: self.create_default_paginator, DpathExtractorModel: self.create_dpath_extractor, DpathValidatorModel: self.create_dpath_validator, + RecordExpanderModel: self.create_record_expander, + ParentFieldMappingModel: self.create_parent_field_mapping, ResponseToFileExtractorModel: self.create_response_to_file_extractor, ExponentialBackoffStrategyModel: self.create_exponential_backoff_strategy, SessionTokenAuthenticatorModel: self.create_session_token_authenticator, @@ -2371,11 +2383,58 @@ def create_dpath_extractor( else: decoder_to_use = JsonDecoder(parameters={}) model_field_path: List[Union[InterpolatedString, str]] = [x for x in model.field_path] + + record_expander = None + if hasattr(model, "record_expander") and model.record_expander: + record_expander = self._create_component_from_model( + model=model.record_expander, + config=config, + ) + return DpathExtractor( decoder=decoder_to_use, field_path=model_field_path, config=config, parameters=model.parameters or {}, + record_expander=record_expander, + ) + + def create_record_expander( + self, + model: RecordExpanderModel, + config: Config, + **kwargs: Any, + ) -> RecordExpander: + parent_fields_to_copy: list[ParentFieldMapping] = [] + if model.parent_fields_to_copy: + for field_mapping_model in model.parent_fields_to_copy: + parent_fields_to_copy.append( + self._create_component_from_model( + model=field_mapping_model, + config=config, + ) + ) + + return RecordExpander( + expand_records_from_field=list(model.expand_records_from_field), + config=config, + parameters=model.parameters or {}, + remain_original_record=model.remain_original_record or False, + on_no_records=model.on_no_records.value if 
model.on_no_records else "skip", + parent_fields_to_copy=parent_fields_to_copy, + ) + + @staticmethod + def create_parent_field_mapping( + model: ParentFieldMappingModel, + config: Config, + **kwargs: Any, + ) -> ParentFieldMapping: + return ParentFieldMapping( + source_field_path=list(model.source_field_path), + target_field=model.target_field, + config=config, + parameters=model.parameters or {}, ) @staticmethod diff --git a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py index 05e586592..982f3cc49 100644 --- a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py +++ b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py @@ -14,6 +14,10 @@ IterableDecoder, JsonDecoder, ) +from airbyte_cdk.sources.declarative.expanders.record_expander import ( + ParentFieldMapping, + RecordExpander, +) from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor config = {"field": "record_array"} @@ -121,3 +125,486 @@ def test_dpath_extractor(field_path: List, decoder: Decoder, body, expected_reco actual_records = list(extractor.extract_records(response)) assert actual_records == expected_records + + +@pytest.mark.parametrize( + "field_path, expand_records_from_field, remain_original_record, body, expected_records", + [ + ( + ["data", "object"], + ["lines", "data"], + False, + { + "data": { + "object": { + "id": "in_123", + "created": 1234567890, + "lines": { + "data": [ + {"id": "il_1", "amount": 100}, + {"id": "il_2", "amount": 200}, + ] + }, + } + } + }, + [ + {"id": "il_1", "amount": 100}, + {"id": "il_2", "amount": 200}, + ], + ), + ( + ["data", "object"], + ["lines", "data"], + True, + { + "data": { + "object": { + "id": "in_123", + "created": 1234567890, + "lines": { + "data": [ + {"id": "il_1", "amount": 100}, + ] + }, + } + } + }, + [ + { + "id": "il_1", + "amount": 100, + "original_record": { + "id": "in_123", + "created": 1234567890, + 
"lines": {"data": [{"id": "il_1", "amount": 100}]}, + }, + }, + ], + ), + ( + ["data"], + ["items"], + False, + {"data": {"id": "parent_1", "items": []}}, + [], + ), + ( + ["data"], + ["items"], + False, + {"data": {"id": "parent_1"}}, + [], + ), + ( + ["data"], + ["items"], + False, + {"data": {"id": "parent_1", "items": "not_an_array"}}, + [], + ), + ( + ["data"], + ["nested", "array"], + False, + { + "data": { + "id": "parent_1", + "nested": {"array": [{"id": "child_1"}, {"id": "child_2"}]}, + } + }, + [{"id": "child_1"}, {"id": "child_2"}], + ), + ( + ["data"], + ["items"], + False, + {"data": {"id": "parent_1", "items": [1, 2, "string", {"id": "dict_item"}]}}, + [1, 2, "string", {"id": "dict_item"}], + ), + ( + [], + ["items"], + False, + [ + {"id": "parent_1", "items": [{"id": "child_1"}]}, + {"id": "parent_2", "items": [{"id": "child_2"}, {"id": "child_3"}]}, + ], + [{"id": "child_1"}, {"id": "child_2"}, {"id": "child_3"}], + ), + ( + ["data"], + ["sections", "*", "items"], + False, + { + "data": { + "sections": [ + {"name": "section1", "items": [{"id": "item_1"}, {"id": "item_2"}]}, + {"name": "section2", "items": [{"id": "item_3"}]}, + ] + } + }, + [{"id": "item_1"}, {"id": "item_2"}, {"id": "item_3"}], + ), + ( + ["data"], + ["sections", "*", "items"], + True, + { + "data": { + "sections": [ + {"name": "section1", "items": [{"id": "item_1"}]}, + ] + } + }, + [ + { + "id": "item_1", + "original_record": { + "sections": [ + {"name": "section1", "items": [{"id": "item_1"}]}, + ] + }, + } + ], + ), + ( + ["data"], + ["sections", "*", "items"], + False, + { + "data": { + "sections": [ + {"name": "section1", "items": []}, + {"name": "section2", "items": []}, + ] + } + }, + [], + ), + ( + ["data"], + ["sections", "*", "items"], + False, + { + "data": { + "sections": [ + {"name": "section1"}, + {"name": "section2", "items": "not_an_array"}, + ] + } + }, + [], + ), + ( + ["data"], + ["*", "items"], + False, + { + "data": { + "group1": {"items": [{"id": 
"item_1"}]}, + "group2": {"items": [{"id": "item_2"}, {"id": "item_3"}]}, + } + }, + [{"id": "item_1"}, {"id": "item_2"}, {"id": "item_3"}], + ), + ], + ids=[ + "test_expand_nested_array", + "test_expand_with_original_record", + "test_expand_empty_array_yields_nothing", + "test_expand_missing_path_yields_nothing", + "test_expand_non_array_yields_nothing", + "test_expand_deeply_nested_path", + "test_expand_mixed_types_in_array", + "test_expand_multiple_parent_records", + "test_expand_wildcard_multiple_lists", + "test_expand_wildcard_with_original_record", + "test_expand_wildcard_all_empty_arrays", + "test_expand_wildcard_no_list_matches", + "test_expand_wildcard_dict_values", + ], +) +def test_dpath_extractor_with_expansion( + field_path: List, + expand_records_from_field: List, + remain_original_record: bool, + body, + expected_records: List, +): + record_expander = RecordExpander( + expand_records_from_field=expand_records_from_field, + config=config, + parameters=parameters, + remain_original_record=remain_original_record, + ) + extractor = DpathExtractor( + field_path=field_path, + config=config, + decoder=decoder_json, + parameters=parameters, + record_expander=record_expander, + ) + + response = create_response(body) + actual_records = list(extractor.extract_records(response)) + + assert actual_records == expected_records + + +@pytest.mark.parametrize( + "field_path, expand_records_from_field, on_no_records, body, expected_records", + [ + pytest.param( + ["data"], + ["items"], + "skip", + {"data": {"id": "parent_1"}}, + [], + id="on_no_records_skip_missing_path", + ), + pytest.param( + ["data"], + ["items"], + "skip", + {"data": {"id": "parent_1", "items": []}}, + [], + id="on_no_records_skip_empty_array", + ), + pytest.param( + ["data"], + ["items"], + "emit_parent", + {"data": {"id": "parent_1"}}, + [{"id": "parent_1"}], + id="on_no_records_emit_parent_missing_path", + ), + pytest.param( + ["data"], + ["items"], + "emit_parent", + {"data": {"id": "parent_1", 
"items": []}}, + [{"id": "parent_1", "items": []}], + id="on_no_records_emit_parent_empty_array", + ), + pytest.param( + ["data"], + ["items"], + "emit_parent", + {"data": {"id": "parent_1", "items": "not_an_array"}}, + [{"id": "parent_1", "items": "not_an_array"}], + id="on_no_records_emit_parent_non_array", + ), + pytest.param( + ["data"], + ["items"], + "emit_parent", + {"data": {"id": "parent_1", "items": [{"id": "child_1"}]}}, + [{"id": "child_1"}], + id="on_no_records_emit_parent_has_items_extracts_normally", + ), + ], +) +def test_dpath_extractor_on_no_records( + field_path: List, + expand_records_from_field: List, + on_no_records: str, + body, + expected_records: List, +): + record_expander = RecordExpander( + expand_records_from_field=expand_records_from_field, + config=config, + parameters=parameters, + on_no_records=on_no_records, + ) + extractor = DpathExtractor( + field_path=field_path, + config=config, + decoder=decoder_json, + parameters=parameters, + record_expander=record_expander, + ) + + response = create_response(body) + actual_records = list(extractor.extract_records(response)) + + assert actual_records == expected_records + + +@pytest.mark.parametrize( + "field_path, expand_records_from_field, parent_fields_to_copy, body, expected_records", + [ + pytest.param( + ["data"], + ["items", "data"], + [ + ParentFieldMapping( + source_field_path=["id"], + target_field="parent_id", + config=config, + parameters=parameters, + ), + ], + { + "data": { + "id": "sub_123", + "created": 1234567890, + "items": {"data": [{"id": "si_1"}, {"id": "si_2"}]}, + } + }, + [ + {"id": "si_1", "parent_id": "sub_123"}, + {"id": "si_2", "parent_id": "sub_123"}, + ], + id="copy_single_parent_field", + ), + pytest.param( + ["data"], + ["items", "data"], + [ + ParentFieldMapping( + source_field_path=["id"], + target_field="subscription_id", + config=config, + parameters=parameters, + ), + ParentFieldMapping( + source_field_path=["created"], + 
target_field="subscription_updated", + config=config, + parameters=parameters, + ), + ], + { + "data": { + "id": "sub_123", + "created": 1234567890, + "items": {"data": [{"id": "si_1"}]}, + } + }, + [{"id": "si_1", "subscription_id": "sub_123", "subscription_updated": 1234567890}], + id="copy_multiple_parent_fields", + ), + pytest.param( + ["data"], + ["items", "data"], + [ + ParentFieldMapping( + source_field_path=["metadata", "timestamp"], + target_field="parent_timestamp", + config=config, + parameters=parameters, + ), + ], + { + "data": { + "id": "parent_1", + "metadata": {"timestamp": 9999}, + "items": {"data": [{"id": "child_1"}]}, + } + }, + [{"id": "child_1", "parent_timestamp": 9999}], + id="copy_nested_parent_field", + ), + pytest.param( + ["data"], + ["items", "data"], + [ + ParentFieldMapping( + source_field_path=["missing_field"], + target_field="should_not_exist", + config=config, + parameters=parameters, + ), + ], + {"data": {"id": "parent_1", "items": {"data": [{"id": "child_1"}]}}}, + [{"id": "child_1"}], + id="copy_missing_parent_field_ignored", + ), + ], +) +def test_dpath_extractor_parent_fields_to_copy( + field_path: List, + expand_records_from_field: List, + parent_fields_to_copy: List[ParentFieldMapping], + body, + expected_records: List, +): + record_expander = RecordExpander( + expand_records_from_field=expand_records_from_field, + config=config, + parameters=parameters, + parent_fields_to_copy=parent_fields_to_copy, + ) + extractor = DpathExtractor( + field_path=field_path, + config=config, + decoder=decoder_json, + parameters=parameters, + record_expander=record_expander, + ) + + response = create_response(body) + actual_records = list(extractor.extract_records(response)) + + assert actual_records == expected_records + + +@pytest.mark.parametrize( + "field_path, expand_records_from_field, remain_original_record, parent_fields_to_copy, body, expected_records", + [ + pytest.param( + ["data"], + ["items", "data"], + True, + [ + 
ParentFieldMapping( + source_field_path=["id"], + target_field="parent_id", + config=config, + parameters=parameters, + ), + ], + {"data": {"id": "parent_1", "items": {"data": [{"id": "child_1"}]}}}, + [ + { + "id": "child_1", + "parent_id": "parent_1", + "original_record": { + "id": "parent_1", + "items": {"data": [{"id": "child_1"}]}, + }, + } + ], + id="combine_remain_original_record_and_parent_fields_to_copy", + ), + ], +) +def test_dpath_extractor_combined_features( + field_path: List, + expand_records_from_field: List, + remain_original_record: bool, + parent_fields_to_copy: List[ParentFieldMapping], + body, + expected_records: List, +): + record_expander = RecordExpander( + expand_records_from_field=expand_records_from_field, + config=config, + parameters=parameters, + remain_original_record=remain_original_record, + parent_fields_to_copy=parent_fields_to_copy, + ) + extractor = DpathExtractor( + field_path=field_path, + config=config, + decoder=decoder_json, + parameters=parameters, + record_expander=record_expander, + ) + + response = create_response(body) + actual_records = list(extractor.extract_records(response)) + + assert actual_records == expected_records