From a4340c5a1b8f89726f8688825b44ae3333e5a79c Mon Sep 17 00:00:00 2001 From: Nathanael Mortensen Date: Fri, 12 Dec 2025 14:55:29 -0800 Subject: [PATCH] Correctly map all Error fields from gRPC details It's not enough to just return the correct error type. User logic, not to mention client internals, rely on specific fields from the error returned by the server. Extract these from the Status details and set them on the corresponding TException type. Additionally include the Client feature flags header, which can disable/enable returning WorkflowExecutionAlreadyCompletedError. The current implementation incorrectly translated EntityNotExistsErrors to WorkflowExecutionAlreadyCompletedError based on the message name, which is different from the Thrift behavior. If the client sends the correct header it will receive the correct type. --- .../compatibility/Thrift2ProtoAdapter.java | 96 +++---- .../proto/serviceclient/GrpcServiceStubs.java | 10 + .../compatibility/thrift/ErrorMapper.java | 255 ++++++++++++----- .../compatibility/thrift/ErrorMapperTest.java | 257 ++++++++++-------- .../cadence/testUtils/TestEnvironment.java | 8 +- 5 files changed, 402 insertions(+), 224 deletions(-) diff --git a/src/main/java/com/uber/cadence/internal/compatibility/Thrift2ProtoAdapter.java b/src/main/java/com/uber/cadence/internal/compatibility/Thrift2ProtoAdapter.java index b4bae6557..2034c4df0 100644 --- a/src/main/java/com/uber/cadence/internal/compatibility/Thrift2ProtoAdapter.java +++ b/src/main/java/com/uber/cadence/internal/compatibility/Thrift2ProtoAdapter.java @@ -139,7 +139,7 @@ public void RegisterDomain(RegisterDomainRequest registerRequest) .domainBlockingStub() .registerDomain(RequestMapper.registerDomainRequest(registerRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -154,7 +154,7 @@ public DescribeDomainResponse DescribeDomain(DescribeDomainRequest describeReque .describeDomain(RequestMapper.describeDomainRequest(describeRequest)); return ResponseMapper.describeDomainResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -177,7 +177,7 @@ public ListDomainsResponse ListDomains(ListDomainsRequest listRequest) .listDomains(RequestMapper.listDomainsRequest(listRequest)); return ResponseMapper.listDomainsResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -192,7 +192,7 @@ public UpdateDomainResponse UpdateDomain(UpdateDomainRequest updateRequest) .updateDomain(RequestMapper.updateDomainRequest(updateRequest)); return ResponseMapper.updateDomainResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -205,7 +205,7 @@ public void DeprecateDomain(DeprecateDomainRequest deprecateRequest) .domainBlockingStub() .deprecateDomain(RequestMapper.deprecateDomainRequest(deprecateRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -241,7 +241,7 @@ public StartWorkflowExecutionAsyncResponse StartWorkflowExecutionAsync( RequestMapper.startWorkflowExecutionAsyncRequest(startRequest)); return ResponseMapper.startWorkflowExecutionAsyncResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -258,7 +258,7 @@ private StartWorkflowExecutionResponse startWorkflowExecution( .startWorkflowExecution(RequestMapper.startWorkflowExecutionRequest(startRequest)); return ResponseMapper.startWorkflowExecutionResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -281,7 +281,7 @@ public GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistory( RequestMapper.getWorkflowExecutionHistoryRequest(getRequest)); return ResponseMapper.getWorkflowExecutionHistoryResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -296,7 +296,7 @@ public PollForDecisionTaskResponse PollForDecisionTask(PollForDecisionTaskReques .pollForDecisionTask(RequestMapper.pollForDecisionTaskRequest(pollRequest)); return ResponseMapper.pollForDecisionTaskResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -314,7 +314,7 @@ public RespondDecisionTaskCompletedResponse RespondDecisionTaskCompleted( RequestMapper.respondDecisionTaskCompletedRequest(completeRequest)); return ResponseMapper.respondDecisionTaskCompletedResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -328,7 +328,7 @@ public void RespondDecisionTaskFailed(RespondDecisionTaskFailedRequest failedReq .workerBlockingStub() .respondDecisionTaskFailed(RequestMapper.respondDecisionTaskFailedRequest(failedRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -343,7 +343,7 @@ public PollForActivityTaskResponse PollForActivityTask(PollForActivityTaskReques .pollForActivityTask(RequestMapper.pollForActivityTaskRequest(pollRequest)); return ResponseMapper.pollForActivityTaskResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -361,7 +361,7 @@ public RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeat( RequestMapper.recordActivityTaskHeartbeatRequest(heartbeatRequest)); return ResponseMapper.recordActivityTaskHeartbeatResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -379,7 +379,7 @@ public RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeatByID( RequestMapper.recordActivityTaskHeartbeatByIdRequest(heartbeatRequest)); return ResponseMapper.recordActivityTaskHeartbeatByIdResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -394,7 +394,7 @@ public void RespondActivityTaskCompleted(RespondActivityTaskCompletedRequest com .respondActivityTaskCompleted( RequestMapper.respondActivityTaskCompletedRequest(completeRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -410,7 +410,7 @@ public void RespondActivityTaskCompletedByID( .respondActivityTaskCompletedByID( RequestMapper.respondActivityTaskCompletedByIdRequest(completeRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -424,7 +424,7 @@ public void RespondActivityTaskFailed(RespondActivityTaskFailedRequest failReque .workerBlockingStub() .respondActivityTaskFailed(RequestMapper.respondActivityTaskFailedRequest(failRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -439,7 +439,7 @@ public void RespondActivityTaskFailedByID(RespondActivityTaskFailedByIDRequest f .respondActivityTaskFailedByID( RequestMapper.respondActivityTaskFailedByIdRequest(failRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -454,7 +454,7 @@ public void RespondActivityTaskCanceled(RespondActivityTaskCanceledRequest cance .respondActivityTaskCanceled( RequestMapper.respondActivityTaskCanceledRequest(canceledRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -470,7 +470,7 @@ public void RespondActivityTaskCanceledByID( .respondActivityTaskCanceledByID( RequestMapper.respondActivityTaskCanceledByIdRequest(canceledRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -488,7 +488,7 @@ public void RequestCancelWorkflowExecution(RequestCancelWorkflowExecutionRequest .requestCancelWorkflowExecution( RequestMapper.requestCancelWorkflowExecutionRequest(cancelRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -505,7 +505,7 @@ public void SignalWorkflowExecution(SignalWorkflowExecutionRequest signalRequest .workflowBlockingStub() .signalWorkflowExecution(RequestMapper.signalWorkflowExecutionRequest(signalRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -524,7 +524,7 @@ public StartWorkflowExecutionResponse SignalWithStartWorkflowExecution( RequestMapper.signalWithStartWorkflowExecutionRequest(signalWithStartRequest)); return ResponseMapper.signalWithStartWorkflowExecutionResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -544,7 +544,7 @@ public SignalWithStartWorkflowExecutionAsyncResponse SignalWithStartWorkflowExec signalWithStartRequest)); return ResponseMapper.signalWithStartWorkflowExecutionAsyncResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -570,7 +570,7 @@ public ResetWorkflowExecutionResponse ResetWorkflowExecution( .resetWorkflowExecution(RequestMapper.resetWorkflowExecutionRequest(resetRequest)); return ResponseMapper.resetWorkflowExecutionResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -585,7 +585,7 @@ public void TerminateWorkflowExecution(TerminateWorkflowExecutionRequest termina .terminateWorkflowExecution( RequestMapper.terminateWorkflowExecutionRequest(terminateRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -602,7 +602,7 @@ public ListOpenWorkflowExecutionsResponse ListOpenWorkflowExecutions( RequestMapper.listOpenWorkflowExecutionsRequest(listRequest)); return ResponseMapper.listOpenWorkflowExecutionsResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -619,7 +619,7 @@ public ListClosedWorkflowExecutionsResponse ListClosedWorkflowExecutions( RequestMapper.listClosedWorkflowExecutionsRequest(listRequest)); return ResponseMapper.listClosedWorkflowExecutionsResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -635,7 +635,7 @@ public ListWorkflowExecutionsResponse ListWorkflowExecutions( .listWorkflowExecutions(RequestMapper.listWorkflowExecutionsRequest(listRequest)); return ResponseMapper.listWorkflowExecutionsResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -652,7 +652,7 @@ public ListArchivedWorkflowExecutionsResponse ListArchivedWorkflowExecutions( RequestMapper.listArchivedWorkflowExecutionsRequest(listRequest)); return ResponseMapper.listArchivedWorkflowExecutionsResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -668,7 +668,7 @@ public ListWorkflowExecutionsResponse ScanWorkflowExecutions( .scanWorkflowExecutions(RequestMapper.scanWorkflowExecutionsRequest(listRequest)); return ResponseMapper.scanWorkflowExecutionsResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -684,7 +684,7 @@ public CountWorkflowExecutionsResponse CountWorkflowExecutions( .countWorkflowExecutions(RequestMapper.countWorkflowExecutionsRequest(countRequest)); return ResponseMapper.countWorkflowExecutionsResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -698,7 +698,7 @@ public GetSearchAttributesResponse GetSearchAttributes() .getSearchAttributes(GetSearchAttributesRequest.newBuilder().build()); return ResponseMapper.getSearchAttributesResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -712,7 +712,7 @@ public void RespondQueryTaskCompleted(RespondQueryTaskCompletedRequest completeR .respondQueryTaskCompleted( RequestMapper.respondQueryTaskCompletedRequest(completeRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -728,7 +728,7 @@ public ResetStickyTaskListResponse ResetStickyTaskList(ResetStickyTaskListReques .resetStickyTaskList(RequestMapper.resetStickyTaskListRequest(resetRequest)); return new ResetStickyTaskListResponse(); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -743,7 +743,7 @@ public QueryWorkflowResponse QueryWorkflow(QueryWorkflowRequest queryRequest) .queryWorkflow(RequestMapper.queryWorkflowRequest(queryRequest)); return ResponseMapper.queryWorkflowResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -760,7 +760,7 @@ public DescribeWorkflowExecutionResponse DescribeWorkflowExecution( RequestMapper.describeWorkflowExecutionRequest(describeRequest)); return ResponseMapper.describeWorkflowExecutionResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -775,7 +775,7 @@ public DescribeTaskListResponse DescribeTaskList(DescribeTaskListRequest request .describeTaskList(RequestMapper.describeTaskListRequest(request)); return ResponseMapper.describeTaskListResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -788,7 +788,7 @@ public ClusterInfo GetClusterInfo() throws InternalServiceError, ServiceBusyErro .getClusterInfo(com.uber.cadence.api.v1.GetClusterInfoRequest.newBuilder().build()); return ResponseMapper.getClusterInfoResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -809,7 +809,7 @@ public ListTaskListPartitionsResponse ListTaskListPartitions( .listTaskListPartitions(RequestMapper.listTaskListPartitionsRequest(request)); return ResponseMapper.listTaskListPartitionsResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -823,7 +823,7 @@ public void RefreshWorkflowTasks(RefreshWorkflowTasksRequest request) .refreshWorkflowTasks( com.uber.cadence.api.v1.RefreshWorkflowTasksRequest.newBuilder().build()); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -903,7 +903,7 @@ public void StartWorkflowExecutionAsync( }, ForkJoinPool.commonPool()); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -1026,7 +1026,7 @@ public void SignalWorkflowExecution( }, ForkJoinPool.commonPool()); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -1222,7 +1222,7 @@ public void StartWorkflowExecutionWithTimeout( }, ForkJoinPool.commonPool()); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -1253,7 +1253,7 @@ public void StartWorkflowExecutionAsyncWithTimeout( }, ForkJoinPool.commonPool()); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -1269,7 +1269,7 @@ public GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistoryWithTimeou RequestMapper.getWorkflowExecutionHistoryRequest(getRequest)); return ResponseMapper.getWorkflowExecutionHistoryResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -1299,7 +1299,7 @@ public void GetWorkflowExecutionHistoryWithTimeout( }, ForkJoinPool.commonPool()); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -1315,7 +1315,7 @@ public void SignalWorkflowExecutionWithTimeout( private void handleAsyncException(AsyncMethodCallback callback, Exception exception) { if (exception instanceof ExecutionException && exception.getCause() instanceof StatusRuntimeException) { - callback.onError(ErrorMapper.Error(((StatusRuntimeException) exception.getCause()))); + callback.onError(ErrorMapper.mapError(((StatusRuntimeException) exception.getCause()))); } else { callback.onError(exception); } diff --git a/src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java b/src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java index fdf5a2bab..3af41fd41 100644 --- a/src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java +++ b/src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java @@ -16,6 +16,8 @@ package com.uber.cadence.internal.compatibility.proto.serviceclient; import com.google.common.base.Strings; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import com.uber.cadence.api.v1.DomainAPIGrpc; import com.uber.cadence.api.v1.MetaAPIGrpc; import com.uber.cadence.api.v1.MetaAPIGrpc.MetaAPIBlockingStub; @@ -63,6 +65,8 @@ final class GrpcServiceStubs implements IGrpcServiceStubs { Metadata.Key.of("rpc-caller", Metadata.ASCII_STRING_MARSHALLER); private static final Metadata.Key RPC_ENCODING_HEADER_KEY = Metadata.Key.of("rpc-encoding", Metadata.ASCII_STRING_MARSHALLER); + private static final Metadata.Key CLIENT_FEATURE_FLAGS_HEADER_KEY = + Metadata.Key.of("cadence-client-feature-flags", Metadata.ASCII_STRING_MARSHALLER); private static final Metadata.Key AUTHORIZATION_HEADER_KEY = Metadata.Key.of("cadence-authorization", Metadata.ASCII_STRING_MARSHALLER); @@ -109,6 +113,12 @@ final class GrpcServiceStubs implements IGrpcServiceStubs { if (!Strings.isNullOrEmpty(options.getIsolationGroup())) { headers.put(ISOLATION_GROUP_HEADER_KEY, options.getIsolationGroup()); } + if (options.getFeatureFlags() != null) { + GsonBuilder gsonBuilder = new GsonBuilder(); + Gson gson = gsonBuilder.create(); + String serialized = gson.toJson(options.getFeatureFlags()); + headers.put(CLIENT_FEATURE_FLAGS_HEADER_KEY, serialized); + } mergeHeaders(headers, options.getHeaders()); Channel interceptedChannel = diff --git a/src/main/java/com/uber/cadence/internal/compatibility/thrift/ErrorMapper.java b/src/main/java/com/uber/cadence/internal/compatibility/thrift/ErrorMapper.java index 366c2e6ce..921493430 100644 --- a/src/main/java/com/uber/cadence/internal/compatibility/thrift/ErrorMapper.java +++ b/src/main/java/com/uber/cadence/internal/compatibility/thrift/ErrorMapper.java @@ -15,6 +15,10 @@ */ package com.uber.cadence.internal.compatibility.thrift; +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.rpc.Status; import com.uber.cadence.AccessDeniedError; import com.uber.cadence.CancellationAlreadyRequestedError; import com.uber.cadence.ClientVersionNotSupportedError; @@ -25,90 +29,207 @@ import com.uber.cadence.InternalDataInconsistencyError; import com.uber.cadence.InternalServiceError; import com.uber.cadence.LimitExceededError; +import com.uber.cadence.QueryFailedError; import com.uber.cadence.ServiceBusyError; import com.uber.cadence.WorkflowExecutionAlreadyCompletedError; import com.uber.cadence.WorkflowExecutionAlreadyStartedError; -import io.grpc.Metadata; import io.grpc.StatusRuntimeException; +import io.grpc.protobuf.StatusProto; +import java.util.function.BiFunction; import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ErrorMapper { + private static final Logger LOGGER = LoggerFactory.getLogger(ErrorMapper.class); - public static TException Error(StatusRuntimeException ex) { - String details = getErrorDetails(ex); - switch (ex.getStatus().getCode()) { + public static TException mapError(StatusRuntimeException ex) { + Status status = StatusProto.fromThrowable(ex); + // No details, fall back to code mapping + if (status == null || status.getDetailsCount() == 0) { + return fromCode(ex); + } + String message = status.getMessage(); + Any firstDetail = status.getDetails(0); + + if (firstDetail.is(com.uber.cadence.api.v1.WorkflowExecutionAlreadyStartedError.class)) { + return mapDetails( + com.uber.cadence.api.v1.WorkflowExecutionAlreadyStartedError.class, + message, + firstDetail, + ErrorMapper::mapWorkflowExecutionAlreadyStarted); + } else if (firstDetail.is(com.uber.cadence.api.v1.EntityNotExistsError.class)) { + return mapDetails( + com.uber.cadence.api.v1.EntityNotExistsError.class, + message, + firstDetail, + ErrorMapper::mapEntityNotExistsError); + } else if (firstDetail.is( + com.uber.cadence.api.v1.WorkflowExecutionAlreadyCompletedError.class)) { + return mapDetails( + com.uber.cadence.api.v1.WorkflowExecutionAlreadyCompletedError.class, + message, + firstDetail, + ErrorMapper::mapWorkflowExecutionAlreadyCompleted); + } else if (firstDetail.is(com.uber.cadence.api.v1.DomainNotActiveError.class)) { + return mapDetails( + com.uber.cadence.api.v1.DomainNotActiveError.class, + message, + firstDetail, + ErrorMapper::mapDomainNotActive); + } else if (firstDetail.is(com.uber.cadence.api.v1.ClientVersionNotSupportedError.class)) { + return mapDetails( + com.uber.cadence.api.v1.ClientVersionNotSupportedError.class, + message, + firstDetail, + ErrorMapper::mapClientVersionNotSupported); + } else if (firstDetail.is(com.uber.cadence.api.v1.FeatureNotEnabledError.class)) { + return mapDetails( + com.uber.cadence.api.v1.FeatureNotEnabledError.class, + message, + firstDetail, + ErrorMapper::mapFeatureNotEnabled); + } else if (firstDetail.is(com.uber.cadence.api.v1.CancellationAlreadyRequestedError.class)) { + return mapDetails( + com.uber.cadence.api.v1.CancellationAlreadyRequestedError.class, + message, + firstDetail, + ErrorMapper::mapCancellationAlreadyRequested); + } else if (firstDetail.is(com.uber.cadence.api.v1.DomainAlreadyExistsError.class)) { + return mapDetails( + com.uber.cadence.api.v1.DomainAlreadyExistsError.class, + message, + firstDetail, + ErrorMapper::mapDomainAlreadyExists); + } else if (firstDetail.is(com.uber.cadence.api.v1.LimitExceededError.class)) { + return mapDetails( + com.uber.cadence.api.v1.LimitExceededError.class, + message, + firstDetail, + ErrorMapper::mapLimitExceeded); + } else if (firstDetail.is(com.uber.cadence.api.v1.QueryFailedError.class)) { + return mapDetails( + com.uber.cadence.api.v1.QueryFailedError.class, + message, + firstDetail, + ErrorMapper::mapQueryFailed); + } else if (firstDetail.is(com.uber.cadence.api.v1.ServiceBusyError.class)) { + return mapDetails( + com.uber.cadence.api.v1.ServiceBusyError.class, + message, + firstDetail, + ErrorMapper::mapServiceBusy); + } + + // It has details, but they're unhandled. Fall back to the error code mapping + return fromCode(ex); + } + + private static TException fromCode(StatusRuntimeException exception) { + String message = exception.getStatus().getDescription(); + switch (exception.getStatus().getCode()) { case PERMISSION_DENIED: - return new AccessDeniedError(ex.getMessage()); + return new AccessDeniedError(message); case INTERNAL: - return new InternalServiceError(ex.getMessage()); - case NOT_FOUND: - { - if ("EntityNotExistsError".equals(details) - && ex.getMessage().contains("already completed.")) { - return new WorkflowExecutionAlreadyCompletedError(ex.getMessage()); - } else { - // TODO add cluster info - return new EntityNotExistsError(ex.getMessage()); - } - } - case ALREADY_EXISTS: - { - switch (details) { - case "CancellationAlreadyRequestedError": - return new CancellationAlreadyRequestedError(ex.getMessage()); - case "DomainAlreadyExistsError": - return new DomainAlreadyExistsError(ex.getMessage()); - case "WorkflowExecutionAlreadyStartedError": - { - // TODO add started wf info - WorkflowExecutionAlreadyStartedError e = new WorkflowExecutionAlreadyStartedError(); - e.setMessage(ex.getMessage()); - return e; - } - } - } + return new InternalServiceError(message); case DATA_LOSS: - return new InternalDataInconsistencyError(ex.getMessage()); - case FAILED_PRECONDITION: - switch (details) { - // TODO add infos - case "ClientVersionNotSupportedError": - return new ClientVersionNotSupportedError(); - case "FeatureNotEnabledError": - return new FeatureNotEnabledError(); - case "DomainNotActiveError": - { - DomainNotActiveError e = new DomainNotActiveError(); - e.setMessage(ex.getMessage()); - return e; - } - } - case RESOURCE_EXHAUSTED: - switch (details) { - case "LimitExceededError": - return new LimitExceededError(ex.getMessage()); - case "ServiceBusyError": - return new ServiceBusyError(ex.getMessage()); - } - case UNKNOWN: - return new TException(ex); + return new InternalDataInconsistencyError(message); default: - // If error does not match anything, return raw grpc status error - // There are some code that casts error to grpc status to check for deadline exceeded status - return new TException(ex); + return new TException(exception); } } - static String getErrorDetails(StatusRuntimeException ex) { - { - Metadata trailer = ex.getTrailers(); - Metadata.Key key = - Metadata.Key.of("rpc-application-error-name", Metadata.ASCII_STRING_MARSHALLER); - if (trailer != null && trailer.containsKey(key)) { - return trailer.get(key); + private static TException mapDetails( + Class detailsType, + String message, + Any any, + BiFunction mapper) { + try { + if (any.is(detailsType)) { + T details = any.unpack(detailsType); + return mapper.apply(message, details); } else { - return ""; + LOGGER.error( + "Failed to decode exception. Message of type: {} was not: {}", + any.getTypeUrl(), + detailsType.getCanonicalName()); + return new TException(message); } + } catch (InvalidProtocolBufferException ex) { + LOGGER.error( + "Failed to decode exception of type: {} as {}", + any.getTypeUrl(), + detailsType.getCanonicalName(), + ex); + return new TException(message); } } + + private static WorkflowExecutionAlreadyStartedError mapWorkflowExecutionAlreadyStarted( + String message, com.uber.cadence.api.v1.WorkflowExecutionAlreadyStartedError protoErr) { + return new WorkflowExecutionAlreadyStartedError() + .setMessage(message) + .setStartRequestId(protoErr.getStartRequestId()) + .setRunId(protoErr.getRunId()); + } + + private static EntityNotExistsError mapEntityNotExistsError( + String message, com.uber.cadence.api.v1.EntityNotExistsError protoErr) { + return new EntityNotExistsError() + .setMessage(message) + .setActiveCluster(protoErr.getActiveCluster()) + .setCurrentCluster(protoErr.getCurrentCluster()); + } + + private static WorkflowExecutionAlreadyCompletedError mapWorkflowExecutionAlreadyCompleted( + String message, com.uber.cadence.api.v1.WorkflowExecutionAlreadyCompletedError protoErr) { + return new WorkflowExecutionAlreadyCompletedError().setMessage(message); + } + + private static DomainNotActiveError mapDomainNotActive( + String message, com.uber.cadence.api.v1.DomainNotActiveError protoErr) { + return new DomainNotActiveError() + .setMessage(message) + .setDomainName(protoErr.getDomain()) + .setActiveCluster(protoErr.getActiveCluster()) + .setCurrentCluster(protoErr.getCurrentCluster()); + } + + private static ClientVersionNotSupportedError mapClientVersionNotSupported( + String message, com.uber.cadence.api.v1.ClientVersionNotSupportedError protoErr) { + return new ClientVersionNotSupportedError() + .setFeatureVersion(protoErr.getFeatureVersion()) + .setClientImpl(protoErr.getClientImpl()) + .setSupportedVersions(protoErr.getSupportedVersions()); + } + + private static FeatureNotEnabledError mapFeatureNotEnabled( + String message, com.uber.cadence.api.v1.FeatureNotEnabledError protoErr) { + return new FeatureNotEnabledError().setFeatureFlag(protoErr.getFeatureFlag()); + } + + private static CancellationAlreadyRequestedError mapCancellationAlreadyRequested( + String message, com.uber.cadence.api.v1.CancellationAlreadyRequestedError protoErr) { + return new CancellationAlreadyRequestedError().setMessage(message); + } + + private static DomainAlreadyExistsError mapDomainAlreadyExists( + String message, com.uber.cadence.api.v1.DomainAlreadyExistsError protoErr) { + return new DomainAlreadyExistsError().setMessage(message); + } + + private static LimitExceededError mapLimitExceeded( + String message, com.uber.cadence.api.v1.LimitExceededError protoErr) { + return new LimitExceededError().setMessage(message); + } + + private static QueryFailedError mapQueryFailed( + String message, com.uber.cadence.api.v1.QueryFailedError protoErr) { + return new QueryFailedError().setMessage(message); + } + + private static ServiceBusyError mapServiceBusy( + String message, com.uber.cadence.api.v1.ServiceBusyError protoErr) { + return new ServiceBusyError().setMessage(message); + } } diff --git a/src/test/java/com/uber/cadence/internal/compatibility/thrift/ErrorMapperTest.java b/src/test/java/com/uber/cadence/internal/compatibility/thrift/ErrorMapperTest.java index 810f1166c..1352c7769 100644 --- a/src/test/java/com/uber/cadence/internal/compatibility/thrift/ErrorMapperTest.java +++ b/src/test/java/com/uber/cadence/internal/compatibility/thrift/ErrorMapperTest.java @@ -14,8 +14,10 @@ */ package com.uber.cadence.internal.compatibility.thrift; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import com.google.protobuf.Any; +import com.google.protobuf.Message; import com.uber.cadence.AccessDeniedError; import com.uber.cadence.CancellationAlreadyRequestedError; import com.uber.cadence.ClientVersionNotSupportedError; @@ -23,165 +25,204 @@ import com.uber.cadence.DomainNotActiveError; import com.uber.cadence.EntityNotExistsError; import com.uber.cadence.FeatureNotEnabledError; +import com.uber.cadence.InternalDataInconsistencyError; import com.uber.cadence.InternalServiceError; import com.uber.cadence.LimitExceededError; +import com.uber.cadence.QueryFailedError; import com.uber.cadence.ServiceBusyError; import com.uber.cadence.WorkflowExecutionAlreadyCompletedError; import com.uber.cadence.WorkflowExecutionAlreadyStartedError; -import io.grpc.Metadata; import io.grpc.Status; import io.grpc.StatusRuntimeException; -import org.apache.thrift.TException; +import io.grpc.protobuf.StatusProto; import org.junit.Test; public class ErrorMapperTest { @Test public void testPermissionDeniedError() { - StatusRuntimeException ex = - new StatusRuntimeException(Status.PERMISSION_DENIED.withDescription("Access denied")); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof AccessDeniedError); + assertEquals( + new AccessDeniedError("no access"), + ErrorMapper.mapError(toException(Status.Code.PERMISSION_DENIED, "no access"))); } @Test public void testInternalServiceError() { - StatusRuntimeException ex = - new StatusRuntimeException(Status.INTERNAL.withDescription("Internal error")); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof InternalServiceError); + assertEquals( + new InternalServiceError("no bueno"), + ErrorMapper.mapError(toException(Status.Code.INTERNAL, "no bueno"))); } @Test - public void testWorkflowExecutionAlreadyCompletedError() { - Metadata metadata = new Metadata(); - metadata.put( - Metadata.Key.of("rpc-application-error-name", Metadata.ASCII_STRING_MARSHALLER), - "EntityNotExistsError"); - StatusRuntimeException ex = - new StatusRuntimeException( - Status.NOT_FOUND.withDescription("already completed."), metadata); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof WorkflowExecutionAlreadyCompletedError); + public void testInternalDataInconsistencyError() { + assertEquals( + new InternalDataInconsistencyError("no data"), + ErrorMapper.mapError(toException(Status.Code.DATA_LOSS, "no data"))); } @Test - public void testEntityNotExistsError() { - StatusRuntimeException ex = - new StatusRuntimeException(Status.NOT_FOUND.withDescription("Entity not found")); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof EntityNotExistsError); + public void testWorkflowExecutionAlreadyCompletedError() { + assertEquals( + new WorkflowExecutionAlreadyCompletedError().setMessage("done"), + ErrorMapper.mapError( + toException( + Status.Code.NOT_FOUND, + "done", + com.uber.cadence.api.v1.WorkflowExecutionAlreadyCompletedError + .getDefaultInstance()))); } @Test - public void testCancellationAlreadyRequestedError() { - Metadata metadata = new Metadata(); - metadata.put( - Metadata.Key.of("rpc-application-error-name", Metadata.ASCII_STRING_MARSHALLER), - "CancellationAlreadyRequestedError"); - StatusRuntimeException ex = - new StatusRuntimeException( - Status.ALREADY_EXISTS.withDescription("Cancellation already requested"), metadata); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof CancellationAlreadyRequestedError); + public void testWorkflowExecutionAlreadyStartedError() { + assertEquals( + new WorkflowExecutionAlreadyStartedError() + .setMessage("already started") + .setStartRequestId("start-request-id") + .setRunId("run-id"), + ErrorMapper.mapError( + toException( + Status.Code.ALREADY_EXISTS, + "already started", + com.uber.cadence.api.v1.WorkflowExecutionAlreadyStartedError.newBuilder() + .setStartRequestId("start-request-id") + .setRunId("run-id") + .build()))); } @Test - public void testDomainAlreadyExistsError() { - Metadata metadata = new Metadata(); - metadata.put( - Metadata.Key.of("rpc-application-error-name", Metadata.ASCII_STRING_MARSHALLER), - "DomainAlreadyExistsError"); - StatusRuntimeException ex = - new StatusRuntimeException( - Status.ALREADY_EXISTS.withDescription("Domain already exists"), metadata); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof DomainAlreadyExistsError); + public void testEntityNotExistsError() { + assertEquals( + new EntityNotExistsError() + .setMessage("not found") + .setActiveCluster("active-cluster") + .setCurrentCluster("current-cluster"), + ErrorMapper.mapError( + toException( + Status.Code.NOT_FOUND, + "not found", + com.uber.cadence.api.v1.EntityNotExistsError.newBuilder() + .setActiveCluster("active-cluster") + .setCurrentCluster("current-cluster") + .build()))); } @Test - public void testWorkflowExecutionAlreadyStartedError() { - Metadata metadata = new Metadata(); - metadata.put( - Metadata.Key.of("rpc-application-error-name", Metadata.ASCII_STRING_MARSHALLER), - "WorkflowExecutionAlreadyStartedError"); - StatusRuntimeException ex = - new StatusRuntimeException( - Status.ALREADY_EXISTS.withDescription("Workflow already started"), metadata); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof WorkflowExecutionAlreadyStartedError); + public void testDomainNotActiveError() { + assertEquals( + new DomainNotActiveError() + .setMessage("domain inactive") + .setDomainName("domain-name") + .setActiveCluster("active-cluster") + .setCurrentCluster("current-cluster"), + ErrorMapper.mapError( + toException( + Status.Code.FAILED_PRECONDITION, + "domain inactive", + com.uber.cadence.api.v1.DomainNotActiveError.newBuilder() + .setDomain("domain-name") + .setActiveCluster("active-cluster") + .setCurrentCluster("current-cluster") + .build()))); } @Test public void testClientVersionNotSupportedError() { - Metadata metadata = new Metadata(); - metadata.put( - Metadata.Key.of("rpc-application-error-name", Metadata.ASCII_STRING_MARSHALLER), - "ClientVersionNotSupportedError"); - StatusRuntimeException ex = - new StatusRuntimeException( - Status.FAILED_PRECONDITION.withDescription("Client version not supported"), metadata); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof ClientVersionNotSupportedError); + assertEquals( + new ClientVersionNotSupportedError() + .setFeatureVersion("feature-version") + .setClientImpl("client-impl") + .setSupportedVersions("supported-versions"), + ErrorMapper.mapError( + toException( + Status.Code.FAILED_PRECONDITION, + "unsupported version", + com.uber.cadence.api.v1.ClientVersionNotSupportedError.newBuilder() + .setFeatureVersion("feature-version") + .setClientImpl("client-impl") + .setSupportedVersions("supported-versions") + .build()))); } @Test public void testFeatureNotEnabledError() { - Metadata metadata = new Metadata(); - metadata.put( - Metadata.Key.of("rpc-application-error-name", Metadata.ASCII_STRING_MARSHALLER), - "FeatureNotEnabledError"); - StatusRuntimeException ex = - new StatusRuntimeException( - Status.FAILED_PRECONDITION.withDescription("Feature not enabled"), metadata); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof FeatureNotEnabledError); + assertEquals( + new FeatureNotEnabledError().setFeatureFlag("feature-flag"), + ErrorMapper.mapError( + toException( + Status.Code.FAILED_PRECONDITION, + "feature not enabled", + com.uber.cadence.api.v1.FeatureNotEnabledError.newBuilder() + .setFeatureFlag("feature-flag") + .build()))); } @Test - public void testDomainNotActiveError() { - Metadata metadata = new Metadata(); - metadata.put( - Metadata.Key.of("rpc-application-error-name", Metadata.ASCII_STRING_MARSHALLER), - "DomainNotActiveError"); - StatusRuntimeException ex = - new StatusRuntimeException( - Status.FAILED_PRECONDITION.withDescription("Domain not active"), metadata); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof DomainNotActiveError); + public void testCancellationAlreadyRequestedError() { + assertEquals( + new CancellationAlreadyRequestedError().setMessage("cancellation requested"), + ErrorMapper.mapError( + toException( + Status.Code.FAILED_PRECONDITION, + "cancellation requested", + com.uber.cadence.api.v1.CancellationAlreadyRequestedError.newBuilder().build()))); + } + + @Test + public void testDomainAlreadyExistsError() { + assertEquals( + new DomainAlreadyExistsError().setMessage("domain exists"), + ErrorMapper.mapError( + toException( + Status.Code.ALREADY_EXISTS, + "domain exists", + com.uber.cadence.api.v1.DomainAlreadyExistsError.newBuilder().build()))); } @Test public void testLimitExceededError() { - Metadata metadata = new Metadata(); - metadata.put( - Metadata.Key.of("rpc-application-error-name", Metadata.ASCII_STRING_MARSHALLER), - "LimitExceededError"); - StatusRuntimeException ex = - new StatusRuntimeException( - Status.RESOURCE_EXHAUSTED.withDescription("Limit exceeded"), metadata); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof LimitExceededError); + assertEquals( + new LimitExceededError().setMessage("limit exceeded"), + ErrorMapper.mapError( + toException( + Status.Code.RESOURCE_EXHAUSTED, + "limit exceeded", + com.uber.cadence.api.v1.LimitExceededError.newBuilder().build()))); } @Test - public void testServiceBusyError() { - Metadata metadata = new Metadata(); - metadata.put( - Metadata.Key.of("rpc-application-error-name", Metadata.ASCII_STRING_MARSHALLER), - "ServiceBusyError"); - StatusRuntimeException ex = - new StatusRuntimeException( - Status.RESOURCE_EXHAUSTED.withDescription("Service busy"), metadata); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof ServiceBusyError); + public void testQueryFailedError() { + assertEquals( + new QueryFailedError().setMessage("query failed"), + ErrorMapper.mapError( + toException( + Status.Code.INVALID_ARGUMENT, + "query failed", + com.uber.cadence.api.v1.QueryFailedError.newBuilder().build()))); } @Test - public void testUnknownError() { - StatusRuntimeException ex = - new StatusRuntimeException(Status.UNKNOWN.withDescription("Unknown error")); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof TException); + public void testServiceBusyError() { + assertEquals( + new ServiceBusyError().setMessage("service busy"), + ErrorMapper.mapError( + toException( + Status.Code.UNAVAILABLE, + "service busy", + com.uber.cadence.api.v1.ServiceBusyError.newBuilder().build()))); + } + + private static StatusRuntimeException toException(Status.Code code, String message) { + return StatusProto.toStatusRuntimeException( + com.google.rpc.Status.newBuilder().setCode(code.value()).setMessage(message).build()); + } + + private static StatusRuntimeException toException( + Status.Code code, String message, Message details) { + return StatusProto.toStatusRuntimeException( + com.google.rpc.Status.newBuilder() + .setCode(code.value()) + .addDetails(Any.pack(details)) + .setMessage(message) + .build()); } } diff --git a/src/test/java/com/uber/cadence/testUtils/TestEnvironment.java b/src/test/java/com/uber/cadence/testUtils/TestEnvironment.java index 08faf2c67..7b188b949 100644 --- a/src/test/java/com/uber/cadence/testUtils/TestEnvironment.java +++ b/src/test/java/com/uber/cadence/testUtils/TestEnvironment.java @@ -14,6 +14,7 @@ */ package com.uber.cadence.testUtils; +import com.uber.cadence.FeatureFlags; import com.uber.cadence.internal.compatibility.Thrift2ProtoAdapter; import com.uber.cadence.internal.compatibility.proto.serviceclient.IGrpcServiceStubs; import com.uber.cadence.serviceclient.ClientOptions; @@ -43,6 +44,11 @@ public static boolean isUseDockerService() { public static IWorkflowService getDockerService() { return new Thrift2ProtoAdapter( - IGrpcServiceStubs.newInstance(ClientOptions.newBuilder().setPort(7833).build())); + IGrpcServiceStubs.newInstance( + ClientOptions.newBuilder() + .setPort(7833) + .setFeatureFlags( + new FeatureFlags().setWorkflowExecutionAlreadyCompletedErrorEnabled(true)) + .build())); } }