Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,56 +31,56 @@
400: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.system_error,
error_message="HTTP Status Code: 400. Error: Bad request. Please check your request parameters.",
error_message="Bad request response from source's API.",
),
401: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.config_error,
error_message="HTTP Status Code: 401. Error: Unauthorized. Please ensure you are authenticated correctly.",
error_message="Authentication failed on source's API. Credentials may be invalid, expired, or lack required access.",
),
403: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.config_error,
error_message="HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.",
error_message="Source's API denied access. Configured credentials have insufficient permissions.",
),
404: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.system_error,
error_message="HTTP Status Code: 404. Error: Not found. The requested resource was not found on the server.",
error_message="Requested resource not found on source's API.",
),
405: ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.system_error,
error_message="HTTP Status Code: 405. Error: Method not allowed. Please check your request method.",
error_message="Method not allowed by source's API.",
),
408: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="HTTP Status Code: 408. Error: Request timeout.",
error_message="Request to source's API timed out.",
),
429: ErrorResolution(
response_action=ResponseAction.RATE_LIMITED,
failure_type=FailureType.transient_error,
error_message="HTTP Status Code: 429. Error: Too many requests.",
error_message="Rate limit exceeded on source's API.",
),
500: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="HTTP Status Code: 500. Error: Internal server error.",
error_message="Internal server error from source's API.",
),
502: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="HTTP Status Code: 502. Error: Bad gateway.",
error_message="Bad gateway response from source's API.",
),
503: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="HTTP Status Code: 503. Error: Service unavailable.",
error_message="Source's API is temporarily unavailable.",
),
504: ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="HTTP Status Code: 504. Error: Gateway timeout.",
error_message="Gateway timeout from source's API.",
),
}
15 changes: 11 additions & 4 deletions airbyte_cdk/sources/streams/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ def _send_with_retry(
self._logger.error(f"Retries exhausted with backoff exception.", exc_info=True)
raise MessageRepresentationAirbyteTracedErrors(
internal_message=f"Exhausted available request attempts. Exception: {e}",
message=f"Exhausted available request attempts. Please see logs for more details. Exception: {e}",
message=f"Request retry limit exhausted. See logs for details.",
failure_type=e.failure_type or FailureType.system_error,
exception=e,
stream_descriptor=StreamDescriptor(name=self._name),
Expand Down Expand Up @@ -424,6 +424,13 @@ def _evict_key(self, prepared_request: requests.PreparedRequest) -> None:
if prepared_request in self._request_attempt_count:
del self._request_attempt_count[prepared_request]

def _format_error_message(self, error_resolution: ErrorResolution, response: Optional[requests.Response]) -> Optional[str]:
"""Prepend stream name and HTTP status code to the error resolution message for user-facing context."""
if not error_resolution.error_message:
return None
status_prefix = f"HTTP {response.status_code}. " if response is not None else ""
return f"Stream '{self._name}': {status_prefix}{error_resolution.error_message}"
Comment on lines +427 to +432
Copy link

Copilot AI Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new _format_error_message method lacks dedicated unit tests. While existing tests will implicitly verify that the formatted messages contain the expected substrings, there are no tests that explicitly verify the stream name and HTTP status code are correctly prepended to error messages. Consider adding unit tests specifically for this method to verify its behavior with and without a response object, and to ensure the formatting is correct.

Copilot uses AI. Check for mistakes.

def _handle_error_resolution(
self,
response: Optional[requests.Response],
Expand Down Expand Up @@ -497,7 +504,7 @@ def _handle_error_resolution(

raise MessageRepresentationAirbyteTracedErrors(
internal_message=error_message,
message=error_resolution.error_message or error_message,
message=self._format_error_message(error_resolution, response) or error_message,
failure_type=error_resolution.failure_type,
)

Expand All @@ -507,7 +514,7 @@ def _handle_error_resolution(
else:
log_message = f"Ignoring response for '{request.method}' request to '{request.url}' with error '{exc}'"

self._logger.info(error_resolution.error_message or log_message)
self._logger.info(self._format_error_message(error_resolution, response) or log_message)

# TODO: Consider dynamic retry count depending on subsequent error codes
elif error_resolution.response_action in (
Expand All @@ -525,7 +532,7 @@ def _handle_error_resolution(
user_defined_backoff_time = backoff_time
break
error_message = (
error_resolution.error_message
self._format_error_message(error_resolution, response)
or f"Request to {request.url} failed with failure type {error_resolution.failure_type}, response action {error_resolution.response_action}."
)

Expand Down
2 changes: 1 addition & 1 deletion airbyte_cdk/utils/traced_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def as_airbyte_message(
emitted_at=now_millis,
error=AirbyteErrorTraceMessage(
message=self.message
or "Something went wrong in the connector. See the logs for more details.",
or "Unhandled connector error. See logs for details.",
internal_message=self.internal_message,
failure_type=self.failure_type,
stack_trace=stack_trace_str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@
404,
Status.FAILED,
True,
["Not found. The requested resource was not found on the server."],
["Requested resource not found on source's API."],
id="test_stream_unavailable_unhandled_error",
),
pytest.param(
403,
Status.FAILED,
True,
["Forbidden. You don't have permission to access this resource."],
["Source's API denied access. Configured credentials have insufficient permissions."],
id="test_stream_unavailable_handled_error",
),
pytest.param(200, Status.SUCCEEDED, True, [], id="test_stream_available"),
Expand All @@ -128,7 +128,7 @@
401,
Status.FAILED,
True,
["Unauthorized. Please ensure you are authenticated correctly."],
["Authentication failed on source's API. Credentials may be invalid, expired, or lack required access."],
id="test_stream_unauthorized_error",
),
],
Expand Down
16 changes: 8 additions & 8 deletions unit_tests/sources/declarative/checks/test_check_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ def test_check_stream_with_no_stream_slices_aborts():
"test_stream_unavailable_unhandled_error",
404,
False,
["Not found. The requested resource was not found on the server."],
["Requested resource not found on source's API."],
),
(
"test_stream_unavailable_handled_error",
403,
False,
["Forbidden. You don't have permission to access this resource."],
["Source's API denied access. Configured credentials have insufficient permissions."],
),
("test_stream_available", 200, True, []),
],
Expand Down Expand Up @@ -521,7 +521,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
Status.FAILED,
False,
404,
["Not found. The requested resource was not found on the server."],
["Requested resource not found on source's API."],
0,
id="test_stream_unavailable_unhandled_error",
),
Expand All @@ -530,7 +530,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
Status.FAILED,
False,
403,
["Forbidden. You don't have permission to access this resource."],
["Source's API denied access. Configured credentials have insufficient permissions."],
0,
id="test_stream_unavailable_handled_error",
),
Expand All @@ -539,7 +539,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
Status.FAILED,
False,
401,
["Unauthorized. Please ensure you are authenticated correctly."],
["Authentication failed on source's API. Credentials may be invalid, expired, or lack required access."],
0,
id="test_stream_unauthorized_error",
),
Expand All @@ -563,7 +563,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
Status.FAILED,
False,
404,
["Not found. The requested resource was not found on the server."],
["Requested resource not found on source's API."],
0,
id="test_dynamic_stream_unavailable_unhandled_error",
),
Expand All @@ -587,7 +587,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
Status.FAILED,
False,
403,
["Forbidden. You don't have permission to access this resource."],
["Source's API denied access. Configured credentials have insufficient permissions."],
0,
id="test_dynamic_stream_unavailable_handled_error",
),
Expand All @@ -611,7 +611,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
Status.FAILED,
False,
401,
["Unauthorized. Please ensure you are authenticated correctly."],
["Authentication failed on source's API. Credentials may be invalid, expired, or lack required access."],
0,
id="test_dynamic_stream_unauthorized_error",
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def test_default_error_handler_with_default_response_filter(
),
ResponseAction.RETRY,
FailureType.system_error,
"HTTP Status Code: 400. Error: Bad request. Please check your request parameters.",
"Bad request response from source's API.",
),
(
"_with_http_response_status_402_fail_with_default_failure_type",
Expand All @@ -118,7 +118,7 @@ def test_default_error_handler_with_default_response_filter(
),
ResponseAction.FAIL,
FailureType.config_error,
"HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.",
"Source's API denied access. Configured credentials have insufficient permissions.",
),
(
"_with_http_response_status_200_fail_with_contained_error_message",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
ErrorResolution(
response_action=ResponseAction.IGNORE,
failure_type=FailureType.config_error,
error_message="HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.",
error_message="Source's API denied access. Configured credentials have insufficient permissions.",
),
id="test_http_code_matches_ignore_action",
),
Expand All @@ -59,7 +59,7 @@
ErrorResolution(
response_action=ResponseAction.RETRY,
failure_type=FailureType.transient_error,
error_message="HTTP Status Code: 429. Error: Too many requests.",
error_message="Rate limit exceeded on source's API.",
),
id="test_http_code_matches_retry_action",
),
Expand Down Expand Up @@ -104,7 +104,7 @@
ErrorResolution(
response_action=ResponseAction.FAIL,
failure_type=FailureType.config_error,
error_message="HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.",
error_message="Source's API denied access. Configured credentials have insufficient permissions.",
),
id="test_predicate_matches_headers",
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ def test_given_ok_response_http_status_error_handler_returns_success_action(mock
403,
ResponseAction.FAIL,
FailureType.config_error,
"HTTP Status Code: 403. Error: Forbidden. You don't have permission to access this resource.",
"Source's API denied access. Configured credentials have insufficient permissions.",
),
(
404,
ResponseAction.FAIL,
FailureType.system_error,
"HTTP Status Code: 404. Error: Not found. The requested resource was not found on the server.",
"Requested resource not found on source's API.",
),
],
)
Expand Down
15 changes: 7 additions & 8 deletions unit_tests/sources/streams/http/test_availability_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ def retry_factor(self) -> float:
{"error": "Something went wrong"},
False,
[
"Forbidden. You don't have permission to access this resource.",
"Forbidden. You don't have permission to access this resource.",
"Source's API denied access. Configured credentials have insufficient permissions.",
"Source's API denied access. Configured credentials have insufficient permissions.",
],
),
(200, {}, True, []),
Expand All @@ -59,8 +59,8 @@ def retry_factor(self) -> float:
@pytest.mark.parametrize(
("include_source", "expected_docs_url_messages"),
[
(True, ["Forbidden. You don't have permission to access this resource."]),
(False, ["Forbidden. You don't have permission to access this resource."]),
(True, ["Source's API denied access. Configured credentials have insufficient permissions."]),
(False, ["Source's API denied access. Configured credentials have insufficient permissions."]),
],
)
@pytest.mark.parametrize("records_as_list", [True, False])
Expand Down Expand Up @@ -105,10 +105,9 @@ def test_http_availability_raises_unhandled_error(mocker):
req.status_code = 404
mocker.patch.object(requests.Session, "send", return_value=req)

