diff --git a/src/main/java/com/uber/cadence/internal/compatibility/Thrift2ProtoAdapter.java b/src/main/java/com/uber/cadence/internal/compatibility/Thrift2ProtoAdapter.java index b4bae6557..2034c4df0 100644 --- a/src/main/java/com/uber/cadence/internal/compatibility/Thrift2ProtoAdapter.java +++ b/src/main/java/com/uber/cadence/internal/compatibility/Thrift2ProtoAdapter.java @@ -139,7 +139,7 @@ public void RegisterDomain(RegisterDomainRequest registerRequest) .domainBlockingStub() .registerDomain(RequestMapper.registerDomainRequest(registerRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -154,7 +154,7 @@ public DescribeDomainResponse DescribeDomain(DescribeDomainRequest describeReque .describeDomain(RequestMapper.describeDomainRequest(describeRequest)); return ResponseMapper.describeDomainResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -177,7 +177,7 @@ public ListDomainsResponse ListDomains(ListDomainsRequest listRequest) .listDomains(RequestMapper.listDomainsRequest(listRequest)); return ResponseMapper.listDomainsResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -192,7 +192,7 @@ public UpdateDomainResponse UpdateDomain(UpdateDomainRequest updateRequest) .updateDomain(RequestMapper.updateDomainRequest(updateRequest)); return ResponseMapper.updateDomainResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -205,7 +205,7 @@ public void DeprecateDomain(DeprecateDomainRequest deprecateRequest) .domainBlockingStub() .deprecateDomain(RequestMapper.deprecateDomainRequest(deprecateRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -241,7 +241,7 @@ public StartWorkflowExecutionAsyncResponse StartWorkflowExecutionAsync( RequestMapper.startWorkflowExecutionAsyncRequest(startRequest)); return ResponseMapper.startWorkflowExecutionAsyncResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -258,7 +258,7 @@ private StartWorkflowExecutionResponse startWorkflowExecution( .startWorkflowExecution(RequestMapper.startWorkflowExecutionRequest(startRequest)); return ResponseMapper.startWorkflowExecutionResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -281,7 +281,7 @@ public GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistory( RequestMapper.getWorkflowExecutionHistoryRequest(getRequest)); return ResponseMapper.getWorkflowExecutionHistoryResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -296,7 +296,7 @@ public PollForDecisionTaskResponse PollForDecisionTask(PollForDecisionTaskReques .pollForDecisionTask(RequestMapper.pollForDecisionTaskRequest(pollRequest)); return ResponseMapper.pollForDecisionTaskResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -314,7 +314,7 @@ public RespondDecisionTaskCompletedResponse RespondDecisionTaskCompleted( RequestMapper.respondDecisionTaskCompletedRequest(completeRequest)); return ResponseMapper.respondDecisionTaskCompletedResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -328,7 +328,7 @@ public void RespondDecisionTaskFailed(RespondDecisionTaskFailedRequest failedReq .workerBlockingStub() .respondDecisionTaskFailed(RequestMapper.respondDecisionTaskFailedRequest(failedRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -343,7 +343,7 @@ public PollForActivityTaskResponse PollForActivityTask(PollForActivityTaskReques .pollForActivityTask(RequestMapper.pollForActivityTaskRequest(pollRequest)); return ResponseMapper.pollForActivityTaskResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -361,7 +361,7 @@ public RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeat( RequestMapper.recordActivityTaskHeartbeatRequest(heartbeatRequest)); return ResponseMapper.recordActivityTaskHeartbeatResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -379,7 +379,7 @@ public RecordActivityTaskHeartbeatResponse RecordActivityTaskHeartbeatByID( RequestMapper.recordActivityTaskHeartbeatByIdRequest(heartbeatRequest)); return ResponseMapper.recordActivityTaskHeartbeatByIdResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -394,7 +394,7 @@ public void RespondActivityTaskCompleted(RespondActivityTaskCompletedRequest com .respondActivityTaskCompleted( RequestMapper.respondActivityTaskCompletedRequest(completeRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -410,7 +410,7 @@ public void RespondActivityTaskCompletedByID( .respondActivityTaskCompletedByID( RequestMapper.respondActivityTaskCompletedByIdRequest(completeRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -424,7 +424,7 @@ public void RespondActivityTaskFailed(RespondActivityTaskFailedRequest failReque .workerBlockingStub() .respondActivityTaskFailed(RequestMapper.respondActivityTaskFailedRequest(failRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -439,7 +439,7 @@ public void RespondActivityTaskFailedByID(RespondActivityTaskFailedByIDRequest f .respondActivityTaskFailedByID( RequestMapper.respondActivityTaskFailedByIdRequest(failRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -454,7 +454,7 @@ public void RespondActivityTaskCanceled(RespondActivityTaskCanceledRequest cance .respondActivityTaskCanceled( RequestMapper.respondActivityTaskCanceledRequest(canceledRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -470,7 +470,7 @@ public void RespondActivityTaskCanceledByID( .respondActivityTaskCanceledByID( RequestMapper.respondActivityTaskCanceledByIdRequest(canceledRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -488,7 +488,7 @@ public void RequestCancelWorkflowExecution(RequestCancelWorkflowExecutionRequest .requestCancelWorkflowExecution( RequestMapper.requestCancelWorkflowExecutionRequest(cancelRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -505,7 +505,7 @@ public void SignalWorkflowExecution(SignalWorkflowExecutionRequest signalRequest .workflowBlockingStub() .signalWorkflowExecution(RequestMapper.signalWorkflowExecutionRequest(signalRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -524,7 +524,7 @@ public StartWorkflowExecutionResponse SignalWithStartWorkflowExecution( RequestMapper.signalWithStartWorkflowExecutionRequest(signalWithStartRequest)); return ResponseMapper.signalWithStartWorkflowExecutionResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -544,7 +544,7 @@ public SignalWithStartWorkflowExecutionAsyncResponse SignalWithStartWorkflowExec signalWithStartRequest)); return ResponseMapper.signalWithStartWorkflowExecutionAsyncResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -570,7 +570,7 @@ public ResetWorkflowExecutionResponse ResetWorkflowExecution( .resetWorkflowExecution(RequestMapper.resetWorkflowExecutionRequest(resetRequest)); return ResponseMapper.resetWorkflowExecutionResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -585,7 +585,7 @@ public void TerminateWorkflowExecution(TerminateWorkflowExecutionRequest termina .terminateWorkflowExecution( RequestMapper.terminateWorkflowExecutionRequest(terminateRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -602,7 +602,7 @@ public ListOpenWorkflowExecutionsResponse ListOpenWorkflowExecutions( RequestMapper.listOpenWorkflowExecutionsRequest(listRequest)); return ResponseMapper.listOpenWorkflowExecutionsResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -619,7 +619,7 @@ public ListClosedWorkflowExecutionsResponse ListClosedWorkflowExecutions( RequestMapper.listClosedWorkflowExecutionsRequest(listRequest)); return ResponseMapper.listClosedWorkflowExecutionsResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -635,7 +635,7 @@ public ListWorkflowExecutionsResponse ListWorkflowExecutions( .listWorkflowExecutions(RequestMapper.listWorkflowExecutionsRequest(listRequest)); return ResponseMapper.listWorkflowExecutionsResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -652,7 +652,7 @@ public ListArchivedWorkflowExecutionsResponse ListArchivedWorkflowExecutions( RequestMapper.listArchivedWorkflowExecutionsRequest(listRequest)); return ResponseMapper.listArchivedWorkflowExecutionsResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -668,7 +668,7 @@ public ListWorkflowExecutionsResponse ScanWorkflowExecutions( .scanWorkflowExecutions(RequestMapper.scanWorkflowExecutionsRequest(listRequest)); return ResponseMapper.scanWorkflowExecutionsResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -684,7 +684,7 @@ public CountWorkflowExecutionsResponse CountWorkflowExecutions( .countWorkflowExecutions(RequestMapper.countWorkflowExecutionsRequest(countRequest)); return ResponseMapper.countWorkflowExecutionsResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -698,7 +698,7 @@ public GetSearchAttributesResponse GetSearchAttributes() .getSearchAttributes(GetSearchAttributesRequest.newBuilder().build()); return ResponseMapper.getSearchAttributesResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -712,7 +712,7 @@ public void RespondQueryTaskCompleted(RespondQueryTaskCompletedRequest completeR .respondQueryTaskCompleted( RequestMapper.respondQueryTaskCompletedRequest(completeRequest)); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -728,7 +728,7 @@ public ResetStickyTaskListResponse ResetStickyTaskList(ResetStickyTaskListReques .resetStickyTaskList(RequestMapper.resetStickyTaskListRequest(resetRequest)); return new ResetStickyTaskListResponse(); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -743,7 +743,7 @@ public QueryWorkflowResponse QueryWorkflow(QueryWorkflowRequest queryRequest) .queryWorkflow(RequestMapper.queryWorkflowRequest(queryRequest)); return ResponseMapper.queryWorkflowResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -760,7 +760,7 @@ public DescribeWorkflowExecutionResponse DescribeWorkflowExecution( RequestMapper.describeWorkflowExecutionRequest(describeRequest)); return ResponseMapper.describeWorkflowExecutionResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -775,7 +775,7 @@ public DescribeTaskListResponse DescribeTaskList(DescribeTaskListRequest request .describeTaskList(RequestMapper.describeTaskListRequest(request)); return ResponseMapper.describeTaskListResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -788,7 +788,7 @@ public ClusterInfo GetClusterInfo() throws InternalServiceError, ServiceBusyErro .getClusterInfo(com.uber.cadence.api.v1.GetClusterInfoRequest.newBuilder().build()); return ResponseMapper.getClusterInfoResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -809,7 +809,7 @@ public ListTaskListPartitionsResponse ListTaskListPartitions( .listTaskListPartitions(RequestMapper.listTaskListPartitionsRequest(request)); return ResponseMapper.listTaskListPartitionsResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -823,7 +823,7 @@ public void RefreshWorkflowTasks(RefreshWorkflowTasksRequest request) .refreshWorkflowTasks( com.uber.cadence.api.v1.RefreshWorkflowTasksRequest.newBuilder().build()); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -903,7 +903,7 @@ public void StartWorkflowExecutionAsync( }, ForkJoinPool.commonPool()); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -1026,7 +1026,7 @@ public void SignalWorkflowExecution( }, ForkJoinPool.commonPool()); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -1222,7 +1222,7 @@ public void StartWorkflowExecutionWithTimeout( }, ForkJoinPool.commonPool()); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -1253,7 +1253,7 @@ public void StartWorkflowExecutionAsyncWithTimeout( }, ForkJoinPool.commonPool()); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -1269,7 +1269,7 @@ public GetWorkflowExecutionHistoryResponse GetWorkflowExecutionHistoryWithTimeou RequestMapper.getWorkflowExecutionHistoryRequest(getRequest)); return ResponseMapper.getWorkflowExecutionHistoryResponse(response); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -1299,7 +1299,7 @@ public void GetWorkflowExecutionHistoryWithTimeout( }, ForkJoinPool.commonPool()); } catch (StatusRuntimeException e) { - throw ErrorMapper.Error(e); + throw ErrorMapper.mapError(e); } } @@ -1315,7 +1315,7 @@ public void SignalWorkflowExecutionWithTimeout( private void handleAsyncException(AsyncMethodCallback callback, Exception exception) { if (exception instanceof ExecutionException && exception.getCause() instanceof StatusRuntimeException) { - callback.onError(ErrorMapper.Error(((StatusRuntimeException) exception.getCause()))); + callback.onError(ErrorMapper.mapError(((StatusRuntimeException) exception.getCause()))); } else { callback.onError(exception); } diff --git a/src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java b/src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java index fdf5a2bab..3af41fd41 100644 --- a/src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java +++ b/src/main/java/com/uber/cadence/internal/compatibility/proto/serviceclient/GrpcServiceStubs.java @@ -16,6 +16,8 @@ package com.uber.cadence.internal.compatibility.proto.serviceclient; import com.google.common.base.Strings; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import com.uber.cadence.api.v1.DomainAPIGrpc; import com.uber.cadence.api.v1.MetaAPIGrpc; import com.uber.cadence.api.v1.MetaAPIGrpc.MetaAPIBlockingStub; @@ -63,6 +65,8 @@ final class GrpcServiceStubs implements IGrpcServiceStubs { Metadata.Key.of("rpc-caller", Metadata.ASCII_STRING_MARSHALLER); private static final Metadata.Key RPC_ENCODING_HEADER_KEY = Metadata.Key.of("rpc-encoding", Metadata.ASCII_STRING_MARSHALLER); + private static final Metadata.Key CLIENT_FEATURE_FLAGS_HEADER_KEY = + Metadata.Key.of("cadence-client-feature-flags", Metadata.ASCII_STRING_MARSHALLER); private static final Metadata.Key AUTHORIZATION_HEADER_KEY = Metadata.Key.of("cadence-authorization", Metadata.ASCII_STRING_MARSHALLER); @@ -109,6 +113,12 @@ final class GrpcServiceStubs implements IGrpcServiceStubs { if (!Strings.isNullOrEmpty(options.getIsolationGroup())) { headers.put(ISOLATION_GROUP_HEADER_KEY, options.getIsolationGroup()); } + if (options.getFeatureFlags() != null) { + GsonBuilder gsonBuilder = new GsonBuilder(); + Gson gson = gsonBuilder.create(); + String serialized = gson.toJson(options.getFeatureFlags()); + headers.put(CLIENT_FEATURE_FLAGS_HEADER_KEY, serialized); + } mergeHeaders(headers, options.getHeaders()); Channel interceptedChannel = diff --git a/src/main/java/com/uber/cadence/internal/compatibility/thrift/ErrorMapper.java b/src/main/java/com/uber/cadence/internal/compatibility/thrift/ErrorMapper.java index 366c2e6ce..921493430 100644 --- a/src/main/java/com/uber/cadence/internal/compatibility/thrift/ErrorMapper.java +++ b/src/main/java/com/uber/cadence/internal/compatibility/thrift/ErrorMapper.java @@ -15,6 +15,10 @@ */ package com.uber.cadence.internal.compatibility.thrift; +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.rpc.Status; import com.uber.cadence.AccessDeniedError; import com.uber.cadence.CancellationAlreadyRequestedError; import com.uber.cadence.ClientVersionNotSupportedError; @@ -25,90 +29,207 @@ import com.uber.cadence.InternalDataInconsistencyError; import com.uber.cadence.InternalServiceError; import com.uber.cadence.LimitExceededError; +import com.uber.cadence.QueryFailedError; import com.uber.cadence.ServiceBusyError; import com.uber.cadence.WorkflowExecutionAlreadyCompletedError; import com.uber.cadence.WorkflowExecutionAlreadyStartedError; -import io.grpc.Metadata; import io.grpc.StatusRuntimeException; +import io.grpc.protobuf.StatusProto; +import java.util.function.BiFunction; import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ErrorMapper { + private static final Logger LOGGER = LoggerFactory.getLogger(ErrorMapper.class); - public static TException Error(StatusRuntimeException ex) { - String details = getErrorDetails(ex); - switch (ex.getStatus().getCode()) { + public static TException mapError(StatusRuntimeException ex) { + Status status = StatusProto.fromThrowable(ex); + // No details, fall back to code mapping + if (status == null || status.getDetailsCount() == 0) { + return fromCode(ex); + } + String message = status.getMessage(); + Any firstDetail = status.getDetails(0); + + if (firstDetail.is(com.uber.cadence.api.v1.WorkflowExecutionAlreadyStartedError.class)) { + return mapDetails( + com.uber.cadence.api.v1.WorkflowExecutionAlreadyStartedError.class, + message, + firstDetail, + ErrorMapper::mapWorkflowExecutionAlreadyStarted); + } else if (firstDetail.is(com.uber.cadence.api.v1.EntityNotExistsError.class)) { + return mapDetails( + com.uber.cadence.api.v1.EntityNotExistsError.class, + message, + firstDetail, + ErrorMapper::mapEntityNotExistsError); + } else if (firstDetail.is( + com.uber.cadence.api.v1.WorkflowExecutionAlreadyCompletedError.class)) { + return mapDetails( + com.uber.cadence.api.v1.WorkflowExecutionAlreadyCompletedError.class, + message, + firstDetail, + ErrorMapper::mapWorkflowExecutionAlreadyCompleted); + } else if (firstDetail.is(com.uber.cadence.api.v1.DomainNotActiveError.class)) { + return mapDetails( + com.uber.cadence.api.v1.DomainNotActiveError.class, + message, + firstDetail, + ErrorMapper::mapDomainNotActive); + } else if (firstDetail.is(com.uber.cadence.api.v1.ClientVersionNotSupportedError.class)) { + return mapDetails( + com.uber.cadence.api.v1.ClientVersionNotSupportedError.class, + message, + firstDetail, + ErrorMapper::mapClientVersionNotSupported); + } else if (firstDetail.is(com.uber.cadence.api.v1.FeatureNotEnabledError.class)) { + return mapDetails( + com.uber.cadence.api.v1.FeatureNotEnabledError.class, + message, + firstDetail, + ErrorMapper::mapFeatureNotEnabled); + } else if (firstDetail.is(com.uber.cadence.api.v1.CancellationAlreadyRequestedError.class)) { + return mapDetails( + com.uber.cadence.api.v1.CancellationAlreadyRequestedError.class, + message, + firstDetail, + ErrorMapper::mapCancellationAlreadyRequested); + } else if (firstDetail.is(com.uber.cadence.api.v1.DomainAlreadyExistsError.class)) { + return mapDetails( + com.uber.cadence.api.v1.DomainAlreadyExistsError.class, + message, + firstDetail, + ErrorMapper::mapDomainAlreadyExists); + } else if (firstDetail.is(com.uber.cadence.api.v1.LimitExceededError.class)) { + return mapDetails( + com.uber.cadence.api.v1.LimitExceededError.class, + message, + firstDetail, + ErrorMapper::mapLimitExceeded); + } else if (firstDetail.is(com.uber.cadence.api.v1.QueryFailedError.class)) { + return mapDetails( + com.uber.cadence.api.v1.QueryFailedError.class, + message, + firstDetail, + ErrorMapper::mapQueryFailed); + } else if (firstDetail.is(com.uber.cadence.api.v1.ServiceBusyError.class)) { + return mapDetails( + com.uber.cadence.api.v1.ServiceBusyError.class, + message, + firstDetail, + ErrorMapper::mapServiceBusy); + } + + // It has details, but they're unhandled. Fall back to the error code mapping + return fromCode(ex); + } + + private static TException fromCode(StatusRuntimeException exception) { + String message = exception.getStatus().getDescription(); + switch (exception.getStatus().getCode()) { case PERMISSION_DENIED: - return new AccessDeniedError(ex.getMessage()); + return new AccessDeniedError(message); case INTERNAL: - return new InternalServiceError(ex.getMessage()); - case NOT_FOUND: - { - if ("EntityNotExistsError".equals(details) - && ex.getMessage().contains("already completed.")) { - return new WorkflowExecutionAlreadyCompletedError(ex.getMessage()); - } else { - // TODO add cluster info - return new EntityNotExistsError(ex.getMessage()); - } - } - case ALREADY_EXISTS: - { - switch (details) { - case "CancellationAlreadyRequestedError": - return new CancellationAlreadyRequestedError(ex.getMessage()); - case "DomainAlreadyExistsError": - return new DomainAlreadyExistsError(ex.getMessage()); - case "WorkflowExecutionAlreadyStartedError": - { - // TODO add started wf info - WorkflowExecutionAlreadyStartedError e = new WorkflowExecutionAlreadyStartedError(); - e.setMessage(ex.getMessage()); - return e; - } - } - } + return new InternalServiceError(message); case DATA_LOSS: - return new InternalDataInconsistencyError(ex.getMessage()); - case FAILED_PRECONDITION: - switch (details) { - // TODO add infos - case "ClientVersionNotSupportedError": - return new ClientVersionNotSupportedError(); - case "FeatureNotEnabledError": - return new FeatureNotEnabledError(); - case "DomainNotActiveError": - { - DomainNotActiveError e = new DomainNotActiveError(); - e.setMessage(ex.getMessage()); - return e; - } - } - case RESOURCE_EXHAUSTED: - switch (details) { - case "LimitExceededError": - return new LimitExceededError(ex.getMessage()); - case "ServiceBusyError": - return new ServiceBusyError(ex.getMessage()); - } - case UNKNOWN: - return new TException(ex); + return new InternalDataInconsistencyError(message); default: - // If error does not match anything, return raw grpc status error - // There are some code that casts error to grpc status to check for deadline exceeded status - return new TException(ex); + return new TException(exception); } } - static String getErrorDetails(StatusRuntimeException ex) { - { - Metadata trailer = ex.getTrailers(); - Metadata.Key key = - Metadata.Key.of("rpc-application-error-name", Metadata.ASCII_STRING_MARSHALLER); - if (trailer != null && trailer.containsKey(key)) { - return trailer.get(key); + private static TException mapDetails( + Class detailsType, + String message, + Any any, + BiFunction mapper) { + try { + if (any.is(detailsType)) { + T details = any.unpack(detailsType); + return mapper.apply(message, details); } else { - return ""; + LOGGER.error( + "Failed to decode exception. Message of type: {} was not: {}", + any.getTypeUrl(), + detailsType.getCanonicalName()); + return new TException(message); } + } catch (InvalidProtocolBufferException ex) { + LOGGER.error( + "Failed to decode exception of type: {} as {}", + any.getTypeUrl(), + detailsType.getCanonicalName(), + ex); + return new TException(message); } } + + private static WorkflowExecutionAlreadyStartedError mapWorkflowExecutionAlreadyStarted( + String message, com.uber.cadence.api.v1.WorkflowExecutionAlreadyStartedError protoErr) { + return new WorkflowExecutionAlreadyStartedError() + .setMessage(message) + .setStartRequestId(protoErr.getStartRequestId()) + .setRunId(protoErr.getRunId()); + } + + private static EntityNotExistsError mapEntityNotExistsError( + String message, com.uber.cadence.api.v1.EntityNotExistsError protoErr) { + return new EntityNotExistsError() + .setMessage(message) + .setActiveCluster(protoErr.getActiveCluster()) + .setCurrentCluster(protoErr.getCurrentCluster()); + } + + private static WorkflowExecutionAlreadyCompletedError mapWorkflowExecutionAlreadyCompleted( + String message, com.uber.cadence.api.v1.WorkflowExecutionAlreadyCompletedError protoErr) { + return new WorkflowExecutionAlreadyCompletedError().setMessage(message); + } + + private static DomainNotActiveError mapDomainNotActive( + String message, com.uber.cadence.api.v1.DomainNotActiveError protoErr) { + return new DomainNotActiveError() + .setMessage(message) + .setDomainName(protoErr.getDomain()) + .setActiveCluster(protoErr.getActiveCluster()) + .setCurrentCluster(protoErr.getCurrentCluster()); + } + + private static ClientVersionNotSupportedError mapClientVersionNotSupported( + String message, com.uber.cadence.api.v1.ClientVersionNotSupportedError protoErr) { + return new ClientVersionNotSupportedError() + .setFeatureVersion(protoErr.getFeatureVersion()) + .setClientImpl(protoErr.getClientImpl()) + .setSupportedVersions(protoErr.getSupportedVersions()); + } + + private static FeatureNotEnabledError mapFeatureNotEnabled( + String message, com.uber.cadence.api.v1.FeatureNotEnabledError protoErr) { + return new FeatureNotEnabledError().setFeatureFlag(protoErr.getFeatureFlag()); + } + + private static CancellationAlreadyRequestedError mapCancellationAlreadyRequested( + String message, com.uber.cadence.api.v1.CancellationAlreadyRequestedError protoErr) { + return new CancellationAlreadyRequestedError().setMessage(message); + } + + private static DomainAlreadyExistsError mapDomainAlreadyExists( + String message, com.uber.cadence.api.v1.DomainAlreadyExistsError protoErr) { + return new DomainAlreadyExistsError().setMessage(message); + } + + private static LimitExceededError mapLimitExceeded( + String message, com.uber.cadence.api.v1.LimitExceededError protoErr) { + return new LimitExceededError().setMessage(message); + } + + private static QueryFailedError mapQueryFailed( + String message, com.uber.cadence.api.v1.QueryFailedError protoErr) { + return new QueryFailedError().setMessage(message); + } + + private static ServiceBusyError mapServiceBusy( + String message, com.uber.cadence.api.v1.ServiceBusyError protoErr) { + return new ServiceBusyError().setMessage(message); + } } diff --git a/src/test/java/com/uber/cadence/internal/compatibility/thrift/ErrorMapperTest.java b/src/test/java/com/uber/cadence/internal/compatibility/thrift/ErrorMapperTest.java index 810f1166c..1352c7769 100644 --- a/src/test/java/com/uber/cadence/internal/compatibility/thrift/ErrorMapperTest.java +++ b/src/test/java/com/uber/cadence/internal/compatibility/thrift/ErrorMapperTest.java @@ -14,8 +14,10 @@ */ package com.uber.cadence.internal.compatibility.thrift; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; +import com.google.protobuf.Any; +import com.google.protobuf.Message; import com.uber.cadence.AccessDeniedError; import com.uber.cadence.CancellationAlreadyRequestedError; import com.uber.cadence.ClientVersionNotSupportedError; @@ -23,165 +25,204 @@ import com.uber.cadence.DomainNotActiveError; import com.uber.cadence.EntityNotExistsError; import com.uber.cadence.FeatureNotEnabledError; +import com.uber.cadence.InternalDataInconsistencyError; import com.uber.cadence.InternalServiceError; import com.uber.cadence.LimitExceededError; +import com.uber.cadence.QueryFailedError; import com.uber.cadence.ServiceBusyError; import com.uber.cadence.WorkflowExecutionAlreadyCompletedError; import com.uber.cadence.WorkflowExecutionAlreadyStartedError; -import io.grpc.Metadata; import io.grpc.Status; import io.grpc.StatusRuntimeException; -import org.apache.thrift.TException; +import io.grpc.protobuf.StatusProto; import org.junit.Test; public class ErrorMapperTest { @Test public void testPermissionDeniedError() { - StatusRuntimeException ex = - new StatusRuntimeException(Status.PERMISSION_DENIED.withDescription("Access denied")); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof AccessDeniedError); + assertEquals( + new AccessDeniedError("no access"), + ErrorMapper.mapError(toException(Status.Code.PERMISSION_DENIED, "no access"))); } @Test public void testInternalServiceError() { - StatusRuntimeException ex = - new StatusRuntimeException(Status.INTERNAL.withDescription("Internal error")); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof InternalServiceError); + assertEquals( + new InternalServiceError("no bueno"), + ErrorMapper.mapError(toException(Status.Code.INTERNAL, "no bueno"))); } @Test - public void testWorkflowExecutionAlreadyCompletedError() { - Metadata metadata = new Metadata(); - metadata.put( - Metadata.Key.of("rpc-application-error-name", Metadata.ASCII_STRING_MARSHALLER), - "EntityNotExistsError"); - StatusRuntimeException ex = - new StatusRuntimeException( - Status.NOT_FOUND.withDescription("already completed."), metadata); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof WorkflowExecutionAlreadyCompletedError); + public void testInternalDataInconsistencyError() { + assertEquals( + new InternalDataInconsistencyError("no data"), + ErrorMapper.mapError(toException(Status.Code.DATA_LOSS, "no data"))); } @Test - public void testEntityNotExistsError() { - StatusRuntimeException ex = - new StatusRuntimeException(Status.NOT_FOUND.withDescription("Entity not found")); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof EntityNotExistsError); + public void testWorkflowExecutionAlreadyCompletedError() { + assertEquals( + new WorkflowExecutionAlreadyCompletedError().setMessage("done"), + ErrorMapper.mapError( + toException( + Status.Code.NOT_FOUND, + "done", + com.uber.cadence.api.v1.WorkflowExecutionAlreadyCompletedError + .getDefaultInstance()))); } @Test - public void testCancellationAlreadyRequestedError() { - Metadata metadata = new Metadata(); - metadata.put( - Metadata.Key.of("rpc-application-error-name", Metadata.ASCII_STRING_MARSHALLER), - "CancellationAlreadyRequestedError"); - StatusRuntimeException ex = - new StatusRuntimeException( - Status.ALREADY_EXISTS.withDescription("Cancellation already requested"), metadata); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof CancellationAlreadyRequestedError); + public void testWorkflowExecutionAlreadyStartedError() { + assertEquals( + new WorkflowExecutionAlreadyStartedError() + .setMessage("already started") + .setStartRequestId("start-request-id") + .setRunId("run-id"), + ErrorMapper.mapError( + toException( + Status.Code.ALREADY_EXISTS, + "already started", + com.uber.cadence.api.v1.WorkflowExecutionAlreadyStartedError.newBuilder() + .setStartRequestId("start-request-id") + .setRunId("run-id") + .build()))); } @Test - public void testDomainAlreadyExistsError() { - Metadata metadata = new Metadata(); - metadata.put( - Metadata.Key.of("rpc-application-error-name", Metadata.ASCII_STRING_MARSHALLER), - "DomainAlreadyExistsError"); - StatusRuntimeException ex = - new StatusRuntimeException( - Status.ALREADY_EXISTS.withDescription("Domain already exists"), metadata); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof DomainAlreadyExistsError); + public void testEntityNotExistsError() { + assertEquals( + new EntityNotExistsError() + .setMessage("not found") + .setActiveCluster("active-cluster") + .setCurrentCluster("current-cluster"), + ErrorMapper.mapError( + toException( + Status.Code.NOT_FOUND, + "not found", + com.uber.cadence.api.v1.EntityNotExistsError.newBuilder() + .setActiveCluster("active-cluster") + .setCurrentCluster("current-cluster") + .build()))); } @Test - public void testWorkflowExecutionAlreadyStartedError() { - Metadata metadata = new Metadata(); - metadata.put( - Metadata.Key.of("rpc-application-error-name", Metadata.ASCII_STRING_MARSHALLER), - "WorkflowExecutionAlreadyStartedError"); - StatusRuntimeException ex = - new StatusRuntimeException( - Status.ALREADY_EXISTS.withDescription("Workflow already started"), metadata); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof WorkflowExecutionAlreadyStartedError); + public void testDomainNotActiveError() { + assertEquals( + new DomainNotActiveError() + .setMessage("domain inactive") + .setDomainName("domain-name") + .setActiveCluster("active-cluster") + .setCurrentCluster("current-cluster"), + ErrorMapper.mapError( + toException( + Status.Code.FAILED_PRECONDITION, + "domain inactive", + com.uber.cadence.api.v1.DomainNotActiveError.newBuilder() + .setDomain("domain-name") + .setActiveCluster("active-cluster") + .setCurrentCluster("current-cluster") + .build()))); } @Test public void testClientVersionNotSupportedError() { - Metadata metadata = new Metadata(); - metadata.put( - Metadata.Key.of("rpc-application-error-name", Metadata.ASCII_STRING_MARSHALLER), - "ClientVersionNotSupportedError"); - StatusRuntimeException ex = - new StatusRuntimeException( - Status.FAILED_PRECONDITION.withDescription("Client version not supported"), metadata); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof ClientVersionNotSupportedError); + assertEquals( + new ClientVersionNotSupportedError() + .setFeatureVersion("feature-version") + .setClientImpl("client-impl") + .setSupportedVersions("supported-versions"), + ErrorMapper.mapError( + toException( + Status.Code.FAILED_PRECONDITION, + "unsupported version", + com.uber.cadence.api.v1.ClientVersionNotSupportedError.newBuilder() + .setFeatureVersion("feature-version") + .setClientImpl("client-impl") + .setSupportedVersions("supported-versions") + .build()))); } @Test public void testFeatureNotEnabledError() { - Metadata metadata = new Metadata(); - metadata.put( - Metadata.Key.of("rpc-application-error-name", Metadata.ASCII_STRING_MARSHALLER), - "FeatureNotEnabledError"); - StatusRuntimeException ex = - new StatusRuntimeException( - Status.FAILED_PRECONDITION.withDescription("Feature not enabled"), metadata); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof FeatureNotEnabledError); + assertEquals( + new FeatureNotEnabledError().setFeatureFlag("feature-flag"), + ErrorMapper.mapError( + toException( + Status.Code.FAILED_PRECONDITION, + "feature not enabled", + com.uber.cadence.api.v1.FeatureNotEnabledError.newBuilder() + .setFeatureFlag("feature-flag") + .build()))); } @Test - public void testDomainNotActiveError() { - Metadata metadata = new Metadata(); - metadata.put( - Metadata.Key.of("rpc-application-error-name", Metadata.ASCII_STRING_MARSHALLER), - "DomainNotActiveError"); - StatusRuntimeException ex = - new StatusRuntimeException( - Status.FAILED_PRECONDITION.withDescription("Domain not active"), metadata); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof DomainNotActiveError); + public void testCancellationAlreadyRequestedError() { + assertEquals( + new CancellationAlreadyRequestedError().setMessage("cancellation requested"), + ErrorMapper.mapError( + toException( + Status.Code.FAILED_PRECONDITION, + "cancellation requested", + com.uber.cadence.api.v1.CancellationAlreadyRequestedError.newBuilder().build()))); + } + + @Test + public void testDomainAlreadyExistsError() { + assertEquals( + new DomainAlreadyExistsError().setMessage("domain exists"), + ErrorMapper.mapError( + toException( + Status.Code.ALREADY_EXISTS, + "domain exists", + com.uber.cadence.api.v1.DomainAlreadyExistsError.newBuilder().build()))); } @Test public void testLimitExceededError() { - Metadata metadata = new Metadata(); - metadata.put( - Metadata.Key.of("rpc-application-error-name", Metadata.ASCII_STRING_MARSHALLER), - "LimitExceededError"); - StatusRuntimeException ex = - new StatusRuntimeException( - Status.RESOURCE_EXHAUSTED.withDescription("Limit exceeded"), metadata); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof LimitExceededError); + assertEquals( + new LimitExceededError().setMessage("limit exceeded"), + ErrorMapper.mapError( + toException( + Status.Code.RESOURCE_EXHAUSTED, + "limit exceeded", + com.uber.cadence.api.v1.LimitExceededError.newBuilder().build()))); } @Test - public void testServiceBusyError() { - Metadata metadata = new Metadata(); - metadata.put( - Metadata.Key.of("rpc-application-error-name", Metadata.ASCII_STRING_MARSHALLER), - "ServiceBusyError"); - StatusRuntimeException ex = - new StatusRuntimeException( - Status.RESOURCE_EXHAUSTED.withDescription("Service busy"), metadata); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof ServiceBusyError); + public void testQueryFailedError() { + assertEquals( + new QueryFailedError().setMessage("query failed"), + ErrorMapper.mapError( + toException( + Status.Code.INVALID_ARGUMENT, + "query failed", + com.uber.cadence.api.v1.QueryFailedError.newBuilder().build()))); } @Test - public void testUnknownError() { - StatusRuntimeException ex = - new StatusRuntimeException(Status.UNKNOWN.withDescription("Unknown error")); - TException result = ErrorMapper.Error(ex); - assertTrue(result instanceof TException); + public void testServiceBusyError() { + assertEquals( + new ServiceBusyError().setMessage("service busy"), + ErrorMapper.mapError( + toException( + Status.Code.UNAVAILABLE, + "service busy", + com.uber.cadence.api.v1.ServiceBusyError.newBuilder().build()))); + } + + private static StatusRuntimeException toException(Status.Code code, String message) { + return StatusProto.toStatusRuntimeException( + com.google.rpc.Status.newBuilder().setCode(code.value()).setMessage(message).build()); + } + + private static StatusRuntimeException toException( + Status.Code code, String message, Message details) { + return StatusProto.toStatusRuntimeException( + com.google.rpc.Status.newBuilder() + .setCode(code.value()) + .addDetails(Any.pack(details)) + .setMessage(message) + .build()); } } diff --git a/src/test/java/com/uber/cadence/testUtils/TestEnvironment.java b/src/test/java/com/uber/cadence/testUtils/TestEnvironment.java index 08faf2c67..7b188b949 100644 --- a/src/test/java/com/uber/cadence/testUtils/TestEnvironment.java +++ b/src/test/java/com/uber/cadence/testUtils/TestEnvironment.java @@ -14,6 +14,7 @@ */ package com.uber.cadence.testUtils; +import com.uber.cadence.FeatureFlags; import com.uber.cadence.internal.compatibility.Thrift2ProtoAdapter; import com.uber.cadence.internal.compatibility.proto.serviceclient.IGrpcServiceStubs; import com.uber.cadence.serviceclient.ClientOptions; @@ -43,6 +44,11 @@ public static boolean isUseDockerService() { public static IWorkflowService getDockerService() { return new Thrift2ProtoAdapter( - IGrpcServiceStubs.newInstance(ClientOptions.newBuilder().setPort(7833).build())); + IGrpcServiceStubs.newInstance( + ClientOptions.newBuilder() + .setPort(7833) + .setFeatureFlags( + new FeatureFlags().setWorkflowExecutionAlreadyCompletedErrorEnabled(true)) + .build())); } }