Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions src/main/java/com/mixpanel/mixpanelapi/DeliveryOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package com.mixpanel.mixpanelapi;

/**
* Options for configuring how messages are delivered to Mixpanel.
* Use the {@link Builder} to create instances.
*
* <p>Different options apply to different message types:
* <ul>
* <li>{@code importStrictMode} - Only applies to import messages</li>
* <li>{@code useIpAddress} - Only applies to events, people, and groups messages (NOT imports)</li>
* </ul>
*
* <p>Example usage:
* <pre>{@code
* DeliveryOptions options = new DeliveryOptions.Builder()
* .importStrictMode(false) // Disable strict validation for imports
* .useIpAddress(true) // Use IP address for geolocation (events/people/groups only)
* .build();
*
* mixpanelApi.deliver(delivery, options);
* }</pre>
*/
public class DeliveryOptions {

private final boolean mImportStrictMode;
private final boolean mUseIpAddress;

private DeliveryOptions(Builder builder) {
mImportStrictMode = builder.importStrictMode;
mUseIpAddress = builder.useIpAddress;
}

/**
* Returns whether strict mode is enabled for import messages.
*
* <p><strong>Note:</strong> This option only applies to import messages (historical events).
* It has no effect on regular events, people, or groups messages.
*
* <p>When strict mode is enabled (default), the /import endpoint validates each event
* and returns a 400 error if any event has issues. Correctly formed events are still
* ingested, and problematic events are returned in the response with error messages.
*
* <p>When strict mode is disabled, validation is bypassed and all events are imported
* regardless of their validity.
*
* @return true if strict mode is enabled for imports, false otherwise
*/
public boolean isImportStrictMode() {
return mImportStrictMode;
}

/**
* Returns whether the IP address should be used for geolocation.
*
* <p><strong>Note:</strong> This option only applies to events, people, and groups messages.
* It does NOT apply to import messages, which use Basic Auth and don't support the ip parameter.
*
* @return true if IP address should be used for geolocation, false otherwise
*/
public boolean useIpAddress() {
return mUseIpAddress;
}

/**
* Builder for creating {@link DeliveryOptions} instances.
*/
public static class Builder {
private boolean importStrictMode = true;
private boolean useIpAddress = false;

/**
* Sets whether to use strict mode for import messages.
*
* will validate the supplied events and return a 400 status code if any of the events fail validation with details of the error
Copy link

Copilot AI Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line appears to be a leftover fragment from editing. It's not formatted as a proper JavaDoc comment (missing <p> tag or proper punctuation) and duplicates the content of the following line. Remove this line as the complete description starts on line 76.

Suggested change
* will validate the supplied events and return a 400 status code if any of the events fail validation with details of the error

Copilot uses AI. Check for mistakes.
*
* <p>Setting this value to true (default) will validate the supplied events and return
* a 400 status code if any of the events fail validation with details of the error.
* Setting this value to false disables validation.
*
* @param importStrictMode true to enable strict validation (default), false to disable
* @return this Builder instance for method chaining
*/
public Builder importStrictMode(boolean importStrictMode) {
this.importStrictMode = importStrictMode;
return this;
}

/**
* Sets whether to use the IP address for geolocation.
*
* <p><strong>Note:</strong> This option only applies to events, people, and groups messages.
* It does NOT apply to import messages.
*
* <p>When enabled, Mixpanel will use the IP address of the request to set
* geolocation properties on events and profiles.
*
* @param useIpAddress true to use IP address for geolocation, false otherwise (default)
* @return this Builder instance for method chaining
*/
public Builder useIpAddress(boolean useIpAddress) {
this.useIpAddress = useIpAddress;
return this;
}

/**
* Builds and returns a new {@link DeliveryOptions} instance.
*
* @return a new DeliveryOptions with the configured settings
*/
public DeliveryOptions build() {
return new DeliveryOptions(this);
}
}
}
124 changes: 94 additions & 30 deletions src/main/java/com/mixpanel/mixpanelapi/MixpanelAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class MixpanelAPI implements AutoCloseable {
protected final boolean mUseGzipCompression;
protected final Integer mConnectTimeout;
protected final Integer mReadTimeout;
protected Integer mImportMaxMessageCount;
protected final LocalFlagsProvider mLocalFlags;
protected final RemoteFlagsProvider mRemoteFlags;
protected final JsonSerializer mJsonSerializer;
Expand All @@ -71,7 +72,7 @@ public MixpanelAPI() {
* @param useGzipCompression whether to use gzip compression for network requests
*/
public MixpanelAPI(boolean useGzipCompression) {
this(null, null, null, null, useGzipCompression, null, null, null, null, null);
this(null, null, null, null, useGzipCompression, null, null, null, null, null, null);
}

/**
Expand Down Expand Up @@ -100,7 +101,7 @@ public MixpanelAPI(RemoteFlagsConfig remoteFlagsConfig) {
* @param remoteFlagsConfig configuration for remote feature flags evaluation (can be null)
*/
private MixpanelAPI(LocalFlagsConfig localFlagsConfig, RemoteFlagsConfig remoteFlagsConfig) {
this(null, null, null, null, false, localFlagsConfig, remoteFlagsConfig, null, null, null);
this(null, null, null, null, false, localFlagsConfig, remoteFlagsConfig, null, null, null, null);
}

/**
Expand All @@ -113,7 +114,7 @@ private MixpanelAPI(LocalFlagsConfig localFlagsConfig, RemoteFlagsConfig remoteF
* @see #MixpanelAPI()
*/
public MixpanelAPI(String eventsEndpoint, String peopleEndpoint) {
this(eventsEndpoint, peopleEndpoint, null, null, false, null, null, null, null, null);
this(eventsEndpoint, peopleEndpoint, null, null, false, null, null, null, null, null, null);
}

/**
Expand All @@ -127,7 +128,7 @@ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint) {
* @see #MixpanelAPI()
*/
public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEndpoint) {
this(eventsEndpoint, peopleEndpoint, groupsEndpoint, null, false, null, null, null, null, null);
this(eventsEndpoint, peopleEndpoint, groupsEndpoint, null, false, null, null, null, null, null, null);
}

/**
Expand All @@ -142,7 +143,7 @@ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEn
* @see #MixpanelAPI()
*/
public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEndpoint, String importEndpoint) {
this(eventsEndpoint, peopleEndpoint, groupsEndpoint, importEndpoint, false, null, null, null, null, null);
this(eventsEndpoint, peopleEndpoint, groupsEndpoint, importEndpoint, false, null, null, null, null, null, null);
}

