diff --git a/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java b/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java index a5df6cdf7875..eff490d6f6e6 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/RowHelper.java @@ -36,6 +36,14 @@ public class RowHelper implements Serializable { private static final long serialVersionUID = 1L; + /** + * Threshold in bytes for releasing the internal reuse buffer. When big records are written, the + * BinaryRowWriter's internal segment can grow very large via grow(). The {@link + * #resetIfTooLarge()} method checks this threshold and releases the bloated + * reuseRow/reuseWriter to avoid holding onto oversized buffers indefinitely. + */ + private static final int REUSE_RELEASE_THRESHOLD = 4 * 1024 * 1024; // 4MB + private final FieldGetter[] fieldGetters; private final ValueSetter[] valueSetters; private final boolean[] writeNulls; @@ -81,6 +89,20 @@ public void copyInto(InternalRow row) { reuseWriter.complete(); } + /** + * Release the internal reuse buffer if the segment exceeds the threshold. This should be called + * after the caller has finished using the reuseRow (e.g. after serialization), so that large + * records don't linger in memory. 
+ */ + public void resetIfTooLarge() { + if (reuseWriter != null + && reuseWriter.getSegments() != null + && reuseWriter.getSegments().size() > REUSE_RELEASE_THRESHOLD) { + reuseRow = null; + reuseWriter = null; + } + } + public BinaryRow reuseRow() { return reuseRow; } diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java index be0c05eee629..64af656c4dbd 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapBytesVector.java @@ -106,19 +106,27 @@ public void fill(byte[] value) { Arrays.fill(this.length, value.length); } + /** The maximum size of array to allocate. Some VMs reserve header words in an array. */ + private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + private void reserveBytes(int newCapacity) { if (newCapacity > buffer.length) { - int newBytesCapacity = newCapacity * 2; - try { - buffer = Arrays.copyOf(buffer, newBytesCapacity); - } catch (NegativeArraySizeException e) { + if (newCapacity > MAX_ARRAY_SIZE) { throw new RuntimeException( String.format( - "The new claimed capacity %s is too large, will overflow the INTEGER.MAX after multiply by 2. " - + "Try reduce `read.batch-size` to avoid this exception.", - newCapacity), - e); + "The required byte buffer capacity %s exceeds the maximum array size. " + + "Try reducing `read.batch-size` to avoid this exception.", + newCapacity)); + } + // Try to double the capacity for amortized growth. If doubling would exceed + // MAX_ARRAY_SIZE, clamp the new capacity to MAX_ARRAY_SIZE instead. 
+ int newBytesCapacity; + if (newCapacity <= (MAX_ARRAY_SIZE >> 1)) { + newBytesCapacity = newCapacity << 1; + } else { + newBytesCapacity = MAX_ARRAY_SIZE; } + buffer = Arrays.copyOf(buffer, newBytesCapacity); } } diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java index 49dcee73ef27..5446b9cdd775 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/BinaryRowSerializer.java @@ -80,6 +80,13 @@ public BinaryRow deserialize(DataInputView source) throws IOException { return row; } + /** + * Threshold above which we consider a reuse buffer "oversized" and eligible for shrinking. This + * prevents accumulation of large byte arrays when a few large records inflate the reuse buffer + * and subsequent small records never trigger reallocation. + */ + private static final int REUSE_SHRINK_THRESHOLD = 4 * 1024 * 1024; // 4MB + public BinaryRow deserialize(BinaryRow reuse, DataInputView source) throws IOException { MemorySegment[] segments = reuse.getSegments(); checkArgument( @@ -88,6 +95,12 @@ public BinaryRow deserialize(BinaryRow reuse, DataInputView source) throws IOExc int length = source.readInt(); if (segments == null || segments[0].size() < length) { + // Need a larger buffer + segments = new MemorySegment[] {MemorySegment.wrap(new byte[length])}; + } else if (segments[0].size() > REUSE_SHRINK_THRESHOLD) { + // The existing buffer is oversized (> 4MB). Shrink it to avoid holding onto large + // byte arrays indefinitely, which can cause OOM when many merge channels each + // retain a bloated reuse buffer. 
segments = new MemorySegment[] {MemorySegment.wrap(new byte[length])}; } source.readFully(segments[0].getArray(), 0, length); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java index c3abc2f4cee6..744e76e631b3 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/serializer/InternalRowSerializer.java @@ -59,7 +59,16 @@ public InternalRowSerializer duplicate() { @Override public void serialize(InternalRow row, DataOutputView target) throws IOException { - binarySerializer.serialize(toBinaryRow(row), target); + try { + binarySerializer.serialize(toBinaryRow(row), target); + } finally { + // Must use finally here: toBinaryRow() may inflate RowHelper's internal buffer + // for large records (e.g. 100MB+). The serialization can exit via EOFException + // thrown by SimpleCollectingOutputView.nextSegment() when the sort buffer is + // full, which is caught by BinaryInMemorySortBuffer.write() as a normal signal. + // Without finally, the bloated buffer would never be released on that path. + rowHelper.resetIfTooLarge(); + } } @Override @@ -132,7 +141,13 @@ public InternalRow createReuseInstance() { @Override public int serializeToPages(InternalRow row, AbstractPagedOutputView target) throws IOException { - return binarySerializer.serializeToPages(toBinaryRow(row), target); + try { + return binarySerializer.serializeToPages(toBinaryRow(row), target); + } finally { + // Same as serialize(): must use finally because EOFException may bypass normal + // return when the sort buffer is full. 
+ rowHelper.resetIfTooLarge(); + } } @Override diff --git a/paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java b/paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java new file mode 100644 index 000000000000..6af641e56ffe --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/data/RowHelperTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data; + +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowKind; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link RowHelper}, focusing on the resetIfTooLarge() behavior. 
*/ +class RowHelperTest { + + @Test + void testResetIfTooLargeReleasesOversizedBuffer() { + RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES())); + + // Write a large record (> 4MB) to inflate the internal buffer + byte[] largePayload = new byte[5 * 1024 * 1024]; // 5MB + Arrays.fill(largePayload, (byte) 'x'); + GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload); + largeRow.setRowKind(RowKind.INSERT); + helper.copyInto(largeRow); + + assertThat(helper.reuseRow()).isNotNull(); + + // resetIfTooLarge() should release the bloated buffer + helper.resetIfTooLarge(); + assertThat(helper.reuseRow()).isNull(); + } + + @Test + void testResetIfTooLargeKeepsSmallBuffer() { + RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.INT())); + + // Write a small record (< 4MB) + GenericRow smallRow = GenericRow.of(BinaryString.fromString("hello"), 42); + smallRow.setRowKind(RowKind.INSERT); + helper.copyInto(smallRow); + + assertThat(helper.reuseRow()).isNotNull(); + + // resetIfTooLarge() should NOT release the small buffer + helper.resetIfTooLarge(); + assertThat(helper.reuseRow()).isNotNull(); + } + + @Test + void testResetIfTooLargeBeforeCopyInto() { + RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING())); + + // reuseRow is null before any copyInto + assertThat(helper.reuseRow()).isNull(); + + // resetIfTooLarge() should be safe to call when reuseRow is null + helper.resetIfTooLarge(); + assertThat(helper.reuseRow()).isNull(); + } + + @Test + void testReuseIsRecreatedAfterRelease() { + RowHelper helper = new RowHelper(Arrays.asList(DataTypes.STRING(), DataTypes.BYTES())); + + // Write a large record to inflate the buffer + byte[] largePayload = new byte[5 * 1024 * 1024]; + GenericRow largeRow = GenericRow.of(BinaryString.fromString("key"), largePayload); + largeRow.setRowKind(RowKind.INSERT); + helper.copyInto(largeRow); + helper.resetIfTooLarge(); + 
assertThat(helper.reuseRow()).isNull(); + + // Write a small record — reuseRow should be recreated + GenericRow smallRow = GenericRow.of(BinaryString.fromString("small"), new byte[10]); + smallRow.setRowKind(RowKind.INSERT); + helper.copyInto(smallRow); + assertThat(helper.reuseRow()).isNotNull(); + + // Small buffer should survive resetIfTooLarge() + helper.resetIfTooLarge(); + assertThat(helper.reuseRow()).isNotNull(); + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/data/columnar/heap/HeapBytesVectorReserveBytesTest.java b/paimon-common/src/test/java/org/apache/paimon/data/columnar/heap/HeapBytesVectorReserveBytesTest.java new file mode 100644 index 000000000000..b635c220cedb --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/data/columnar/heap/HeapBytesVectorReserveBytesTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.columnar.heap; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link HeapBytesVector#putByteArray}, focusing on reserveBytes() overflow safety. 
+ */ +class HeapBytesVectorReserveBytesTest { + + @Test + void testNormalGrowthDoublesCapacity() { + HeapBytesVector vector = new HeapBytesVector(4); + int initialBufferSize = vector.buffer.length; + + // Write enough data to trigger growth + byte[] data = new byte[initialBufferSize + 1]; + vector.putByteArray(0, data, 0, data.length); + + // Buffer should grow to twice the required capacity + assertThat(vector.buffer.length).isEqualTo((initialBufferSize + 1) * 2); + } + + @Test + void testPutByteArrayStoresDataCorrectly() { + HeapBytesVector vector = new HeapBytesVector(4); + + byte[] data1 = new byte[] {1, 2, 3}; + byte[] data2 = new byte[] {4, 5, 6, 7}; + vector.putByteArray(0, data1, 0, data1.length); + vector.putByteArray(1, data2, 0, data2.length); + + HeapBytesVector.Bytes bytes0 = vector.getBytes(0); + assertThat(bytes0.len).isEqualTo(3); + assertThat(vector.buffer[bytes0.offset]).isEqualTo((byte) 1); + assertThat(vector.buffer[bytes0.offset + 2]).isEqualTo((byte) 3); + + HeapBytesVector.Bytes bytes1 = vector.getBytes(1); + assertThat(bytes1.len).isEqualTo(4); + assertThat(vector.buffer[bytes1.offset]).isEqualTo((byte) 4); + } + + @Test + void testLargeCapacityDoesNotOverflow() { + HeapBytesVector vector = new HeapBytesVector(2); + + // Simulate a scenario where the required capacity is large but still within + // MAX_ARRAY_SIZE. We can't actually allocate Integer.MAX_VALUE bytes in a test, + // but we can verify the logic by checking that a moderately large allocation works. 
+ int largeSize = 64 * 1024 * 1024; // 64MB + byte[] largeData = new byte[largeSize]; + vector.putByteArray(0, largeData, 0, largeData.length); + + assertThat(vector.buffer.length).isGreaterThanOrEqualTo(largeSize); + assertThat(vector.getBytes(0).len).isEqualTo(largeSize); + } + + @Test + void testResetClearsBytesAppended() { + HeapBytesVector vector = new HeapBytesVector(4); + + byte[] data = new byte[] {1, 2, 3}; + vector.putByteArray(0, data, 0, data.length); + + vector.reset(); + + // After reset, we should be able to write again from the beginning + byte[] data2 = new byte[] {10, 20}; + vector.putByteArray(0, data2, 0, data2.length); + + HeapBytesVector.Bytes bytes = vector.getBytes(0); + assertThat(bytes.offset).isEqualTo(0); + assertThat(bytes.len).isEqualTo(2); + assertThat(vector.buffer[0]).isEqualTo((byte) 10); + } +} diff --git a/paimon-common/src/test/java/org/apache/paimon/data/serializer/BinaryRowSerializerShrinkTest.java b/paimon-common/src/test/java/org/apache/paimon/data/serializer/BinaryRowSerializerShrinkTest.java new file mode 100644 index 000000000000..1058ec225ca5 --- /dev/null +++ b/paimon-common/src/test/java/org/apache/paimon/data/serializer/BinaryRowSerializerShrinkTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.data.serializer; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.io.DataInputDeserializer; +import org.apache.paimon.io.DataOutputSerializer; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link BinaryRowSerializer#deserialize(BinaryRow, org.apache.paimon.io.DataInputView)}, + * focusing on the REUSE_SHRINK_THRESHOLD behavior. + */ +class BinaryRowSerializerShrinkTest { + + private static final int SHRINK_THRESHOLD = 4 * 1024 * 1024; // 4MB + + @Test + void testDeserializeShrinksOversizedReuseBuffer() throws Exception { + BinaryRowSerializer serializer = new BinaryRowSerializer(1); + + // Serialize a large record (> 4MB) + BinaryRow largeRow = createRowWithPayload(5 * 1024 * 1024); + byte[] largeBytes = serializeRow(serializer, largeRow); + + // Deserialize into a fresh reuse row — buffer grows to hold the large record + BinaryRow reuse = serializer.createInstance(); + DataInputDeserializer largeInput = new DataInputDeserializer(largeBytes); + reuse = serializer.deserialize(reuse, largeInput); + int largeBufferSize = reuse.getSegments()[0].size(); + assertThat(largeBufferSize).isGreaterThanOrEqualTo(5 * 1024 * 1024); + + // Serialize a small record + BinaryRow smallRow = createRowWithPayload(100); + byte[] smallBytes = serializeRow(serializer, smallRow); + + // Deserialize the small record into the same reuse row + // The oversized buffer (> 4MB) should be shrunk to the exact size needed + DataInputDeserializer smallInput = new DataInputDeserializer(smallBytes); + reuse = serializer.deserialize(reuse, smallInput); + int shrunkBufferSize = reuse.getSegments()[0].size(); + assertThat(shrunkBufferSize).isLessThan(SHRINK_THRESHOLD); + } + + 
@Test + void testDeserializeKeepsSmallReuseBuffer() throws Exception { + BinaryRowSerializer serializer = new BinaryRowSerializer(1); + + // Serialize a small record (< 4MB) + BinaryRow row1 = createRowWithPayload(1024); + byte[] bytes1 = serializeRow(serializer, row1); + + BinaryRow reuse = serializer.createInstance(); + DataInputDeserializer input1 = new DataInputDeserializer(bytes1); + reuse = serializer.deserialize(reuse, input1); + int bufferSize1 = reuse.getSegments()[0].size(); + + // Serialize an even smaller record + BinaryRow row2 = createRowWithPayload(100); + byte[] bytes2 = serializeRow(serializer, row2); + + // Deserialize — buffer should be reused (not shrunk), since it's < 4MB + DataInputDeserializer input2 = new DataInputDeserializer(bytes2); + reuse = serializer.deserialize(reuse, input2); + int bufferSize2 = reuse.getSegments()[0].size(); + assertThat(bufferSize2).isEqualTo(bufferSize1); + } + + @Test + void testDeserializeGrowsBufferWhenNeeded() throws Exception { + BinaryRowSerializer serializer = new BinaryRowSerializer(1); + + // Start with a small record + BinaryRow smallRow = createRowWithPayload(100); + byte[] smallBytes = serializeRow(serializer, smallRow); + + BinaryRow reuse = serializer.createInstance(); + DataInputDeserializer smallInput = new DataInputDeserializer(smallBytes); + reuse = serializer.deserialize(reuse, smallInput); + + // Deserialize a larger record — buffer should grow + BinaryRow largerRow = createRowWithPayload(2048); + byte[] largerBytes = serializeRow(serializer, largerRow); + + DataInputDeserializer largerInput = new DataInputDeserializer(largerBytes); + reuse = serializer.deserialize(reuse, largerInput); + assertThat(reuse.getSegments()[0].size()).isGreaterThanOrEqualTo(2048); + } + + private static BinaryRow createRowWithPayload(int payloadSize) { + BinaryRow row = new BinaryRow(1); + BinaryRowWriter writer = new BinaryRowWriter(row, payloadSize + 32); + byte[] payload = new byte[payloadSize]; + 
writer.writeString(0, BinaryString.fromBytes(payload)); + writer.complete(); + return row; + } + + private static byte[] serializeRow(BinaryRowSerializer serializer, BinaryRow row) + throws Exception { + DataOutputSerializer output = new DataOutputSerializer(row.getSizeInBytes() + 4); + serializer.serialize(row, output); + return output.getCopyOfBuffer(); + } +} diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java index 1a66a55130e1..7d72c8a1a91a 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/RowDataParquetBuilder.java @@ -96,7 +96,23 @@ public ParquetWriter createWriter(OutputFile out, String compressio .withBloomFilterEnabled( conf.getBoolean( ParquetOutputFormat.BLOOM_FILTER_ENABLED, - ParquetProperties.DEFAULT_BLOOM_FILTER_ENABLED)); + ParquetProperties.DEFAULT_BLOOM_FILTER_ENABLED)) + .withMinRowCountForPageSizeCheck( + conf.getInt( + ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, + ParquetProperties.DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK)) + .withMaxRowCountForPageSizeCheck( + conf.getInt( + ParquetOutputFormat.MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK, + ParquetProperties.DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK)) + .withStatisticsTruncateLength( + conf.getInt( + ParquetOutputFormat.STATISTICS_TRUNCATE_LENGTH, + ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH)) + .withColumnIndexTruncateLength( + conf.getInt( + ParquetOutputFormat.COLUMN_INDEX_TRUNCATE_LENGTH, + ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH)); new ColumnConfigParser() .withColumnConfig( ParquetOutputFormat.ENABLE_DICTIONARY,