Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ but ended up as a complete rewrite. Key differences/features:
- JMX-friendly
- real-time stats
- availability in Maven Central
- no third-party dependencies
- minimal third-party dependencies
- transparent MySQL 8.0 `TRANSACTION_PAYLOAD` support for ZSTD-compressed transactions
- test suite over different versions of MySQL releases

> If you are looking for something similar in other languages - check out
Expand Down Expand Up @@ -84,6 +85,10 @@ kick off from a specific filename or position, use `client.setBinlogFilename(fil
> `client.connect()` is blocking (meaning that client will listen for events in the current thread).
`client.connect(timeout)`, on the other hand, spawns a separate thread.

> MySQL 8.0 `binlog_transaction_compression` is supported transparently. `TRANSACTION_PAYLOAD`
events are decompressed incrementally and delivered to listeners as their ordinary inner events,
so consumers continue to see `QUERY`, `TABLE_MAP`, row, and commit events.


#### MariaDB

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.zendesk</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.30.3</version>
<version>0.30.4-SNAPSHOT</version>

<name>mysql-binlog-connector-java</name>
<description>MySQL Binary Log connector</description>
Expand Down
125 changes: 92 additions & 33 deletions src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import javax.net.ssl.X509TrustManager;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
Expand Down Expand Up @@ -1130,32 +1131,36 @@ private void listenForEventPackets() throws IOException {
completeShutdown = true;
break;
}
Event event;
try {
event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ?
new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) :
inputStream);
if (event == null) {
throw new EOFException();
}
} catch (Exception e) {
Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e;
if (cause instanceof EOFException || cause instanceof SocketException) {
throw e;
ByteArrayInputStream eventStream = packetLength == MAX_PACKET_LENGTH ?
new ByteArrayInputStream(new PacketPayloadInputStream(inputStream, packetLength - 1, true)) :
inputStream;
// A TRANSACTION_PAYLOAD packet unpacks into several inner events; nextEvent() emits the
// first and buffers the rest, so keep pulling until this packet (and any payload it
// carried) is fully drained before reading the next one off the wire.
do {
Event event;
try {
event = eventDeserializer.nextEvent(eventStream);
if (event == null) {
throw new EOFException();
}
} catch (Exception e) {
Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e;
if (cause instanceof EOFException || cause instanceof SocketException) {
throw e;
}
if (isConnected()) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onEventDeserializationFailure(this, e);
}
}
break;
}
if (isConnected()) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onEventDeserializationFailure(this, e);
}
eventLastSeen = System.currentTimeMillis();
handleEvent(event);
}
continue;
}
if (isConnected()) {
eventLastSeen = System.currentTimeMillis();
updateGtidSet(event);
notifyEventListeners(event);
updateClientBinlogFilenameAndPosition(event);
}
} while (eventDeserializer.hasPendingTransactionPayloadEvent());
}
} catch (Exception e) {
if (isConnected()) {
Expand All @@ -1174,16 +1179,70 @@ private void listenForEventPackets() throws IOException {
}
}

private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int packetLength) throws IOException {
byte[] result = inputStream.read(packetLength);
int chunkLength;
do {
chunkLength = inputStream.readInteger(3);
inputStream.skip(1); // 1 byte for sequence
result = Arrays.copyOf(result, result.length + chunkLength);
inputStream.fill(result, result.length - chunkLength, chunkLength);
} while (chunkLength == Packet.MAX_LENGTH);
return result;
// TRANSACTION_PAYLOAD events are unpacked transparently by the EventDeserializer, so by the time
// an event reaches here it is always an ordinary event (an inner event of a compressed transaction
// is indistinguishable from a standalone one) and needs no special handling.
void handleEvent(Event event) {
updateGtidSet(event);
notifyEventListeners(event);
updateClientBinlogFilenameAndPosition(event);
}

static final class PacketPayloadInputStream extends InputStream {
private final ByteArrayInputStream inputStream;
private int chunkRemaining;
private boolean moreChunksExpected;

PacketPayloadInputStream(ByteArrayInputStream inputStream, int firstChunkRemaining,
boolean moreChunksExpected) {
this.inputStream = inputStream;
this.chunkRemaining = firstChunkRemaining;
this.moreChunksExpected = moreChunksExpected;
}

@Override
public int read() throws IOException {
if (!ensureChunkAvailable()) {
return -1;
}
int read = inputStream.read();
chunkRemaining--;
return read;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
if (!ensureChunkAvailable()) {
return -1;
}
int read = inputStream.read(b, off, Math.min(len, chunkRemaining));
if (read == -1) {
throw new EOFException("Unexpected end of packet; " + chunkRemaining +
" bytes still expected in the current chunk");
}
chunkRemaining -= read;
return read;
}

private boolean ensureChunkAvailable() throws IOException {
while (chunkRemaining == 0) {
if (!moreChunksExpected) {
return false;
}
int chunkLength = inputStream.readInteger(3);
inputStream.skip(1); // 1 byte for sequence
chunkRemaining = chunkLength;
moreChunksExpected = chunkLength == Packet.MAX_LENGTH;
}
return true;
}
}

private void updateClientBinlogFilenameAndPosition(Event event) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package com.github.shyiko.mysql.binlog.event;

import java.io.InputStream;
import java.util.ArrayList;


public class TransactionPayloadEventData implements EventData {
private int payloadSize;
private long payloadSize;
private long uncompressedSize;
private int compressionType;
private byte[] payload;
private ArrayList<Event> uncompressedEvents;
private transient InputStream payloadInputStream;
private ArrayList<Event> uncompressedEvents = new ArrayList<Event>();

public ArrayList<Event> getUncompressedEvents() {
return uncompressedEvents;
Expand All @@ -19,13 +21,25 @@ public void setUncompressedEvents(ArrayList<Event> uncompressedEvents) {
}

public int getPayloadSize() {
return payloadSize;
if (payloadSize > Integer.MAX_VALUE) {
throw new IllegalStateException("Transaction payload size " + payloadSize +
" exceeds the maximum int value");
}
return (int) payloadSize;
}

public void setPayloadSize(int payloadSize) {
this.payloadSize = payloadSize;
}

public long getPayloadSizeLong() {
return payloadSize;
}

public void setPayloadSize(long payloadSize) {
this.payloadSize = payloadSize;
}

public long getUncompressedSize() {
return uncompressedSize;
}
Expand All @@ -50,14 +64,22 @@ public void setPayload(byte[] payload) {
this.payload = payload;
}

public InputStream getPayloadInputStream() {
return payloadInputStream;
}

public void setPayloadInputStream(InputStream payloadInputStream) {
this.payloadInputStream = payloadInputStream;
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("TransactionPayloadEventData");
sb.append("{compression_type=").append(compressionType).append(", payload_size=").append(payloadSize).append(", uncompressed_size='").append(uncompressedSize).append('\'');
sb.append(", payload: ");
sb.append("\n");
for (Event e : uncompressedEvents) {
for (Event e : getUncompressedEvents()) {
sb.append(e.toString());
sb.append("\n");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public class EventDeserializer {
private EventDataDeserializer tableMapEventDataDeserializer;
private EventDataDeserializer formatDescEventDataDeserializer;

// Streams the inner events of a TRANSACTION_PAYLOAD out one per nextEvent() call (see nextEvent),
// so a compressed transaction is unpacked transparently without materializing its whole image.
private final TransactionPayloadEventBuffer transactionPayloadBuffer = new TransactionPayloadEventBuffer();

public EventDeserializer() {
this(new EventHeaderV4Deserializer(), new NullEventDataDeserializer());
}
Expand Down Expand Up @@ -217,6 +221,10 @@ private void ensureCompatibility(EventDataDeserializer eventDataDeserializer) {
deserializer.setDeserializeIntegerAsByteArray(
compatibilitySet.contains(CompatibilityMode.INTEGER_AS_BYTE_ARRAY)
);
} else if (eventDataDeserializer instanceof TransactionPayloadEventDataDeserializer) {
// Inner events of a transaction payload are parsed by a nested deserializer; carry the
// compatibility modes through to it so they apply to those events as well.
((TransactionPayloadEventDataDeserializer) eventDataDeserializer).setCompatibilityMode(compatibilitySet);
}
}

Expand All @@ -226,6 +234,14 @@ private void ensureCompatibility(EventDataDeserializer eventDataDeserializer) {
* @throws IOException if connection gets closed
*/
public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
// A previously-read TRANSACTION_PAYLOAD is unpacked transparently: emit its remaining inner
// events (or surface a deferred unpack failure) before reading the next event off the stream.
if (transactionPayloadBuffer.hasPending()) {
Event innerEvent = transactionPayloadBuffer.next();
if (innerEvent != null) {
return innerEvent;
}
}
if (inputStream.peek() == -1) {
return null;
}
Expand All @@ -238,16 +254,36 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
case TABLE_MAP:
eventData = deserializeTableMapEventData(inputStream, eventHeader);
break;
case TRANSACTION_PAYLOAD:
case TRANSACTION_PAYLOAD: {
EventDataDeserializer deserializer = getEventDataDeserializer(EventType.TRANSACTION_PAYLOAD);
if (deserializer instanceof TransactionPayloadEventDataDeserializer) {
// Stream the payload's inner events out as ordinary top-level events: the buffer
// returns the first one now and holds the rest for subsequent nextEvent() calls.
return transactionPayloadBuffer.open(
(TransactionPayloadEventDataDeserializer) deserializer, inputStream, eventHeader,
checksumLength);
}
// A custom deserializer handles the payload its own way; surface the TRANSACTION_PAYLOAD
// event as-is (pre-streaming behavior), with any inner table maps registered below.
eventData = deserializeTransactionPayloadEventData(inputStream, eventHeader);
break;
}
default:
EventDataDeserializer eventDataDeserializer = getEventDataDeserializer(eventHeader.getEventType());
eventData = deserializeEventData(inputStream, eventHeader, eventDataDeserializer);
}
return new Event(eventHeader, eventData);
}

/**
* @return {@code true} while a previously-read TRANSACTION_PAYLOAD still has inner events (or a
* deferred unpack failure) to emit, so a packet-oriented reader knows not to pull the next packet
* yet.
*/
public boolean hasPendingTransactionPayloadEvent() {
return transactionPayloadBuffer.hasPending();
}

private EventData deserializeFormatDescriptionEventData(ByteArrayInputStream inputStream, EventHeader eventHeader)
throws EventDataDeserializationException {
EventDataDeserializer eventDataDeserializer =
Expand Down Expand Up @@ -293,9 +329,12 @@ public EventData deserializeTransactionPayloadEventData(ByteArrayInputStream inp
TransactionPayloadEventData transactionPayloadEventData = (TransactionPayloadEventData) eventData;

/**
* Handling for TABLE_MAP events withing the transaction payload event. This is to ensure that for the table map
* events within the transaction payload, the target table id and the event gets added to the
* tableMapEventByTableId map. This is map is later used while deserializing rows.
* Handling for TABLE_MAP events within the transaction payload event, so a row event in the
* payload resolves against its table map. Inner events are now parsed lazily and streamed
* (see {@link TransactionPayloadEventBuffer}), each payload getting a self-contained inner
* deserializer whose own table-map cache is populated in stream order, so this loop is a no-op
* in the streaming path (getUncompressedEvents() is empty). It is kept for a custom
* TRANSACTION_PAYLOAD deserializer that still materializes inner events eagerly.
*/
for (Event event : transactionPayloadEventData.getUncompressedEvents()) {
if (event.getHeader().getEventType() == EventType.TABLE_MAP && event.getData() != null) {
Expand Down Expand Up @@ -329,11 +368,15 @@ public EventData deserializeTableMapEventData(ByteArrayInputStream inputStream,

private EventData deserializeEventData(ByteArrayInputStream inputStream, EventHeader eventHeader,
EventDataDeserializer eventDataDeserializer) throws EventDataDeserializationException {
int eventBodyLength = (int) eventHeader.getDataLength() - checksumLength;
long eventBodyLength = eventHeader.getDataLength() - checksumLength;
EventData eventData;
try {
inputStream.enterBlock(eventBodyLength);
try {
if (eventBodyLength > Integer.MAX_VALUE) {
throw new IOException("Event data length " + eventBodyLength +
" exceeds the maximum supported size of " + Integer.MAX_VALUE + " bytes");
}
eventData = eventDataDeserializer.deserialize(inputStream);
} finally {
inputStream.skipToTheEndOfTheBlock();
Expand Down
Loading