/**
Expand All @@ -158,7 +159,7 @@ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEn
* @see #MixpanelAPI()
*/
public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEndpoint, String importEndpoint, boolean useGzipCompression) {
this(eventsEndpoint, peopleEndpoint, groupsEndpoint, importEndpoint, useGzipCompression, null, null, null, null, null);
this(eventsEndpoint, peopleEndpoint, groupsEndpoint, importEndpoint, useGzipCompression, null, null, null, null, null, null);
}

/**
Expand All @@ -168,16 +169,17 @@ public MixpanelAPI(String eventsEndpoint, String peopleEndpoint, String groupsEn
*/
private MixpanelAPI(Builder builder) {
this(
builder.eventsEndpoint,
builder.peopleEndpoint,
builder.groupsEndpoint,
builder.importEndpoint,
builder.eventsEndpoint,
builder.peopleEndpoint,
builder.groupsEndpoint,
builder.importEndpoint,
builder.useGzipCompression,
builder.flagsConfig instanceof LocalFlagsConfig ? (LocalFlagsConfig) builder.flagsConfig : null,
builder.flagsConfig instanceof RemoteFlagsConfig ? (RemoteFlagsConfig) builder.flagsConfig : null,
builder.jsonSerializer,
builder.connectTimeout,
builder.readTimeout
builder.readTimeout,
builder.importMaxMessageCount
);
}

