1717
1818package com .uber .cadence .internal .worker ;
1919
20- import com .uber .cadence .*;
20+ import com .uber .cadence .BadRequestError ;
21+ import com .uber .cadence .DomainNotActiveError ;
22+ import com .uber .cadence .EntityNotExistsError ;
23+ import com .uber .cadence .Header ;
24+ import com .uber .cadence .PollForActivityTaskResponse ;
25+ import com .uber .cadence .RespondActivityTaskCanceledRequest ;
26+ import com .uber .cadence .RespondActivityTaskCompletedRequest ;
27+ import com .uber .cadence .RespondActivityTaskFailedRequest ;
28+ import com .uber .cadence .WorkflowExecution ;
2129import com .uber .cadence .common .RetryOptions ;
2230import com .uber .cadence .context .ContextPropagator ;
2331import com .uber .cadence .internal .common .Retryer ;
@@ -132,21 +140,8 @@ public boolean isSuspended() {
132140 return poller .isSuspended ();
133141 }
134142
135- static class MeasurableActivityTask {
136- PollForActivityTaskResponse task ;
137- Stopwatch sw ;
138-
139- MeasurableActivityTask (PollForActivityTaskResponse task , Stopwatch sw ) {
140- this .task = Objects .requireNonNull (task );
141- this .sw = Objects .requireNonNull (sw );
142- }
143-
144- void markDone () {
145- sw .stop ();
146- }
147- }
148-
149- private class TaskHandlerImpl implements PollTaskExecutor .TaskHandler <MeasurableActivityTask > {
143+ private class TaskHandlerImpl
144+ implements PollTaskExecutor .TaskHandler <PollForActivityTaskResponse > {
150145
151146 final ActivityTaskHandler handler ;
152147
@@ -155,43 +150,52 @@ private TaskHandlerImpl(ActivityTaskHandler handler) {
155150 }
156151
157152 @ Override
158- public void handle (MeasurableActivityTask task ) throws Exception {
153+ public void handle (PollForActivityTaskResponse task ) throws Exception {
159154 Scope metricsScope =
160155 options
161156 .getMetricsScope ()
162157 .tagged (
163- ImmutableMap .of (MetricsTag .ACTIVITY_TYPE , task .task .getActivityType ().getName ()));
158+ ImmutableMap .of (
159+ MetricsTag .ACTIVITY_TYPE ,
160+ task .getActivityType ().getName (),
161+ MetricsTag .WORKFLOW_TYPE ,
162+ task .getWorkflowType ().getName ()));
163+
164164 metricsScope
165- .timer (MetricsType .TASK_LIST_QUEUE_LATENCY )
165+ .timer (MetricsType .ACTIVITY_SCHEDULED_TO_START_LATENCY )
166166 .record (
167167 Duration .ofNanos (
168- task .task . getStartedTimestamp () - task .task . getScheduledTimestamp ()));
168+ task .getStartedTimestamp () - task .getScheduledTimestampOfThisAttempt ()));
169169
170170 // The following tags are for logging.
171- MDC .put (LoggerTag .ACTIVITY_ID , task .task . getActivityId ());
172- MDC .put (LoggerTag .ACTIVITY_TYPE , task .task . getActivityType ().getName ());
173- MDC .put (LoggerTag .WORKFLOW_ID , task .task . getWorkflowExecution ().getWorkflowId ());
174- MDC .put (LoggerTag .RUN_ID , task .task . getWorkflowExecution ().getRunId ());
171+ MDC .put (LoggerTag .ACTIVITY_ID , task .getActivityId ());
172+ MDC .put (LoggerTag .ACTIVITY_TYPE , task .getActivityType ().getName ());
173+ MDC .put (LoggerTag .WORKFLOW_ID , task .getWorkflowExecution ().getWorkflowId ());
174+ MDC .put (LoggerTag .RUN_ID , task .getWorkflowExecution ().getRunId ());
175175
176- propagateContext (task . task );
176+ propagateContext (task );
177177
178178 try {
179179 Stopwatch sw = metricsScope .timer (MetricsType .ACTIVITY_EXEC_LATENCY ).start ();
180- ActivityTaskHandler .Result response = handler .handle (task . task , metricsScope , false );
180+ ActivityTaskHandler .Result response = handler .handle (task , metricsScope , false );
181181 sw .stop ();
182182
183183 sw = metricsScope .timer (MetricsType .ACTIVITY_RESP_LATENCY ).start ();
184- sendReply (task . task , response , metricsScope );
184+ sendReply (task , response , metricsScope );
185185 sw .stop ();
186186
187- task .markDone ();
187+ metricsScope
188+ .timer (MetricsType .ACTIVITY_E2E_LATENCY )
189+ .record (
190+ Duration .ofNanos (System .nanoTime () - task .getScheduledTimestampOfThisAttempt ()));
191+
188192 } catch (CancellationException e ) {
189193 RespondActivityTaskCanceledRequest cancelledRequest =
190194 new RespondActivityTaskCanceledRequest ();
191195 cancelledRequest .setDetails (
192196 String .valueOf (e .getMessage ()).getBytes (StandardCharsets .UTF_8 ));
193197 Stopwatch sw = metricsScope .timer (MetricsType .ACTIVITY_RESP_LATENCY ).start ();
194- sendReply (task . task , new Result (null , null , cancelledRequest , null ), metricsScope );
198+ sendReply (task , new Result (null , null , cancelledRequest , null ), metricsScope );
195199 sw .stop ();
196200 } finally {
197201 MDC .remove (LoggerTag .ACTIVITY_ID );
@@ -225,17 +229,17 @@ void propagateContext(PollForActivityTaskResponse response) {
225229 }
226230
227231 @ Override
228- public Throwable wrapFailure (MeasurableActivityTask task , Throwable failure ) {
229- WorkflowExecution execution = task .task . getWorkflowExecution ();
232+ public Throwable wrapFailure (PollForActivityTaskResponse task , Throwable failure ) {
233+ WorkflowExecution execution = task .getWorkflowExecution ();
230234 return new RuntimeException (
231235 "Failure processing activity task. WorkflowID="
232236 + execution .getWorkflowId ()
233237 + ", RunID="
234238 + execution .getRunId ()
235239 + ", ActivityType="
236- + task .task . getActivityType ().getName ()
240+ + task .getActivityType ().getName ()
237241 + ", ActivityID="
238- + task .task . getActivityId (),
242+ + task .getActivityId (),
239243 failure );
240244 }
241245
0 commit comments