16 commits
d077d99
feat: Add expand_records_from_field and remain_original_record to Dpa…
devin-ai-integration[bot] Dec 2, 2025
81d4630
style: Fix ruff formatting in dpath_extractor.py
devin-ai-integration[bot] Dec 2, 2025
c7ac5f2
style: Fix ruff formatting in test_dpath_extractor.py
devin-ai-integration[bot] Dec 2, 2025
24c8ac9
fix: Add type annotation for _expand_path to fix MyPy error
devin-ai-integration[bot] Dec 2, 2025
91690f4
refactor: Extract record expansion logic into RecordExpander class
devin-ai-integration[bot] Dec 2, 2025
c035138
feat: Add RecordExpander to declarative component schema
devin-ai-integration[bot] Dec 2, 2025
b04e174
refactor: Clean up DpathExtractor extract_records logic
devin-ai-integration[bot] Dec 2, 2025
c8a2643
fix: Update RecordExpander to return nothing when path doesn't exist
devin-ai-integration[bot] Dec 2, 2025
c6a9d05
feat: Add wildcard support to RecordExpander and remove TypeError
devin-ai-integration[bot] Dec 2, 2025
c6448e5
fix: Add type casts for dpath.values and dpath.get to fix MyPy errors
devin-ai-integration[bot] Dec 2, 2025
6afe474
refactor: Eliminate code duplication in expand_record method
devin-ai-integration[bot] Dec 2, 2025
5b0c0d5
refactor: Simplify expand_record per code review feedback
devin-ai-integration[bot] Dec 2, 2025
2ca9ad7
feat: Add on_no_records and parent_fields_to_copy to RecordExpander
devin-ai-integration[bot] Feb 3, 2026
1aadd2f
Add missing import
agarctfi Feb 6, 2026
1f31837
Auto-fix lint and format issues
Feb 6, 2026
f6cf99c
fix: Use Sequence instead of list for covariant type annotations in R…
devin-ai-integration[bot] Feb 6, 2026
84 changes: 84 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -1912,6 +1912,10 @@ definitions:
- ["data", "records"]
- ["data", "{{ parameters.name }}"]
- ["data", "*", "record"]
record_expander:
title: Record Expander
description: Optional component to expand records by extracting items from nested array fields.
"$ref": "#/definitions/RecordExpander"
$parameters:
type: object
additionalProperties: true
@@ -1928,6 +1932,86 @@ definitions:
$parameters:
type: object
additionalProperties: true
RecordExpander:
title: Record Expander
description: Expands records by extracting items from a nested array field. When configured, this component extracts items from a specified nested array path within each record and emits each item as a separate record. Optionally, the original parent record can be embedded in each expanded item for context preservation. Supports wildcards (*) for matching multiple arrays.
type: object
required:
- type
- expand_records_from_field
properties:
type:
type: string
enum: [RecordExpander]
expand_records_from_field:
title: Expand Records From Field
description: Path to a nested array field within each record. Items from this array will be extracted and emitted as separate records. Supports wildcards (*) for matching multiple arrays.
type: array
items:
type: string
interpolation_context:
- config
examples:
- ["lines", "data"]
- ["items"]
- ["nested", "array"]
- ["sections", "*", "items"]
remain_original_record:
title: Remain Original Record
description: If true, each expanded record will include the original parent record in an "original_record" field. Defaults to false.
type: boolean
default: false
on_no_records:
title: On No Records
description: Behavior when the expansion path is missing, not a list, or an empty list. "skip" (default) emits nothing. "emit_parent" emits the original parent record unchanged.
type: string
enum:
- skip
- emit_parent
default: skip
parent_fields_to_copy:
title: Parent Fields To Copy
description: List of parent field mappings to copy onto each expanded child record. Each mapping specifies a source path in the parent record and a target field name in the child record.
type: array
items:
"$ref": "#/definitions/ParentFieldMapping"
$parameters:
type: object
additionalProperties: true
ParentFieldMapping:
title: Parent Field Mapping
description: Defines a mapping from a parent record field to a child record field.
type: object
required:
- type
- source_field_path
- target_field
properties:
type:
type: string
enum: [ParentFieldMapping]
source_field_path:
title: Source Field Path
description: Path to the field in the parent record to copy.
type: array
items:
type: string
interpolation_context:
- config
examples:
- ["id"]
- ["created"]
- ["metadata", "timestamp"]
target_field:
title: Target Field
description: Name of the field in the child record where the value will be copied.
type: string
examples:
- "parent_id"
- "subscription_updated"
$parameters:
type: object
additionalProperties: true
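The `parent_fields_to_copy` semantics above can be sketched as a minimal, standalone function. This is a simplified illustration, not the CDK implementation: it walks plain dicts instead of using dpath, but it mirrors the documented behavior of copying `source_field_path` to `target_field` and silently skipping missing paths.

```python
from typing import Any, MutableMapping, Mapping, Sequence


def copy_parent_field(
    parent: Mapping[str, Any],
    child: MutableMapping[str, Any],
    source_field_path: Sequence[str],
    target_field: str,
) -> None:
    # Walk the source path through nested dicts in the parent record.
    node: Any = parent
    for key in source_field_path:
        if not isinstance(node, dict) or key not in node:
            return  # missing path: leave the child record untouched
        node = node[key]
    child[target_field] = node


parent = {"id": "sub_1", "metadata": {"timestamp": "2026-02-06"}}
child = {"amount": 10}
copy_parent_field(parent, child, ["metadata", "timestamp"], "subscription_updated")
copy_parent_field(parent, child, ["missing"], "never_set")
# child now carries "subscription_updated" but not "never_set"
```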
ExponentialBackoffStrategy:
title: Exponential Backoff
description: Backoff strategy with an exponential backoff interval. The interval is defined as factor * 2^attempt_count.
10 changes: 10 additions & 0 deletions airbyte_cdk/sources/declarative/expanders/__init__.py
@@ -0,0 +1,10 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.expanders.record_expander import (
ParentFieldMapping,
RecordExpander,
)

__all__ = ["ParentFieldMapping", "RecordExpander"]
153 changes: 153 additions & 0 deletions airbyte_cdk/sources/declarative/expanders/record_expander.py
@@ -0,0 +1,153 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, Mapping, MutableMapping, Sequence

import dpath

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.types import Config


@dataclass
class ParentFieldMapping:
"""Defines a mapping from a parent record field to a child record field."""

source_field_path: Sequence[str | InterpolatedString]
target_field: str
config: Config
parameters: InitVar[Mapping[str, Any]]

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._source_path = [
InterpolatedString.create(path, parameters=parameters)
for path in self.source_field_path
]

def copy_field(
self, parent_record: Mapping[str, Any], child_record: MutableMapping[str, Any]
) -> None:
"""Copy a field from parent record to child record."""
source_path = [path.eval(self.config) for path in self._source_path]
try:
value = dpath.get(dict(parent_record), source_path)
child_record[self.target_field] = value
except KeyError:
pass


@dataclass
class RecordExpander:
"""Expands records by extracting items from a nested array field.

When configured, this component extracts items from a specified nested array path
within each record and emits each item as a separate record. Optionally, the original
parent record can be embedded in each expanded item for context preservation.

The expand_records_from_field path supports wildcards (*) for matching multiple arrays.
When wildcards are used, items from all matched arrays are extracted and emitted.

Examples of instantiating this component:
```
record_expander:
type: RecordExpander
expand_records_from_field:
- "lines"
- "data"
remain_original_record: true
```

```
record_expander:
type: RecordExpander
expand_records_from_field:
- "sections"
- "*"
- "items"
on_no_records: emit_parent
parent_fields_to_copy:
- type: ParentFieldMapping
source_field_path: ["id"]
target_field: "parent_id"
```

Attributes:
expand_records_from_field: Path to a nested array field within each record.
Items from this array will be extracted and emitted as separate records.
Supports wildcards (*).
remain_original_record: If True, each expanded record will include the original
parent record in an "original_record" field. Defaults to False.
on_no_records: Behavior when expansion produces no records. "skip" (default)
emits nothing. "emit_parent" emits the original parent record unchanged.
parent_fields_to_copy: List of field mappings to copy from parent to each
expanded child record.
config: The user-provided configuration as specified by the source's spec.
"""

expand_records_from_field: Sequence[str | InterpolatedString]
config: Config
parameters: InitVar[Mapping[str, Any]]
remain_original_record: bool = False
on_no_records: str = "skip"
parent_fields_to_copy: list[ParentFieldMapping] = field(default_factory=list)
Copilot AI review comment (Feb 4, 2026): Use List from the typing module instead of the built-in list type for compatibility with Python versions prior to 3.9. The codebase uses List and Union from typing elsewhere (e.g., in dpath_extractor.py).

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._expand_path: list[InterpolatedString] | None = [
Copilot AI review comment (Feb 4, 2026): Use Optional[List[InterpolatedString]] from the typing module instead of list[InterpolatedString] | None for compatibility with Python versions prior to 3.10. The codebase uses Optional from typing elsewhere (e.g., in dpath_extractor.py).
InterpolatedString.create(path, parameters=parameters)
for path in self.expand_records_from_field
]

def expand_record(self, record: MutableMapping[Any, Any]) -> Iterable[MutableMapping[Any, Any]]:
"""Expand a record by extracting items from a nested array field."""
if not self._expand_path:
yield record
return

parent_record = record
expand_path = [path.eval(self.config) for path in self._expand_path]
expanded_any = False

if "*" in expand_path:
extracted: Any = dpath.values(parent_record, expand_path)
for record in extracted:
if isinstance(record, list):
for item in record:
if isinstance(item, dict):
expanded_record = dict(item)
self._apply_parent_context(parent_record, expanded_record)
yield expanded_record
expanded_any = True
else:
yield item
expanded_any = True
else:
try:
extracted = dpath.get(parent_record, expand_path)
except KeyError:
extracted = None

if isinstance(extracted, list):
for item in extracted:
if isinstance(item, dict):
expanded_record = dict(item)
self._apply_parent_context(parent_record, expanded_record)
yield expanded_record
expanded_any = True
else:
yield item
expanded_any = True

if not expanded_any and self.on_no_records == "emit_parent":
yield parent_record

def _apply_parent_context(
self, parent_record: Mapping[str, Any], child_record: MutableMapping[str, Any]
) -> None:
"""Apply parent context to a child record."""
if self.remain_original_record:
child_record["original_record"] = parent_record

for field_mapping in self.parent_fields_to_copy:
field_mapping.copy_field(parent_record, child_record)
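The behavior of `expand_record` above can be summarized with a standalone sketch. This is an assumption-laden simplification (plain dict traversal, no dpath, no wildcard or interpolation support) meant only to show the three documented outcomes: expansion of list items, embedding of the parent under `"original_record"`, and the `on_no_records` fallback.

```python
from typing import Any, Iterable, MutableMapping, Sequence


def expand_record(
    record: MutableMapping[str, Any],
    expand_path: Sequence[str],
    remain_original_record: bool = False,
    on_no_records: str = "skip",
) -> Iterable[MutableMapping[str, Any]]:
    # Walk the path; a missing key or non-dict node counts as "no records".
    node: Any = record
    for key in expand_path:
        if not isinstance(node, dict) or key not in node:
            node = None
            break
        node = node[key]

    if isinstance(node, list) and node:
        for item in node:
            if isinstance(item, dict):
                child = dict(item)
                if remain_original_record:
                    child["original_record"] = record
                yield child
            else:
                yield item
    elif on_no_records == "emit_parent":
        # Path missing, not a list, or empty: optionally emit the parent unchanged.
        yield record


parent = {"id": "inv_1", "lines": {"data": [{"amount": 5}, {"amount": 7}]}}
expanded = list(expand_record(parent, ["lines", "data"], remain_original_record=True))
# Each line item becomes its own record with the parent embedded.
```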
35 changes: 32 additions & 3 deletions airbyte_cdk/sources/declarative/extractors/dpath_extractor.py
@@ -3,12 +3,13 @@
#

from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, List, Mapping, MutableMapping, Union
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union

import dpath
import requests

from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder
from airbyte_cdk.sources.declarative.expanders.record_expander import RecordExpander
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.types import Config
@@ -24,6 +25,11 @@ class DpathExtractor(RecordExtractor):
If the field path points to an empty object, an empty array is returned.
If the field path points to a non-existing path, an empty array is returned.

Optionally, records can be expanded by providing a RecordExpander component.
When record_expander is configured, each extracted record is passed through the
expander which extracts items from nested array fields and emits each item as a
separate record.

Examples of instantiating this transform:
```
extractor:
@@ -47,16 +53,32 @@
field_path: []
```

```
extractor:
type: DpathExtractor
field_path:
- "data"
- "object"
record_expander:
type: RecordExpander
expand_records_from_field:
- "lines"
- "data"
remain_original_record: true
```

Attributes:
field_path (Union[InterpolatedString, str]): Path to the field that should be extracted
config (Config): The user-provided configuration as specified by the source's spec
decoder (Decoder): The decoder responsible for transforming the response into a Mapping
record_expander (Optional[RecordExpander]): Optional component to expand records by extracting items from nested array fields
"""

field_path: List[Union[InterpolatedString, str]]
config: Config
parameters: InitVar[Mapping[str, Any]]
decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))
record_expander: Optional[RecordExpander] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._field_path = [
@@ -79,8 +101,15 @@ def extract_records(self, response: requests.Response) -> Iterable[MutableMappin
else:
extracted = dpath.get(body, path, default=[]) # type: ignore # extracted will be a MutableMapping, given input data structure
if isinstance(extracted, list):
yield from extracted
if not self.record_expander:
yield from extracted
else:
for record in extracted:
yield from self.record_expander.expand_record(record)
elif extracted:
yield extracted
if self.record_expander:
yield from self.record_expander.expand_record(extracted)
else:
yield extracted
else:
yield from []
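The combined extractor-plus-expander flow in the diff above can be sketched end to end. This is a hypothetical, dependency-free illustration (no dpath, no decoder, no interpolation): extract records at `field_path` from a decoded response body, then expand each record at `expand_path`.

```python
from typing import Any, Iterable, Sequence


def extract_and_expand(
    body: Any,
    field_path: Sequence[str],
    expand_path: Sequence[str],
) -> Iterable[dict]:
    # Step 1: extract records at field_path (mirrors DpathExtractor's list handling).
    node: Any = body
    for key in field_path:
        node = node.get(key, {}) if isinstance(node, dict) else {}
    records = node if isinstance(node, list) else ([node] if node else [])

    # Step 2: expand each record at expand_path (mirrors RecordExpander's list handling).
    for record in records:
        nested: Any = record
        for key in expand_path:
            nested = nested.get(key) if isinstance(nested, dict) else None
        if isinstance(nested, list):
            yield from (dict(item) for item in nested if isinstance(item, dict))


body = {"data": {"object": [{"id": "a", "lines": {"data": [{"n": 1}, {"n": 2}]}}]}}
out = list(extract_and_expand(body, ["data", "object"], ["lines", "data"]))
# → [{"n": 1}, {"n": 2}]
```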