Skip to content

Commit ecd53c0

Browse files
Changes with spotless fixes
1 parent d9753a6 commit ecd53c0

File tree

1 file changed

+88
-10
lines changed

1 file changed

+88
-10
lines changed

analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java

Lines changed: 88 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
import java.util.concurrent.BlockingQueue;
2626
import java.util.concurrent.ExecutorService;
2727
import java.util.concurrent.Executors;
28+
import java.util.concurrent.Future;
2829
import java.util.concurrent.LinkedBlockingQueue;
30+
import java.util.concurrent.RejectedExecutionException;
2931
import java.util.concurrent.ScheduledExecutorService;
3032
import java.util.concurrent.ThreadFactory;
3133
import java.util.concurrent.TimeUnit;
@@ -67,6 +69,7 @@ public class AnalyticsClient {
6769
private final ScheduledExecutorService flushScheduler;
6870
private final AtomicBoolean isShutDown;
6971
private final String writeKey;
72+
private volatile Future<?> looperFuture;
7073

7174
public static AnalyticsClient create(
7275
HttpUrl uploadUrl,
@@ -130,7 +133,9 @@ public AnalyticsClient(
130133

131134
this.currentQueueSizeInBytes = 0;
132135

133-
if (!isShutDown.get()) looperExecutor.submit(new Looper());
136+
if (!isShutDown.get()) {
137+
this.looperFuture = looperExecutor.submit(new Looper());
138+
}
134139

135140
flushScheduler = Executors.newScheduledThreadPool(1, threadFactory);
136141
flushScheduler.scheduleAtFixedRate(
@@ -218,6 +223,8 @@ public void shutdown() {
218223
// we can shutdown the flush scheduler without worrying
219224
flushScheduler.shutdownNow();
220225

226+
// Wait for the looper to complete processing before shutting down executors
227+
waitForLooperCompletion();
221228
shutdownAndWait(looperExecutor, "looper");
222229
shutdownAndWait(networkExecutor, "network");
223230

@@ -226,19 +233,76 @@ public void shutdown() {
226233
}
227234
}
228235

236+
/**
237+
* Wait for the looper to complete processing all messages before proceeding with shutdown. This
238+
* prevents the race condition where the network executor is shut down before the looper finishes
239+
* submitting all batches.
240+
*/
241+
private void waitForLooperCompletion() {
242+
if (looperFuture != null) {
243+
try {
244+
// Wait for the looper to complete processing the STOP message and finish
245+
// Use a reasonable timeout to avoid hanging indefinitely
246+
looperFuture.get(5, TimeUnit.SECONDS);
247+
log.print(VERBOSE, "Looper completed successfully.");
248+
} catch (Exception e) {
249+
log.print(ERROR, e, "Error waiting for looper to complete: %s", e.getMessage());
250+
// Cancel the looper if it's taking too long or if there's an error
251+
if (!looperFuture.isDone()) {
252+
looperFuture.cancel(true);
253+
log.print(VERBOSE, "Looper was cancelled due to timeout or error.");
254+
}
255+
}
256+
}
257+
}
258+
229259
public void shutdownAndWait(ExecutorService executor, String name) {
260+
boolean isLooperExecutor = name != null && name.equalsIgnoreCase("looper");
230261
try {
231262
executor.shutdown();
232-
final boolean executorTerminated = executor.awaitTermination(1, TimeUnit.SECONDS);
233-
234-
log.print(
235-
VERBOSE,
236-
"%s executor %s.",
237-
name,
238-
executorTerminated ? "terminated normally" : "timed out");
263+
boolean terminated = executor.awaitTermination(1, TimeUnit.MILLISECONDS);
264+
if (terminated) {
265+
log.print(VERBOSE, "%s executor terminated normally.", name);
266+
return;
267+
}
268+
if (isLooperExecutor) {
269+
// not terminated within timeout -> force shutdown
270+
log.print(VERBOSE, "%s did not terminate in %d ms; requesting shutdownNow().", name, 1);
271+
List<Runnable> dropped = executor.shutdownNow(); // interrupts running tasks
272+
log.print(
273+
VERBOSE,
274+
"%s shutdownNow returned %d queued tasks that never started.",
275+
name,
276+
dropped.size());
277+
278+
// optional short wait to give interrupted tasks a chance to exit
279+
boolean terminatedAfterForce = executor.awaitTermination(1, TimeUnit.MILLISECONDS);
280+
log.print(
281+
VERBOSE,
282+
"%s executor %s after shutdownNow().",
283+
name,
284+
terminatedAfterForce ? "terminated" : "still running (did not terminate)");
285+
286+
if (!terminatedAfterForce) {
287+
// final warning — investigate tasks that ignore interrupts
288+
log.print(
289+
ERROR,
290+
"%s executor still did not terminate; tasks may be ignoring interrupts.",
291+
name);
292+
}
293+
}
239294
} catch (InterruptedException e) {
295+
// Preserve interrupt status and attempt forceful shutdown
240296
log.print(ERROR, e, "Interrupted while stopping %s executor.", name);
241297
Thread.currentThread().interrupt();
298+
if (isLooperExecutor) {
299+
List<Runnable> dropped = executor.shutdownNow();
300+
log.print(
301+
VERBOSE,
302+
"%s shutdownNow invoked after interrupt; %d tasks returned.",
303+
name,
304+
dropped.size());
305+
}
242306
}
243307
}
244308

@@ -299,8 +363,22 @@ public void run() {
299363
"Batching %s message(s) into batch %s.",
300364
batch.batch().size(),
301365
batch.sequence());
302-
networkExecutor.submit(
303-
BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries));
366+
try {
367+
networkExecutor.submit(
368+
BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries));
369+
} catch (RejectedExecutionException e) {
370+
log.print(
371+
ERROR,
372+
e,
373+
"Failed to submit batch %s to network executor during shutdown. Batch will be lost.",
374+
batch.sequence());
375+
// Notify callbacks about the failure
376+
for (Message msg : batch.batch()) {
377+
for (Callback callback : callbacks) {
378+
callback.failure(msg, e);
379+
}
380+
}
381+
}
304382

305383
currentBatchSize.set(0);
306384
messages.clear();

0 commit comments

Comments
 (0)