Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from io import BufferedIOBase, TextIOWrapper
from typing import Any, List, Optional

GZIP_MAGIC_BYTES = b"\x1f\x8b"

import orjson
import requests

Expand All @@ -35,15 +37,22 @@ def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
"""
Decompress gzipped bytes and pass decompressed data to the inner parser.

IMPORTANT:
- If the data is not gzipped, reset the pointer and pass the data to the inner parser as is.

Note:
- The data is not decoded by default.
Auto-detects gzip content by checking for magic bytes (1f 8b) at the start of the data.
If the data is not gzip-compressed, it is passed directly to the inner parser as-is.
This handles APIs that return gzip-compressed bodies without setting the Content-Encoding header.
"""
header = data.read(2)
if not header:
return

with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
yield from self.inner_parser.parse(gzipobj)
remaining = data.read()
full_data = io.BytesIO(header + remaining)

if header == GZIP_MAGIC_BYTES:
with gzip.GzipFile(fileobj=full_data, mode="rb") as gzipobj:
yield from self.inner_parser.parse(gzipobj)
else:
yield from self.inner_parser.parse(full_data)


@dataclass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2634,15 +2634,16 @@ def create_gzip_decoder(
gzip_parser: GzipParser = ModelToComponentFactory._get_parser(model, config) # type: ignore # based on the model, we know this will be a GzipParser

if self._emit_connector_builder_messages:
# This is very surprising but if the response is not streamed,
# CompositeRawDecoder calls response.content and the requests library actually uncompress the data as opposed to response.raw,
# which uses urllib3 directly and does not uncompress the data.
return CompositeRawDecoder(gzip_parser.inner_parser, False)
# When not streaming, CompositeRawDecoder uses response.content which the requests
# library auto-decompresses when Content-Encoding is set. However, some APIs return
# gzip data without Content-Encoding headers. Using gzip_parser (which auto-detects
# gzip magic bytes) ensures decompression works in both cases.
return CompositeRawDecoder(gzip_parser, False)

return CompositeRawDecoder.by_headers(
[({"Content-Encoding", "Content-Type"}, _compressed_response_types, gzip_parser)],
stream_response=True,
fallback_parser=gzip_parser.inner_parser,
fallback_parser=gzip_parser,
)

@staticmethod
Expand Down
77 changes: 77 additions & 0 deletions unit_tests/sources/declarative/decoders/test_composite_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,83 @@ def test_composite_raw_decoder_csv_parser_without_mocked_response():
thread.join(timeout=5) # ensure thread is cleaned up


@pytest.mark.parametrize(
    "content,inner_parser,stream,use_by_headers,expected_count",
    [
        pytest.param(
            generate_csv(encoding="utf-8", delimiter="\t", should_compress=True),
            CsvParser(encoding="utf-8", delimiter="\\t"),
            True,
            False,
            3,
            id="gzip_csv_no_header_streamed",
        ),
        pytest.param(
            generate_csv(encoding="iso-8859-1", delimiter="\t", should_compress=True),
            CsvParser(encoding="iso-8859-1", delimiter="\\t"),
            True,
            False,
            3,
            id="gzip_csv_no_header_iso_encoding",
        ),
        pytest.param(
            generate_compressed_jsonlines(),
            JsonLineParser(),
            True,
            True,
            3,
            id="gzip_jsonl_no_header_by_headers_fallback",
        ),
        pytest.param(
            "".join(generate_jsonlines()).encode("utf-8"),
            JsonLineParser(),
            True,
            False,
            3,
            id="non_gzip_passthrough",
        ),
        pytest.param(
            generate_compressed_jsonlines(),
            JsonLineParser(),
            False,
            False,
            3,
            id="gzip_no_header_non_streamed",
        ),
        pytest.param(
            b"",
            JsonLineParser(),
            True,
            False,
            0,
            id="empty_data",
        ),
    ],
)
def test_gzip_parser_auto_detection(
    requests_mock, content, inner_parser, stream, use_by_headers, expected_count
):
    """GzipParser should decode both gzipped and plain payloads regardless of headers.

    The mocked responses deliberately carry no Content-Encoding header, so
    decompression must be triggered by the gzip magic-byte sniffing inside
    GzipParser itself — in streamed and non-streamed modes, and both when the
    parser is the primary parser and when it is the by_headers fallback.
    """
    requests_mock.register_uri(
        "GET",
        "https://airbyte.io/",
        content=content,
    )
    response = requests.get("https://airbyte.io/", stream=stream)

    gzip_parser = GzipParser(inner_parser=inner_parser)
    if use_by_headers:
        # Header never matches (mock sets no Content-Encoding), so decoding
        # exercises the fallback_parser path.
        decoder = CompositeRawDecoder.by_headers(
            [({"Content-Encoding"}, {"gzip"}, gzip_parser)],
            stream_response=stream,
            fallback_parser=gzip_parser,
        )
    else:
        decoder = CompositeRawDecoder(parser=gzip_parser, stream_response=stream)

    assert len(list(decoder.decode(response))) == expected_count


def test_given_response_already_consumed_when_decode_then_no_data_is_returned(requests_mock):
requests_mock.register_uri(
"GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode()
Expand Down