Skip to content

Commit bc4e6da

Browse files
Fixed Github Issue #524
1 parent 9b75259 commit bc4e6da

File tree

1 file changed

+71
-39
lines changed

1 file changed

+71
-39
lines changed

analytics/src/main/java/com/segment/analytics/internal/AnalyticsClient.java

Lines changed: 71 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,8 @@ public int messageSizeInBytes(Message message) {
159159
private Boolean isBackPressuredAfterSize(int incomingSize) {
160160
int POISON_BYTE_SIZE = messageSizeInBytes(FlushMessage.POISON);
161161
int sizeAfterAdd = this.currentQueueSizeInBytes + incomingSize + POISON_BYTE_SIZE;
162-
// Leave a 10% buffer since the unsynchronized enqueue could add multiple at a time
162+
// Leave a 10% buffer since the unsynchronized enqueue could add multiple at a
163+
// time
163164
return sizeAfterAdd >= Math.min(this.maximumQueueByteSize, BATCH_MAX_SIZE) * 0.9;
164165
}
165166

@@ -174,12 +175,14 @@ public void enqueue(Message message) {
174175
}
175176

176177
try {
177-
// @jorgen25 message here could be regular msg, POISON or STOP. Only do regular logic if its
178+
// @jorgen25 message here could be regular msg, POISON or STOP. Only do regular
179+
// logic if it's
178180
// valid message
179181
if (message != StopMessage.STOP && message != FlushMessage.POISON) {
180182
int messageByteSize = messageSizeInBytes(message);
181183

182-
// @jorgen25 check if message is below 32kb limit for individual messages, no need to check
184+
// @jorgen25 check if message is below 32kb limit for individual messages, no
185+
// need to check
183186
// for extra characters
184187
if (messageByteSize <= MSG_MAX_SIZE) {
185188
if (isBackPressuredAfterSize(messageByteSize)) {
@@ -234,7 +237,9 @@ public void shutdown() {
234237
}
235238

236239
/**
237-
* Wait for the looper to complete processing all messages before proceedin with shutdown. This prevents the race condition where the network executor is shut down before the looper finishes
240+
* Wait for the looper to complete processing all messages before proceeding with
241+
* shutdown. This prevents the race condition where the network executor is shut
242+
* down before the looper finishes
238243
* submitting all batches.
239244
*/
240245
private void waitForLooperCompletion() {
@@ -292,7 +297,8 @@ public void shutdownAndWait(ExecutorService executor, String name) {
292297
}
293298

294299
/**
295-
* Looper runs on a background thread and takes messages from the queue. Once it collects enough
300+
* Looper runs on a background thread and takes messages from the queue. Once it
301+
* collects enough
296302
* messages, it triggers a flush.
297303
*/
298304
class Looper implements Runnable {
@@ -320,18 +326,20 @@ public void run() {
320326
log.print(VERBOSE, "Flushing messages.");
321327
}
322328
} else {
323-
// we do +1 because we are accounting for this new message we just took from the queue
329+
// we do +1 because we are accounting for this new message we just took from the
330+
// queue
324331
// which is not in list yet
325-
// need to check if this message is going to make us go over the limit considering
332+
// need to check if this message is going to make us go over the limit
333+
// considering
326334
// default batch size as well
327-
int defaultBatchSize =
328-
BatchUtility.getBatchDefaultSize(contextSize, messages.size() + 1);
335+
int defaultBatchSize = BatchUtility.getBatchDefaultSize(contextSize, messages.size() + 1);
329336
int msgSize = messageSizeInBytes(message);
330337
if (currentBatchSize.get() + msgSize + defaultBatchSize <= BATCH_MAX_SIZE) {
331338
messages.add(message);
332339
currentBatchSize.addAndGet(msgSize);
333340
} else {
334-
// put message that did not make the cut this time back on the queue, we already took
341+
// put message that did not make the cut this time back on the queue, we already
342+
// took
335343
// this message if we dont put it back its lost
336344
// we take care of that after submitting the batch
337345
batchSizeLimitReached = true;
@@ -368,7 +376,8 @@ public void run() {
368376
currentBatchSize.set(0);
369377
messages.clear();
370378
if (batchSizeLimitReached) {
371-
// If this is true that means the last message that would make us go over the limit
379+
// If this is true that means the last message that would make us go over the
380+
// limit
372381
// was not added,
373382
// add it to the now cleared messages list so its not lost
374383
messages.add(message);
@@ -385,12 +394,11 @@ public void run() {
385394
}
386395

387396
static class BatchUploadTask implements Runnable {
388-
private static final Backo BACKO =
389-
Backo.builder() //
390-
.base(TimeUnit.SECONDS, 15) //
391-
.cap(TimeUnit.HOURS, 1) //
392-
.jitter(1) //
393-
.build();
397+
private static final Backo BACKO = Backo.builder() //
398+
.base(TimeUnit.SECONDS, 15) //
399+
.cap(TimeUnit.HOURS, 1) //
400+
.jitter(1) //
401+
.build();
394402

395403
private final AnalyticsClient client;
396404
private final Backo backo;
@@ -416,7 +424,10 @@ private void notifyCallbacksWithException(Batch batch, Exception exception) {
416424
}
417425
}
418426

419-
/** Returns {@code true} to indicate a batch should be retried. {@code false} otherwise. */
427+
/**
428+
* Returns {@code true} to indicate a batch should be retried. {@code false}
429+
* otherwise.
430+
*/
420431
boolean upload() {
421432
client.log.print(VERBOSE, "Uploading batch %s.", batch.sequence());
422433

@@ -469,7 +480,8 @@ public void run() {
469480
int attempt = 0;
470481
for (; attempt <= maxRetries; attempt++) {
471482
boolean retry = upload();
472-
if (!retry) return;
483+
if (!retry)
484+
return;
473485
try {
474486
backo.sleep(attempt);
475487
} catch (InterruptedException e) {
@@ -492,45 +504,65 @@ private static boolean is5xx(int status) {
492504
public static class BatchUtility {
493505

494506
/**
495-
* Method to determine what is the expected default size of the batch regardless of messages
507+
* Method to determine what is the expected default size of the batch regardless
508+
* of messages
496509
*
497-
* <p>Sample batch:
510+
* <p>
511+
* Sample batch:
498512
* {"batch":[{"type":"alias","messageId":"fc9198f9-d827-47fb-96c8-095bd3405d93","timestamp":"Nov
499513
* 18, 2021, 2:45:07
500514
* PM","userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias",
501-
* "messageId":"3ce6f88c-36cb-4991-83f8-157e10261a89","timestamp":"Nov 18, 2021, 2:45:07
515+
* "messageId":"3ce6f88c-36cb-4991-83f8-157e10261a89","timestamp":"Nov 18, 2021,
516+
* 2:45:07
502517
* PM","userId":"jorgen25",
503518
* "integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias",
504-
* "messageId":"a328d339-899a-4a14-9835-ec91e303ac4d","timestamp":"Nov 18, 2021, 2:45:07 PM",
519+
* "messageId":"a328d339-899a-4a14-9835-ec91e303ac4d","timestamp":"Nov 18, 2021,
520+
* 2:45:07 PM",
505521
* "userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"},{"type":"alias",
506-
* "messageId":"57b0ceb4-a1cf-4599-9fba-0a44c7041004","timestamp":"Nov 18, 2021, 2:45:07 PM",
522+
* "messageId":"57b0ceb4-a1cf-4599-9fba-0a44c7041004","timestamp":"Nov 18, 2021,
523+
* 2:45:07 PM",
507524
* "userId":"jorgen25","integrations":{"someKey":{"data":"aaaaa"}},"previousId":"foo"}],
508-
* "sentAt":"Nov 18, 2021, 2:45:07 PM","context":{"library":{"name":"analytics-java",
525+
* "sentAt":"Nov 18, 2021, 2:45:07
526+
* PM","context":{"library":{"name":"analytics-java",
509527
* "version":"3.1.3"}},"sequence":1,"writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"}
510528
*
511-
* <p>total size of batch : 932
529+
* <p>
530+
* total size of batch : 932
512531
*
513-
* <p>BREAKDOWN: {"batch":[MESSAGE1,MESSAGE2,MESSAGE3,MESSAGE4],"sentAt":"MMM dd, yyyy, HH:mm:ss
532+
* <p>
533+
* BREAKDOWN: {"batch":[MESSAGE1,MESSAGE2,MESSAGE3,MESSAGE4],"sentAt":"MMM dd,
534+
* yyyy, HH:mm:ss
514535
* tt","context":CONTEXT,"sequence":1,"writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"}
515536
*
516-
* <p>so we need to account for: 1 -message size: 189 * 4 = 756 2 -context object size = 55 in
517-
* this sample -> 756 + 55 = 811 3 -Metadata (This has the sent data/sequence characters) +
518-
* extra chars (these are chars like "batch":[] or "context": etc and will be pretty much the
519-
* same length in every batch -> size is 73 --> 811 + 73 = 884 (well 72 actually, char 73 is the
520-
* sequence digit which we account for in point 5) 4 -Commas between each message, the total
521-
* number of commas is number_of_msgs - 1 = 3 -> 884 + 3 = 887 (sample is 886 because the hour
522-
* in sentData this time happens to be 2:45 but it could be 12:45 5 -Sequence Number increments
537+
* <p>
538+
* so we need to account for: 1 -message size: 189 * 4 = 756 2 -context object
539+
* size = 55 in
540+
* this sample -> 756 + 55 = 811 3 -Metadata (This has the sent data/sequence
541+
* characters) +
542+
* extra chars (these are chars like "batch":[] or "context": etc and will be
543+
* pretty much the
544+
* same length in every batch -> size is 73 --> 811 + 73 = 884 (well 72
545+
* actually, char 73 is the
546+
* sequence digit which we account for in point 5) 4 -Commas between each
547+
* message, the total
548+
* number of commas is number_of_msgs - 1 = 3 -> 884 + 3 = 887 (sample is 886
549+
* because the hour
550+
* in sentData this time happens to be 2:45 but it could be 12:45 5 -Sequence
551+
* Number increments
523552
* with every batch created
524553
*
525-
* <p>so formulae to determine the expected default size of the batch is
554+
* <p>
555+
* so formulae to determine the expected default size of the batch is
526556
*
527-
* @return: defaultSize = messages size + context size + metadata size + comma number + sequence
528-
* digits + writekey + buffer
557+
* @return: defaultSize = messages size + context size + metadata size + comma
558+
* number + sequence
559+
* digits + writekey + buffer
529560
* @return
530561
*/
531562
private static int getBatchDefaultSize(int contextSize, int currentMessageNumber) {
532-
// sample data: {"batch":[],"sentAt":"MMM dd, yyyy, HH:mm:ss tt","context":,"sequence":1,
533-
// "writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} - 119
563+
// sample data: {"batch":[],"sentAt":"MMM dd, yyyy, HH:mm:ss
564+
// tt","context":,"sequence":1,
565+
// "writeKey":"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"} - 119
534566
// Don't need to squeeze everything possible into a batch, adding a buffer
535567
int metadataExtraCharsSize = 119 + 1024;
536568
int commaNumber = currentMessageNumber - 1;

0 commit comments

Comments
 (0)