Skip to content

Commit 82dfde2

Browse files
Patch for Github issue #524 (#526)
* Patch for Github issue #524
1 parent c57a583 commit 82dfde2

File tree

1 file changed

+95
-10
lines changed

1 file changed

+95
-10
lines changed

analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java

Lines changed: 95 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
import java.util.concurrent.BlockingQueue;
2626
import java.util.concurrent.ExecutorService;
2727
import java.util.concurrent.Executors;
28+
import java.util.concurrent.Future;
2829
import java.util.concurrent.LinkedBlockingQueue;
30+
import java.util.concurrent.RejectedExecutionException;
2931
import java.util.concurrent.ScheduledExecutorService;
3032
import java.util.concurrent.ThreadFactory;
3133
import java.util.concurrent.TimeUnit;
@@ -42,6 +44,8 @@ public class AnalyticsClient {
4244
private static final Charset ENCODING = StandardCharsets.UTF_8;
4345
private Gson gsonInstance;
4446
private static final String instanceId = UUID.randomUUID().toString();
47+
private static final int WAIT_FOR_THREAD_COMPLETE_S = 5;
48+
private static final int TERMINATION_TIMEOUT_S = 1;
4549

4650
static {
4751
Map<String, String> library = new LinkedHashMap<>();
@@ -67,6 +71,7 @@ public class AnalyticsClient {
6771
private final ScheduledExecutorService flushScheduler;
6872
private final AtomicBoolean isShutDown;
6973
private final String writeKey;
74+
private volatile Future<?> looperFuture;
7075

7176
public static AnalyticsClient create(
7277
HttpUrl uploadUrl,
@@ -130,7 +135,9 @@ public AnalyticsClient(
130135

131136
this.currentQueueSizeInBytes = 0;
132137

133-
if (!isShutDown.get()) looperExecutor.submit(new Looper());
138+
if (!isShutDown.get()) {
139+
this.looperFuture = looperExecutor.submit(new Looper());
140+
}
134141

135142
flushScheduler = Executors.newScheduledThreadPool(1, threadFactory);
136143
flushScheduler.scheduleAtFixedRate(
@@ -218,6 +225,8 @@ public void shutdown() {
218225
// we can shutdown the flush scheduler without worrying
219226
flushScheduler.shutdownNow();
220227

228+
// Wait for the looper to complete processing before shutting down executors
229+
waitForLooperCompletion();
221230
shutdownAndWait(looperExecutor, "looper");
222231
shutdownAndWait(networkExecutor, "network");
223232

@@ -226,19 +235,81 @@ public void shutdown() {
226235
}
227236
}
228237

238+
/**
239+
* Wait for the looper to complete processing all messages before proceeding with shutdown. This
240+
* prevents the race condition where the network executor is shut down before the looper finishes
241+
* submitting all batches.
242+
*/
243+
private void waitForLooperCompletion() {
244+
if (looperFuture != null) {
245+
try {
246+
// Wait for the looper to complete processing the STOP message and finish
247+
// Use a reasonable timeout to avoid hanging indefinitely
248+
looperFuture.get(WAIT_FOR_THREAD_COMPLETE_S, TimeUnit.SECONDS);
249+
log.print(VERBOSE, "Looper completed successfully.");
250+
} catch (Exception e) {
251+
log.print(ERROR, e, "Error waiting for looper to complete.");
252+
// Cancel the looper if it's taking too long or if there's an error
253+
if (!looperFuture.isDone()) {
254+
looperFuture.cancel(true);
255+
log.print(VERBOSE, "Looper was cancelled due to timeout or error.");
256+
}
257+
}
258+
}
259+
}
260+
229261
public void shutdownAndWait(ExecutorService executor, String name) {
262+
boolean isLooperExecutor = name != null && name.equalsIgnoreCase("looper");
230263
try {
231264
executor.shutdown();
232-
final boolean executorTerminated = executor.awaitTermination(1, TimeUnit.SECONDS);
233-
234-
log.print(
235-
VERBOSE,
236-
"%s executor %s.",
237-
name,
238-
executorTerminated ? "terminated normally" : "timed out");
265+
boolean terminated = executor.awaitTermination(TERMINATION_TIMEOUT_S, TimeUnit.SECONDS);
266+
if (terminated) {
267+
log.print(VERBOSE, "%s executor terminated normally.", name);
268+
return;
269+
}
270+
if (isLooperExecutor) { // Handle looper - network should finish on its own
271+
// not terminated within timeout -> force shutdown
272+
log.print(
273+
VERBOSE,
274+
"%s did not terminate in %d seconds; requesting shutdownNow().",
275+
name,
276+
TERMINATION_TIMEOUT_S);
277+
List<Runnable> dropped = executor.shutdownNow(); // interrupts running tasks
278+
log.print(
279+
VERBOSE,
280+
"%s shutdownNow returned %d queued tasks that never started.",
281+
name,
282+
dropped.size());
283+
284+
// optional short wait to give interrupted tasks a chance to exit
285+
boolean terminatedAfterForce =
286+
executor.awaitTermination(TERMINATION_TIMEOUT_S, TimeUnit.SECONDS);
287+
log.print(
288+
VERBOSE,
289+
"%s executor %s after shutdownNow().",
290+
name,
291+
terminatedAfterForce ? "terminated" : "still running (did not terminate)");
292+
293+
if (!terminatedAfterForce) {
294+
// final warning — investigate tasks that ignore interrupts
295+
log.print(
296+
ERROR,
297+
"%s executor still did not terminate; tasks may be ignoring interrupts.",
298+
name);
299+
}
300+
}
239301
} catch (InterruptedException e) {
302+
// Preserve interrupt status and attempt forceful shutdown
240303
log.print(ERROR, e, "Interrupted while stopping %s executor.", name);
241304
Thread.currentThread().interrupt();
305+
if (isLooperExecutor) {
306+
List<Runnable> dropped = executor.shutdownNow();
307+
log.print(
308+
VERBOSE,
309+
"%s shutdownNow invoked after interrupt; %d tasks returned.",
310+
name,
311+
dropped.size());
312+
}
242313
}
243314
}
244315

@@ -299,8 +370,22 @@ public void run() {
299370
"Batching %s message(s) into batch %s.",
300371
batch.batch().size(),
301372
batch.sequence());
302-
networkExecutor.submit(
303-
BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries));
373+
try {
374+
networkExecutor.submit(
375+
BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries));
376+
} catch (RejectedExecutionException e) {
377+
log.print(
378+
ERROR,
379+
e,
380+
"Failed to submit batch %s to network executor during shutdown. Batch will be lost.",
381+
batch.sequence());
382+
// Notify callbacks about the failure
383+
for (Message msg : batch.batch()) {
384+
for (Callback callback : callbacks) {
385+
callback.failure(msg, e);
386+
}
387+
}
388+
}
304389

305390
currentBatchSize.set(0);
306391
messages.clear();

0 commit comments

Comments
 (0)