diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java index 0e66140744..9332e71bc1 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java @@ -19,7 +19,6 @@ package org.apache.parquet.bytes; import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -367,8 +366,18 @@ private StreamBytesInput(InputStream in, int byteCount) { @Override public void writeAllTo(OutputStream out) throws IOException { LOG.debug("write All {} bytes", byteCount); - // TODO: more efficient - out.write(this.toByteArray()); + // Transfer in chunks to avoid allocating a byteCount-sized intermediate buffer + byte[] buffer = new byte[Math.min(byteCount, 8192)]; + int remaining = byteCount; + while (remaining > 0) { + int toRead = Math.min(remaining, buffer.length); + int n = in.readNBytes(buffer, 0, toRead); + if (n < toRead) { + throw new EOFException("Reached the end of stream with " + (remaining - n) + " bytes left to read"); + } + out.write(buffer, 0, n); + remaining -= n; + } } @Override @@ -395,8 +404,11 @@ void writeInto(ByteBuffer buffer) { public byte[] toByteArray() throws IOException { LOG.debug("read all {} bytes", byteCount); - byte[] buf = new byte[byteCount]; - new DataInputStream(in).readFully(buf); + byte[] buf = in.readNBytes(byteCount); + if (buf.length != byteCount) { + throw new EOFException( + "Reached the end of stream with " + (byteCount - buf.length) + " bytes left to read"); + } return buf; } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java index ae2bd87ac9..2db4d77f6e 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java @@ -195,8 +195,7 @@ public void testLz4RawHeapDecompressorCanCopyLargePage() throws IOException { final byte[] raw = new byte[size]; new Random(42).nextBytes(raw); - try (TrackingByteBufferAllocator allocator = - TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator()); + try (TrackingByteBufferAllocator allocator = TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator()); ByteBufferReleaser releaser = new ByteBufferReleaser(allocator)) { CodecFactory heapCodecFactory = new CodecFactory(new Configuration(), pageSize); BytesInputCompressor compressor = heapCodecFactory.getCompressor(CompressionCodecName.LZ4_RAW);