Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion clients/python/src/taskbroker_client/worker/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,18 @@ def request_deserializer(serialized_request: bytes) -> Any:
_RPC_SIGNATURE_AUTH_TLS.failed = True
return inner_deserializer(b"")

return inner_deserializer(serialized_request)
try:
return inner_deserializer(serialized_request)
except Exception:
# gRPC swallows deserializer exceptions in `grpc._common._transform`
# (it logs to the `grpc._common` logger and returns None), and the server
# then aborts the call with an opaque INTERNAL "Exception deserializing
# request!". Log here so the failure is visible on our own logger.
logger.exception(
"taskworker.grpc_server.request_deserialization_failed",
extra={"method": method},
)
raise

def unary_unary(request: Any, context: grpc.ServicerContext) -> Any:
if getattr(_RPC_SIGNATURE_AUTH_TLS, "failed", False):
Expand Down
70 changes: 69 additions & 1 deletion clients/python/tests/worker/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,25 @@
import string
import time
from collections import defaultdict
from collections.abc import Callable
from collections.abc import Callable, Generator
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from pathlib import Path
from typing import Any
from unittest.mock import Mock, patch

import grpc
import pytest
from google.protobuf.message import Message
from sentry_protos.taskbroker.v1 import taskbroker_pb2_grpc
from sentry_protos.taskbroker.v1.taskbroker_pb2 import (
TASK_ACTIVATION_STATUS_COMPLETE,
TASK_ACTIVATION_STATUS_RETRY,
FetchNextTask,
GetTaskRequest,
GetTaskResponse,
PushTaskRequest,
PushTaskResponse,
SetTaskStatusRequest,
SetTaskStatusResponse,
TaskActivation,
Expand Down Expand Up @@ -394,6 +398,70 @@ def test_request_signature_server_interceptor_skips_grpc_health_check() -> None:
assert interceptor.intercept_service(lambda _: handler, handler_call_details) is handler


class _RecordingWorkerServicer(taskbroker_pb2_grpc.WorkerServiceServicer):
"""Records the requests it receives, like the real WorkerServicer would process them"""

def __init__(self) -> None:
self.requests: list[PushTaskRequest] = []

def PushTask(self, request: PushTaskRequest, context: grpc.ServicerContext) -> PushTaskResponse:
self.requests.append(request)
return PushTaskResponse()


@contextmanager
def _running_worker_server(
secrets: list[str],
) -> Generator[tuple[_RecordingWorkerServicer, grpc.Channel], None, None]:
"""Run a real gRPC server with the signature interceptor, as PushTaskWorker does.

The signature is verified inside a deserializer wrapper that gRPC runs on a
different thread than the servicer, so this behaviour can only be exercised
against a real server, not a mocked channel.
"""
servicer = _RecordingWorkerServicer()
server = grpc.server(
ThreadPoolExecutor(max_workers=2),
interceptors=[RequestSignatureServerInterceptor(secrets)],
)
taskbroker_pb2_grpc.add_WorkerServiceServicer_to_server(servicer, server)
port = server.add_insecure_port("[::]:0")
server.start()
channel = grpc.insecure_channel(f"localhost:{port}")
try:
yield servicer, channel
finally:
channel.close()
server.stop(grace=None)


def test_request_signature_server_interceptor_logs_deserialization_errors(
caplog: pytest.LogCaptureFixture,
) -> None:
# gRPC swallows deserializer exceptions in `grpc._common._transform` and the
# server aborts the call with an opaque INTERNAL status, so the interceptor must
# log the exception itself to leave a trace on our own logger.
body = b"\xff\xff\xff\xff not a protobuf"
signature = _push_task_hmac(b"secret", body)

with _running_worker_server(["secret"]) as (servicer, channel):
multicallable = channel.unary_unary(
_PUSH_TASK_METHOD,
request_serializer=None,
response_deserializer=PushTaskResponse.FromString,
)
with caplog.at_level("ERROR", logger="taskbroker_client.worker.client"):
with pytest.raises(grpc.RpcError) as excinfo:
multicallable(body, timeout=5, metadata=(("sentry-signature", signature),))

assert excinfo.value.code() == grpc.StatusCode.INTERNAL
assert servicer.requests == []
assert any(
record.message == "taskworker.grpc_server.request_deserialization_failed"
for record in caplog.records
)


def test_get_task_with_namespace() -> None:
channel = MockChannel()
channel.add_response(
Expand Down
Loading