diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 7acb5c1e2..ddbf4d8b9 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -11,6 +11,8 @@ from io import BufferedIOBase, TextIOWrapper from typing import Any, List, Optional +GZIP_MAGIC_BYTES = b"\x1f\x8b" + import orjson import requests @@ -35,15 +37,22 @@ def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE: """ Decompress gzipped bytes and pass decompressed data to the inner parser. - IMPORTANT: - - If the data is not gzipped, reset the pointer and pass the data to the inner parser as is. - - Note: - - The data is not decoded by default. + Auto-detects gzip content by checking for magic bytes (1f 8b) at the start of the data. + If the data is not gzip-compressed, it is passed directly to the inner parser as-is. + This handles APIs that return gzip-compressed bodies without setting the Content-Encoding header. """ + header = data.read(2) + if not header: + return - with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj: - yield from self.inner_parser.parse(gzipobj) + remaining = data.read() + full_data = io.BytesIO(header + remaining) + + if header == GZIP_MAGIC_BYTES: + with gzip.GzipFile(fileobj=full_data, mode="rb") as gzipobj: + yield from self.inner_parser.parse(gzipobj) + else: + yield from self.inner_parser.parse(full_data) @dataclass diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 2bd7d268d..ade134cf5 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2634,15 +2634,16 @@ def create_gzip_decoder( gzip_parser: GzipParser = ModelToComponentFactory._get_parser(model, config) # type: ignore # based on the model, we know this will be a GzipParser if self._emit_connector_builder_messages: - # This is very surprising but if the response is not streamed, - # CompositeRawDecoder calls response.content and the requests library actually uncompress the data as opposed to response.raw, - # which uses urllib3 directly and does not uncompress the data. - return CompositeRawDecoder(gzip_parser.inner_parser, False) + # When not streaming, CompositeRawDecoder uses response.content which the requests + # library auto-decompresses when Content-Encoding is set. However, some APIs return + # gzip data without Content-Encoding headers. Using gzip_parser (which auto-detects + # gzip magic bytes) ensures decompression works in both cases. + return CompositeRawDecoder(gzip_parser, False) return CompositeRawDecoder.by_headers( [({"Content-Encoding", "Content-Type"}, _compressed_response_types, gzip_parser)], stream_response=True, - fallback_parser=gzip_parser.inner_parser, + fallback_parser=gzip_parser, ) @staticmethod diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index d92d6c605..2106f47a6 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -332,6 +332,83 @@ def test_composite_raw_decoder_csv_parser_without_mocked_response(): thread.join(timeout=5) # ensure thread is cleaned up +@pytest.mark.parametrize( + "content,inner_parser,stream,use_by_headers,expected_count", + [ + pytest.param( + generate_csv(encoding="utf-8", delimiter="\t", should_compress=True), + CsvParser(encoding="utf-8", delimiter="\\t"), + True, + False, + 3, + id="gzip_csv_no_header_streamed", + ), + pytest.param( + generate_csv(encoding="iso-8859-1", delimiter="\t", should_compress=True), + CsvParser(encoding="iso-8859-1", delimiter="\\t"), + True, + False, + 3, + id="gzip_csv_no_header_iso_encoding", + ), + pytest.param( + generate_compressed_jsonlines(), + JsonLineParser(), + True, + True, + 3, + id="gzip_jsonl_no_header_by_headers_fallback", + ), + pytest.param( + "".join(generate_jsonlines()).encode("utf-8"), + JsonLineParser(), + True, + False, + 3, + id="non_gzip_passthrough", + ), + pytest.param( + generate_compressed_jsonlines(), + JsonLineParser(), + False, + False, + 3, + id="gzip_no_header_non_streamed", + ), + pytest.param( + b"", + JsonLineParser(), + True, + False, + 0, + id="empty_data", + ), + ], +) +def test_gzip_parser_auto_detection( + requests_mock, content, inner_parser, stream, use_by_headers, expected_count +): + requests_mock.register_uri( + "GET", + "https://airbyte.io/", + content=content, + ) + response = requests.get("https://airbyte.io/", stream=stream) + + parser = GzipParser(inner_parser=inner_parser) + if use_by_headers: + composite_raw_decoder = CompositeRawDecoder.by_headers( + [({"Content-Encoding"}, {"gzip"}, parser)], + stream_response=stream, + fallback_parser=parser, + ) + else: + composite_raw_decoder = CompositeRawDecoder(parser=parser, stream_response=stream) + + parsed_records = list(composite_raw_decoder.decode(response)) + assert len(parsed_records) == expected_count + + def test_given_response_already_consumed_when_decode_then_no_data_is_returned(requests_mock): requests_mock.register_uri( "GET", "https://airbyte.io/", content=json.dumps({"test": "test"}).encode()