Expand All @@ -192,18 +194,22 @@ private MixpanelAPI(Builder builder) {
* @param localFlagsConfig configuration for local feature flags
* @param remoteFlagsConfig configuration for remote feature flags
* @param jsonSerializer custom JSON serializer (null uses default)
* @param connectTimeout connection timeout in milliseconds (null uses default)
* @param readTimeout read timeout in milliseconds (null uses default)
* @param importMaxMessageCount maximum messages per import batch (null uses default)
*/
private MixpanelAPI(
String eventsEndpoint,
String peopleEndpoint,
String groupsEndpoint,
String importEndpoint,
boolean useGzipCompression,
LocalFlagsConfig localFlagsConfig,
String eventsEndpoint,
String peopleEndpoint,
String groupsEndpoint,
String importEndpoint,
boolean useGzipCompression,
LocalFlagsConfig localFlagsConfig,
RemoteFlagsConfig remoteFlagsConfig,
JsonSerializer jsonSerializer,
Integer connectTimeout,
Integer readTimeout
Integer readTimeout,
Integer importMaxMessageCount
) {
mEventsEndpoint = eventsEndpoint != null ? eventsEndpoint : Config.BASE_ENDPOINT + "/track";
mPeopleEndpoint = peopleEndpoint != null ? peopleEndpoint : Config.BASE_ENDPOINT + "/engage";
Expand All @@ -212,6 +218,8 @@ private MixpanelAPI(
mUseGzipCompression = useGzipCompression;
mConnectTimeout = connectTimeout != null ? connectTimeout : DEFAULT_CONNECT_TIMEOUT_MILLIS;
mReadTimeout = readTimeout != null ? readTimeout : DEFAULT_READ_TIMEOUT_MILLIS;
mImportMaxMessageCount = importMaxMessageCount != null ?
Math.min(importMaxMessageCount, Config.IMPORT_MAX_MESSAGE_SIZE) : Config.IMPORT_MAX_MESSAGE_SIZE;
mDefaultJsonSerializer = new OrgJsonSerializer();
if (jsonSerializer != null) {
logger.log(Level.INFO, "Custom JsonSerializer provided: " + jsonSerializer.getClass().getName());
Expand Down Expand Up @@ -269,14 +277,41 @@ public void deliver(ClientDelivery toSend) throws IOException {
* should be called in a separate thread or in a queue consumer.
*
* @param toSend a ClientDelivery containing a number of Mixpanel messages
* @param useIpAddress if true, Mixpanel will use the ip address of the request for geolocation
* @throws IOException
* @see ClientDelivery
*/
public void deliver(ClientDelivery toSend, boolean useIpAddress) throws IOException {
String ipParameter = "ip=0";
if (useIpAddress) {
ipParameter = "ip=1";
}
DeliveryOptions options = new DeliveryOptions.Builder()
.useIpAddress(useIpAddress)
.build();
deliver(toSend, options);
}

/**
* Attempts to send a given delivery to the Mixpanel servers with custom options.
* Will block, possibly on multiple server requests. For most applications, this method
* should be called in a separate thread or in a queue consumer.
*
* <p>Example usage:
* <pre>{@code
* DeliveryOptions options = new DeliveryOptions.Builder()
* .importStrictMode(false) // Disable strict validation for imports
* .useIpAddress(true) // Use IP address for geolocation (events/people/groups only)
* .build();
*
* mixpanelApi.deliver(delivery, options);
* }</pre>
*
* @param toSend a ClientDelivery containing a number of Mixpanel messages
* @param options configuration options for delivery
* @throws IOException if there's a network error
* @throws MixpanelServerException if the server rejects the messages
* @see ClientDelivery
* @see DeliveryOptions
*/
public void deliver(ClientDelivery toSend, DeliveryOptions options) throws IOException {
String ipParameter = options.useIpAddress() ? "ip=1" : "ip=0";

String eventsUrl = mEventsEndpoint + "?" + ipParameter;
List<JSONObject> events = toSend.getEventsMessages();
Expand All @@ -290,10 +325,10 @@ public void deliver(ClientDelivery toSend, boolean useIpAddress) throws IOExcept
List<JSONObject> groupMessages = toSend.getGroupMessages();
sendMessages(groupMessages, groupsUrl);

// Handle import messages - use strict mode and extract token for auth
List<JSONObject> importMessages = toSend.getImportMessages();
if (importMessages.size() > 0) {
String importUrl = mImportEndpoint + "?strict=1";
String strictParam = options.isImportStrictMode() ? "1" : "0";
String importUrl = mImportEndpoint + "?strict=" + strictParam;
sendImportMessages(importMessages, importUrl);
}
}
Expand Down Expand Up @@ -426,10 +461,10 @@ private void sendImportMessages(List<JSONObject> messages, String endpointUrl) t
}
}

// Send messages in batches (max 2000 per batch for /import)
// Send messages in batches (max 2000 per batch for /import by default)
// If token is empty, the server will reject with 401 Unauthorized
for (int i = 0; i < messages.size(); i += Config.IMPORT_MAX_MESSAGE_SIZE) {
int endIndex = i + Config.IMPORT_MAX_MESSAGE_SIZE;
for (int i = 0; i < messages.size(); i += mImportMaxMessageCount) {
int endIndex = i + mImportMaxMessageCount;
endIndex = Math.min(endIndex, messages.size());
List<JSONObject> batch = messages.subList(i, endIndex);

Expand Down Expand Up @@ -534,7 +569,7 @@ private String dataString(List<JSONObject> messages) {
responseStream = conn.getInputStream();
response = slurp(responseStream);
} catch (IOException e) {
// HTTP error codes (401, 400, etc.) throw IOException when calling getInputStream()
// HTTP error codes (401, 400, 413, etc.) throw IOException when calling getInputStream()
// Check if it's an HTTP error and read the error stream for details
InputStream errorStream = conn.getErrorStream();
if (errorStream != null) {
Expand All @@ -559,12 +594,24 @@ private String dataString(List<JSONObject> messages) {
}
}

// Import endpoint returns JSON like {"code":200,"status":"OK","num_records_imported":N}
// Import endpoint returns different formats depending on strict mode:
// - strict=1: JSON like {"code":200,"status":"OK","num_records_imported":N}
// - strict=0: Plain text "0" (not imported) or "1" (imported)
if (response == null) {
return false;
}

// Parse JSON response
// First, try to handle strict=0 response format (plain text "0" or "1")
String trimmedResponse = response.trim();
if ("1".equals(trimmedResponse)) {
// strict=0 with successful import
return true;
} else if ("0".equals(trimmedResponse)) {
// strict=0 with failed import (events not imported, reason unknown)
return false;
}

// Try to parse as JSON response (strict=1 format)
try {
JSONObject jsonResponse = new JSONObject(response);

Expand Down Expand Up @@ -659,6 +706,7 @@ public static class Builder {
private JsonSerializer jsonSerializer;
private Integer connectTimeout;
private Integer readTimeout;
private Integer importMaxMessageCount;

/**
* Sets the endpoint URL for Mixpanel events messages.
Expand Down Expand Up @@ -768,6 +816,22 @@ public Builder readTimeout(int readTimeoutInMillis) {
return this;
}

/**
* Sets the maximum number of messages to include in a single batch for the /import endpoint.
* The default value is 2000 messages per batch.
* The max accepted value is 2000
*
* @param importMaxMessageCount the maximum number of import messages per batch.
* Value must be greater than 0 and less than or equal to 2000.
* @return this Builder instance for method chaining
*/
public Builder importMaxMessageCount(int importMaxMessageCount) {
if (importMaxMessageCount > 0 && importMaxMessageCount <= Config.IMPORT_MAX_MESSAGE_SIZE) {
this.importMaxMessageCount = importMaxMessageCount;
}
return this;
}

/**
* Builds and returns a new MixpanelAPI instance with the configured settings.
*
Expand Down
Loading