support for binlog compressed payload (zstd) #177
…PAYLOAD (zstd / none)

MySQL 8.0.20+ can compress groups of binlog events into a single TRANSACTION_PAYLOAD event when binlog_transaction_compression=ON. Previously, listeners received the opaque wrapper and never saw the individual inner events, causing GTID tracking to stall and making row-change events invisible.

Changes:
- BinaryLogClient: intercept TRANSACTION_PAYLOAD events, decompress the payload, restamp each inner event's position fields with those of the outer envelope, then notify listeners per inner event and advance the GTID set / binlog position as usual.
- TransactionPayloadEventDataDeserializer: add COMPRESSION_TYPE_NONE (255) alongside ZSTD (0), propagate the outer EventDeserializer's CompatibilityMode flags into the inner deserializer used for decompressed events, and throw a clear error for unknown types.
- EventDeserializer: forward CompatibilityMode changes to any registered TransactionPayloadEventDataDeserializer.
- TransactionPayloadEventData: initialise uncompressedEvents to an empty list to avoid NPE on toString() before deserialization runs.
- Tests: unit-test GTID advancement through a compressed transaction, transparent inner-event notification with correct header restamping, COMPRESSION_TYPE_NONE deserialization, unsupported-type error path, and CompatibilityMode propagation to inner row events.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
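For readers skimming the thread, the compression-type handling described in the commit message could look roughly like the sketch below. It is an illustration only, not the connector's code: the class `PayloadCompressionTypes` and the method `requiresDecompression` are hypothetical names. Only the values 0 (ZSTD) and 255 (none), and the error for unknown types, come from the description above.

```java
import java.io.IOException;

/** Hypothetical helper; the names are illustrative and not part of the connector. */
final class PayloadCompressionTypes {
    static final int ZSTD = 0;   // value given in the commit message
    static final int NONE = 255; // value given in the commit message

    private PayloadCompressionTypes() { }

    /** Returns true when the payload bytes must go through a ZSTD decoder first. */
    static boolean requiresDecompression(int compressionType) throws IOException {
        switch (compressionType) {
            case ZSTD:
                return true;
            case NONE:
                return false;
            default:
                // Fail loudly instead of guessing how to read an unknown format.
                throw new IOException(
                    "Unsupported transaction payload compression type: " + compressionType);
        }
    }
}
```

Rejecting unknown values up front keeps a future compression type from being parsed as if it were raw event bytes.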
|
@osheroff as you asked, this is the same change done in the connector. Please review. |
|
@osheroff i've added a further commit that iterates over the decompressed payload in chunks rather than allocating an entire array buffer to temporarily store the result. |
        break;
    }
    restampInnerEventHeader(transactionPayloadEvent, innerEvent);
    updateGtidSet(innerEvent);
arguable but i think this should happen after we finish processing all the compressed events -- ie after we break out of the while loop
I guess in other places in code we bump the gtid set first, mph
Resolved by the restructure: inner events now flow through the exact same updateGtidSet → notifyEventListeners → updateClientBinlogFilenameAndPosition path as every other event, so the gtid set is bumped first, per-event, just like everywhere else — no special-case ordering to reason about. (4467565)
    }
}

private void notifyTransactionPayloadEvent(Event transactionPayloadEvent) {
- private void notifyTransactionPayloadEvent(Event transactionPayloadEvent) {
+ private void decompressTransaction(Event transactionPayloadEvent) {
This method no longer lives in the client — the decompress/stream loop moved into EventDeserializer.nextTransactionPayloadEvent() + fillTransactionPayloadBuffer(). So the rename is moot, but the 'this decompresses a transaction' intent carried over into the new names. (4467565)
private void notifyTransactionPayloadEvent(Event transactionPayloadEvent) {
    TransactionPayloadEventData transactionPayloadEventData =
        (TransactionPayloadEventData) EventDataWrapper.internal(transactionPayloadEvent.getData());
this line looks terrible, there must be a cleaner way to write this?
Removed from the client. The one remaining cast now sits in EventDeserializer.nextTransactionPayloadEvent as a single (TransactionPayloadEventData) EventDataWrapper.internal(eventData), matching the EventDataWrapper.internal(...) pattern already used throughout that class. (4467565)
void handleEvent(Event event) {
    EventType eventType = event.getHeader().getEventType();
    if (eventType == EventType.TRANSACTION_PAYLOAD &&
        EventDataWrapper.internal(event.getData()) instanceof TransactionPayloadEventData) {
the AI is really fond of this pattern. I think just checking eventType is plenty enough, no idea wtf the second check is here
Gone — handleEvent no longer special-cases TRANSACTION_PAYLOAD at all. The unwrap moved down into EventDeserializer, so there's neither the eventType check nor the instanceof here anymore. (4467565)
|
structurally, I think this could live at a lower level than the binlog client. maybe somewhere alongside |
…r, not BinaryLogClient

Addresses osheroff's review on osheroff#177. The payload-unwrapping logic now lives below the binlog client: EventDeserializer.nextEvent() transparently streams a TRANSACTION_PAYLOAD's inner events (each restamped with the outer envelope's event-length / next-position) one at a time, reading one ahead so a packet-oriented reader knows when a payload is still being drained.

As a result BinaryLogClient no longer special-cases TRANSACTION_PAYLOAD: handleEvent() is again a plain updateGtidSet -> notifyEventListeners -> updateClientBinlogFilenameAndPosition, and the read loop simply keeps pulling events until the packet (and any payload it carried) is drained. Inner events flow through the exact same gtid/position path as standalone events. BinaryLogFileReader gets the same transparent unpacking for free.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
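The "read one ahead, restamp, emit one at a time" idea in this commit message can be pictured with a small iterator. This is a rough sketch under assumptions, not the code from the PR: `SketchEvent` and `InnerEventStream` are hypothetical names, and a plain `Iterator` stands in for the decompressing reader.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical, simplified event carrying only the fields this sketch needs. */
final class SketchEvent {
    final String type;
    long eventLength;
    long nextPosition;

    SketchEvent(String type, long eventLength, long nextPosition) {
        this.type = type;
        this.eventLength = eventLength;
        this.nextPosition = nextPosition;
    }
}

/** Emits inner events one at a time while always holding one event in reserve. */
final class InnerEventStream implements Iterator<SketchEvent> {
    private final SketchEvent outer;
    private final Iterator<SketchEvent> decoded; // stands in for the decompressing reader
    private SketchEvent lookahead;

    InnerEventStream(SketchEvent outer, Iterator<SketchEvent> decoded) {
        this.outer = outer;
        this.decoded = decoded;
        advance();
    }

    private void advance() {
        lookahead = decoded.hasNext() ? decoded.next() : null;
    }

    /** True while the payload still has events left to drain. */
    @Override
    public boolean hasNext() {
        return lookahead != null;
    }

    @Override
    public SketchEvent next() {
        if (lookahead == null) {
            throw new NoSuchElementException("payload already drained");
        }
        SketchEvent current = lookahead;
        // Restamp with the outer envelope's values so reported positions stay consistent.
        current.eventLength = outer.eventLength;
        current.nextPosition = outer.nextPosition;
        advance();
        return current;
    }
}
```

Because one event is always held in reserve, a caller can ask `hasNext()` right after receiving an event and learn whether the payload is drained without reading past it.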
|
Good call — pushed the whole unwrap below the binlog client (4467565).
Net effect:
Streaming/backpressure is preserved (still one inner event at a time, no full materialization), and a mid-payload decompress/parse failure is surfaced as an ordinary deserialization failure (never an EOF/socket error) so it's reported-and-skipped rather than triggering a reconnect-and-refetch loop on the same payload. |
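One way to picture the failure handling described here is to translate a truncated read inside the payload into a dedicated deserialization exception, so it cannot be mistaken for the socket reaching EOF. The sketch below is an assumption about the shape of such code, not what the PR does: `PayloadDeserializationException` and `PayloadReads.readInnerEventBody` are hypothetical names.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

/** Hypothetical exception type; deliberately not an IOException subclass. */
final class PayloadDeserializationException extends Exception {
    private static final long serialVersionUID = 1L;

    PayloadDeserializationException(String message, Throwable cause) {
        super(message, cause);
    }
}

/** Hypothetical helper for reading inner event bodies out of a decompressed payload. */
final class PayloadReads {
    private PayloadReads() { }

    static byte[] readInnerEventBody(DataInputStream payload, int length)
            throws PayloadDeserializationException {
        byte[] body = new byte[length];
        try {
            // readFully throws EOFException if the payload ends early.
            payload.readFully(body);
            return body;
        } catch (EOFException e) {
            // A short payload is a parse problem, not a dropped connection.
            throw new PayloadDeserializationException(
                "Truncated inner event: expected " + length + " bytes", e);
        } catch (IOException e) {
            throw new PayloadDeserializationException("Failed to read inner event", e);
        }
    }
}
```

Keeping the type separate from `IOException` means a caller that reconnects on I/O errors will not retry the same broken payload in a loop.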
|
Pushed What changed in this update:
Validation run locally:
I cleaned the generated onetimeserver temp data afterward. |
|
@osheroff I've done the refactor you asked, along with paying attention to properly use streams vs byte[] to store the payload. Please let me know if it looks good to you, i'm going to test it tomorrow on a real environment |
if (eventBodyLength > Integer.MAX_VALUE) {
    throw new IOException("Event data length " + eventBodyLength +
        " exceeds the maximum supported size of " + Integer.MAX_VALUE + " bytes");
}

    blockLength--;
}
return inputStream.read();
return read;
no, undo all changes to this file
|
sorry, you gotta start over at this point; the AI has just littered changes all over the place I don't want and it's impossible to tell what's relevant and what isn't. usually claude isn't quite so aggressive. the structure is almost right except you need a new class, a |
|
Superseded by #178 — please review there instead. #178 is a refactor of this work with the same behavior and the same >2GB / cross-packet Continuing the review on #178. |
|
ok i kinda hacked this all into shape here #179, tested, works locally. I'm missing a gpg key from an old laptop but will release and do the maxwell dance later this week |
Summary
MySQL 8.0.20 introduced binlog_transaction_compression=ON, which wraps groups of binlog events into a single TRANSACTION_PAYLOAD event (optionally ZSTD-compressed). Without this change, listeners only ever received the opaque wrapper: individual row-change events were invisible and GTID tracking stalled.

- BinaryLogClient: handleEvent() intercepts TRANSACTION_PAYLOAD events, decompresses/unpacks inner events, restamps each inner event's eventLength and nextPosition with the outer envelope's values (so getBinlogPosition() stays consistent), then dispatches them through the normal updateGtidSet → notifyEventListeners → updateClientBinlogFilenameAndPosition pipeline.
- TransactionPayloadEventDataDeserializer: adds COMPRESSION_TYPE_NONE (255) alongside ZSTD (0); propagates the outer EventDeserializer's CompatibilityMode flags into the inner deserializer so row events (e.g. CHAR_AND_BINARY_AS_BYTE_ARRAY) behave consistently; throws a clear IOException for unknown compression types.
- EventDeserializer: forwards setCompatibilityMode() calls to any registered TransactionPayloadEventDataDeserializer.
- TransactionPayloadEventData: initialises uncompressedEvents to an empty list to prevent NPE in toString() before deserialization completes.

Test plan
- BinaryLogClientTest#testTransactionPayloadNotifiesInnerEvents: inner events are notified with correct type, XID value, and restamped position headers
- BinaryLogClientTest#testGtidSetAdvancesWhenCompressedTransactionCommitsInsidePayload: GTID set advances correctly when the GTID precedes a compressed transaction payload
- TransactionPayloadEventDataDeserializerTest#deserializeUncompressedPayload: COMPRESSION_TYPE_NONE payload deserializes successfully
- TransactionPayloadEventDataDeserializerTest#deserializeUnsupportedCompressionType: unknown type throws IOException with a descriptive message
- TransactionPayloadEventDataDeserializerTest#deserializeAppliesCompatibilityModesToInnerEvents: CompatibilityMode propagates to inner row events (verifies byte[] column type)
- TransactionPayloadEventDataDeserializerTest#deserializeCompressedPayload: existing ZSTD path still passes

🤖 Generated with Claude Code
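The CompatibilityMode forwarding listed in the summary above can be sketched as an outer deserializer that pushes its current mode set into every registered payload deserializer. This is a simplified illustration under assumptions: `SketchMode`, `SketchPayloadDeserializer` and `SketchOuterDeserializer` are hypothetical stand-ins, not the connector's classes.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for the connector's compatibility modes. */
enum SketchMode { CHAR_AND_BINARY_AS_BYTE_ARRAY, DATE_AND_TIME_AS_LONG }

/** Hypothetical inner deserializer used for events inside a payload. */
final class SketchPayloadDeserializer {
    private Set<SketchMode> modes = EnumSet.noneOf(SketchMode.class);

    void setModes(Set<SketchMode> newModes) {
        // Copy defensively so later changes to the caller's set do not leak in.
        this.modes = newModes.isEmpty()
            ? EnumSet.noneOf(SketchMode.class)
            : EnumSet.copyOf(newModes);
    }

    boolean isEnabled(SketchMode mode) {
        return modes.contains(mode);
    }
}

/** Hypothetical outer deserializer that forwards mode changes downward. */
final class SketchOuterDeserializer {
    private final List<SketchPayloadDeserializer> payloadDeserializers = new ArrayList<>();
    private Set<SketchMode> modes = EnumSet.noneOf(SketchMode.class);

    void register(SketchPayloadDeserializer inner) {
        payloadDeserializers.add(inner);
        inner.setModes(modes); // late registrations still see the current modes
    }

    void setCompatibilityMode(SketchMode first, SketchMode... rest) {
        modes = EnumSet.of(first, rest);
        for (SketchPayloadDeserializer inner : payloadDeserializers) {
            inner.setModes(modes);
        }
    }
}
```

Forwarding on both registration and mode change means the order of the two calls does not matter, which is the property the CompatibilityMode test in the plan above appears to check.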