Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ public class RowHelper implements Serializable {

private static final long serialVersionUID = 1L;

/**
* Threshold in bytes for releasing the internal reuse buffer. When big records are written, the
* BinaryRowWriter's internal segment can grow very large via grow(). The {@link
* #resetIfTooLarge()} method checks this threshold and releases the bloated
* reuseRow/reuseWriter to avoid holding onto oversized buffers indefinitely.
*/
private static final int REUSE_RELEASE_THRESHOLD = 4 * 1024 * 1024; // 4MB

private final FieldGetter[] fieldGetters;
private final ValueSetter[] valueSetters;
private final boolean[] writeNulls;
Expand Down Expand Up @@ -81,6 +89,20 @@ public void copyInto(InternalRow row) {
reuseWriter.complete();
}

/**
* Release the internal reuse buffer if the segment exceeds the threshold. This should be called
* after the caller has finished using the reuseRow (e.g. after serialization), so that large
* records don't linger in memory.
*/
public void resetIfTooLarge() {
if (reuseWriter != null
&& reuseWriter.getSegments() != null
&& reuseWriter.getSegments().size() > REUSE_RELEASE_THRESHOLD) {
reuseRow = null;
reuseWriter = null;
}
}

    /**
     * Returns the internal reuse row, or {@code null} if it has not been created yet or was
     * released by {@link #resetIfTooLarge()}.
     */
    public BinaryRow reuseRow() {
        return reuseRow;
    }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,19 +106,27 @@ public void fill(byte[] value) {
Arrays.fill(this.length, value.length);
}

/** The maximum size of array to allocate. Some VMs reserve header words in an array. */
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

private void reserveBytes(int newCapacity) {
if (newCapacity > buffer.length) {
int newBytesCapacity = newCapacity * 2;
try {
buffer = Arrays.copyOf(buffer, newBytesCapacity);
} catch (NegativeArraySizeException e) {
if (newCapacity > MAX_ARRAY_SIZE) {
throw new RuntimeException(
String.format(
"The new claimed capacity %s is too large, will overflow the INTEGER.MAX after multiply by 2. "
+ "Try reduce `read.batch-size` to avoid this exception.",
newCapacity),
e);
"The required byte buffer capacity %s exceeds the maximum array size. "
+ "Try reducing `read.batch-size` to avoid this exception.",
newCapacity));
}
// Try to double the capacity for amortized growth. If doubling would overflow,
// fall back to the exact required capacity (capped at MAX_ARRAY_SIZE).
int newBytesCapacity;
if (newCapacity <= (MAX_ARRAY_SIZE >> 1)) {
newBytesCapacity = newCapacity << 1;
} else {
newBytesCapacity = MAX_ARRAY_SIZE;
}
buffer = Arrays.copyOf(buffer, newBytesCapacity);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ public BinaryRow deserialize(DataInputView source) throws IOException {
return row;
}

/**
* Threshold above which we consider a reuse buffer "oversized" and eligible for shrinking. This
* prevents accumulation of large byte arrays when a few large records inflate the reuse buffer
* and subsequent small records never trigger reallocation.
*/
private static final int REUSE_SHRINK_THRESHOLD = 4 * 1024 * 1024; // 4MB

public BinaryRow deserialize(BinaryRow reuse, DataInputView source) throws IOException {
MemorySegment[] segments = reuse.getSegments();
checkArgument(
Expand All @@ -88,6 +95,12 @@ public BinaryRow deserialize(BinaryRow reuse, DataInputView source) throws IOExc

int length = source.readInt();
if (segments == null || segments[0].size() < length) {
// Need a larger buffer
segments = new MemorySegment[] {MemorySegment.wrap(new byte[length])};
} else if (segments[0].size() > REUSE_SHRINK_THRESHOLD) {
// The existing buffer is oversized (> 4MB). Shrink it to avoid holding onto large
// byte arrays indefinitely, which can cause OOM when many merge channels each
// retain a bloated reuse buffer.
segments = new MemorySegment[] {MemorySegment.wrap(new byte[length])};
}
source.readFully(segments[0].getArray(), 0, length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,16 @@ public InternalRowSerializer duplicate() {

@Override
public void serialize(InternalRow row, DataOutputView target) throws IOException {
binarySerializer.serialize(toBinaryRow(row), target);
try {
binarySerializer.serialize(toBinaryRow(row), target);
} finally {
// Must use finally here: toBinaryRow() may inflate RowHelper's internal buffer
// for large records (e.g. 100MB+). The serialization can exit via EOFException
// thrown by SimpleCollectingOutputView.nextSegment() when the sort buffer is
// full, which is caught by BinaryInMemorySortBuffer.write() as a normal signal.
// Without finally, the bloated buffer would never be released on that path.
rowHelper.resetIfTooLarge();
}
}

@Override
Expand Down Expand Up @@ -132,7 +141,13 @@ public InternalRow createReuseInstance() {
@Override
public int serializeToPages(InternalRow row, AbstractPagedOutputView target)
throws IOException {
return binarySerializer.serializeToPages(toBinaryRow(row), target);
try {
return binarySerializer.serializeToPages(toBinaryRow(row), target);
} finally {
// Same as serialize(): must use finally because EOFException may bypass normal
// return when the sort buffer is full.
rowHelper.resetIfTooLarge();
}
}

@Override
Expand Down
101 changes: 101 additions & 0 deletions paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.data;

import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;

import org.junit.jupiter.api.Test;

import java.util.Arrays;

import static org.assertj.core.api.Assertions.assertThat;

/** Unit tests for {@link RowHelper}, covering the {@code resetIfTooLarge()} contract. */
class RowHelperTest {

    @Test
    void testResetIfTooLargeReleasesOversizedBuffer() {
        RowHelper rowHelper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES()));

        // Inflate the internal buffer past the 4MB threshold with a single big record.
        byte[] bigBytes = new byte[5 * 1024 * 1024]; // 5MB
        Arrays.fill(bigBytes, (byte) 'x');
        GenericRow bigRecord = GenericRow.of(BinaryString.fromString("key"), bigBytes);
        bigRecord.setRowKind(RowKind.INSERT);
        rowHelper.copyInto(bigRecord);
        assertThat(rowHelper.reuseRow()).isNotNull();

        // The bloated buffer must be dropped.
        rowHelper.resetIfTooLarge();
        assertThat(rowHelper.reuseRow()).isNull();
    }

    @Test
    void testResetIfTooLargeKeepsSmallBuffer() {
        RowHelper rowHelper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.INT()));

        // A record well below the 4MB threshold.
        GenericRow tinyRecord = GenericRow.of(BinaryString.fromString("hello"), 42);
        tinyRecord.setRowKind(RowKind.INSERT);
        rowHelper.copyInto(tinyRecord);
        assertThat(rowHelper.reuseRow()).isNotNull();

        // A small buffer must survive the release check.
        rowHelper.resetIfTooLarge();
        assertThat(rowHelper.reuseRow()).isNotNull();
    }

    @Test
    void testResetIfTooLargeBeforeCopyInto() {
        RowHelper rowHelper = new RowHelper(Arrays.asList(DataTypes.STRING()));

        // No copyInto yet, so there is nothing cached.
        assertThat(rowHelper.reuseRow()).isNull();

        // Calling the release check on an empty helper must be a no-op.
        rowHelper.resetIfTooLarge();
        assertThat(rowHelper.reuseRow()).isNull();
    }

    @Test
    void testReuseIsRecreatedAfterRelease() {
        RowHelper rowHelper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES()));

        // First inflate and release the buffer.
        GenericRow bigRecord =
                GenericRow.of(BinaryString.fromString("key"), new byte[5 * 1024 * 1024]);
        bigRecord.setRowKind(RowKind.INSERT);
        rowHelper.copyInto(bigRecord);
        rowHelper.resetIfTooLarge();
        assertThat(rowHelper.reuseRow()).isNull();

        // A subsequent small record must transparently recreate the reuse row...
        GenericRow tinyRecord = GenericRow.of(BinaryString.fromString("small"), new byte[10]);
        tinyRecord.setRowKind(RowKind.INSERT);
        rowHelper.copyInto(tinyRecord);
        assertThat(rowHelper.reuseRow()).isNotNull();

        // ...and that small buffer must survive another release check.
        rowHelper.resetIfTooLarge();
        assertThat(rowHelper.reuseRow()).isNotNull();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.data.columnar.heap;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link HeapBytesVector#putByteArray}, focusing on reserveBytes() overflow safety. */
class HeapBytesVectorReserveBytesTest {

    @Test
    void testNormalGrowthDoublesCapacity() {
        HeapBytesVector vector = new HeapBytesVector(4);
        int startingCapacity = vector.buffer.length;

        // One byte more than the current buffer forces a grow.
        byte[] payload = new byte[startingCapacity + 1];
        vector.putByteArray(0, payload, 0, payload.length);

        // Growth is amortized doubling of the required capacity.
        assertThat(vector.buffer.length).isEqualTo((startingCapacity + 1) * 2);
    }

    @Test
    void testPutByteArrayStoresDataCorrectly() {
        HeapBytesVector vector = new HeapBytesVector(4);

        byte[] first = new byte[] {1, 2, 3};
        byte[] second = new byte[] {4, 5, 6, 7};
        vector.putByteArray(0, first, 0, first.length);
        vector.putByteArray(1, second, 0, second.length);

        HeapBytesVector.Bytes firstEntry = vector.getBytes(0);
        assertThat(firstEntry.len).isEqualTo(3);
        assertThat(vector.buffer[firstEntry.offset]).isEqualTo((byte) 1);
        assertThat(vector.buffer[firstEntry.offset + 2]).isEqualTo((byte) 3);

        HeapBytesVector.Bytes secondEntry = vector.getBytes(1);
        assertThat(secondEntry.len).isEqualTo(4);
        assertThat(vector.buffer[secondEntry.offset]).isEqualTo((byte) 4);
    }

    @Test
    void testLargeCapacityDoesNotOverflow() {
        HeapBytesVector vector = new HeapBytesVector(2);

        // We cannot allocate near Integer.MAX_VALUE bytes in a unit test, so exercise the
        // growth path with a moderately large (64MB) allocation that stays well inside
        // MAX_ARRAY_SIZE and verify it succeeds end to end.
        int payloadSize = 64 * 1024 * 1024; // 64MB
        byte[] payload = new byte[payloadSize];
        vector.putByteArray(0, payload, 0, payload.length);

        assertThat(vector.buffer.length).isGreaterThanOrEqualTo(payloadSize);
        assertThat(vector.getBytes(0).len).isEqualTo(payloadSize);
    }

    @Test
    void testResetClearsBytesAppended() {
        HeapBytesVector vector = new HeapBytesVector(4);

        vector.putByteArray(0, new byte[] {1, 2, 3}, 0, 3);
        vector.reset();

        // After a reset, writes must start over at offset zero.
        byte[] replacement = new byte[] {10, 20};
        vector.putByteArray(0, replacement, 0, replacement.length);

        HeapBytesVector.Bytes entry = vector.getBytes(0);
        assertThat(entry.offset).isEqualTo(0);
        assertThat(entry.len).isEqualTo(2);
        assertThat(vector.buffer[0]).isEqualTo((byte) 10);
    }
}
Loading
Loading