assert (
False,
"HTTP Status Code: 404. Error: Not found. The requested resource was not found on the server.",
) == HttpAvailabilityStrategy().check_availability(http_stream, logger)
is_available, reason = HttpAvailabilityStrategy().check_availability(http_stream, logger)
assert is_available is False
assert "Requested resource not found on source's API." in reason


def test_send_handles_retries_when_checking_availability(mocker, caplog):
Expand Down
4 changes: 2 additions & 2 deletions unit_tests/sources/streams/http/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def get_error_handler(self) -> Optional[ErrorHandler]:
send_mock = mocker.patch.object(requests.Session, "send", return_value=req)

with pytest.raises(
AirbyteTracedException, match="Exception: HTTP Status Code: 429. Error: Too many requests."
AirbyteTracedException, match="Rate limit exceeded on source's API."
):
list(stream.read_records(SyncMode.full_refresh))
if retries <= 0:
Expand Down Expand Up @@ -316,7 +316,7 @@ def test_raise_on_http_errors_off_429(mocker):
mocker.patch.object(requests.Session, "send", return_value=req)
with pytest.raises(
AirbyteTracedException,
match="Exhausted available request attempts. Please see logs for more details. Exception: HTTP Status Code: 429. Error: Too many requests.",
match="Request retry limit exhausted. See logs for details.",
):
stream.exit_on_rate_limit = True
list(stream.read_records(SyncMode.full_refresh))
Expand Down
2 changes: 1 addition & 1 deletion unit_tests/sources/test_abstract_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -1765,7 +1765,7 @@ def test_resumable_full_refresh_skip_prior_successful_streams(self, mocker):
),
pytest.param(
Exception("Generic connector error message"),
"Something went wrong in the connector. See the logs for more details.",
"Unhandled connector error. See logs for details.",
"Generic connector error message",
id="test_raises_generic_exception",
),
Expand Down
4 changes: 2 additions & 2 deletions unit_tests/utils/test_traced_exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def test_exception_as_airbyte_message():
assert airbyte_message.trace.error.failure_type == FailureType.system_error
assert (
airbyte_message.trace.error.message
== "Something went wrong in the connector. See the logs for more details."
== "Unhandled connector error. See logs for details."
)
assert airbyte_message.trace.error.internal_message == "an internal message"
assert (
Expand All @@ -71,7 +71,7 @@ def test_existing_exception_as_airbyte_message(raised_exception):
assert airbyte_message.trace.type == TraceType.ERROR
assert (
airbyte_message.trace.error.message
== "Something went wrong in the connector. See the logs for more details."
== "Unhandled connector error. See logs for details."
)
assert airbyte_message.trace.error.internal_message == "an error has occurred"
assert airbyte_message.trace.error.stack_trace.startswith("Traceback (most recent call last):")
Expand Down
Loading