From 1a736905c57cb5b87c604b53f402d374de20f437 Mon Sep 17 00:00:00 2001 From: arnavb Date: Tue, 23 Sep 2025 09:24:29 +0000 Subject: [PATCH 1/3] update --- .../columnindex/ColumnIndexFilter.java | 38 ++++++++- .../hadoop/TestColumnIndexFiltering.java | 81 +++++++++++++++++++ 2 files changed, 118 insertions(+), 1 deletion(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java index 8b6ee1f95d..2232149085 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java @@ -192,7 +192,12 @@ private RowRanges applyPredicate( return allRows(); } - return RowRanges.create(rowCount, func.apply(ci), oi); + if (!isValidMetadata(ci, oi, columnPath)) { + return allRows(); + } + + PrimitiveIterator.OfInt pageIndexes = func.apply(ci); + return RowRanges.create(rowCount, pageIndexes, oi); } @Override @@ -220,4 +225,35 @@ public RowRanges visit(Not not) { throw new IllegalArgumentException( "Predicates containing a NOT must be run through LogicalInverseRewriter. " + not); } + + /** + * Validates that column index and offset index metadata are consistent and can be used safely. + * + * @param columnIndex the column index to validate + * @param offsetIndex the offset index to validate + * @param columnPath the column path for error reporting + * @return true if metadata is valid and safe to use, false if corrupt and should be ignored + */ + private static boolean isValidMetadata(ColumnIndex columnIndex, OffsetIndex offsetIndex, ColumnPath columnPath) { + if (columnIndex == null || offsetIndex == null) { + return true; + } + + int columnIndexSize = columnIndex.getMinValues().size(); + int offsetIndexSize = offsetIndex.getPageCount(); + + if (columnIndexSize != offsetIndexSize) { + LOGGER.warn( + "Column index and offset index size mismatch for column {}: " + + "column index has {} entries but offset index has {} pages. " + + "This indicates corrupted metadata from the writer. " + + "Ignoring column index for filtering to avoid errors.", + columnPath, + columnIndexSize, + offsetIndexSize); + return false; + } + + return true; + } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java index 154dd6f5c5..a2558f4910 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java @@ -58,6 +58,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.function.Predicate; @@ -69,6 +70,7 @@ import org.apache.parquet.bytes.TrackingByteBufferAllocator; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.column.statistics.BinaryStatistics; import org.apache.parquet.crypto.ColumnEncryptionProperties; import org.apache.parquet.crypto.DecryptionKeyRetrieverMock; import org.apache.parquet.crypto.FileDecryptionProperties; @@ -86,8 +88,16 @@ import org.apache.parquet.hadoop.example.ExampleParquetWriter; import org.apache.parquet.hadoop.example.GroupReadSupport; import org.apache.parquet.hadoop.metadata.ColumnPath; +import org.apache.parquet.internal.column.columnindex.ColumnIndex; +import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder; +import org.apache.parquet.internal.column.columnindex.OffsetIndex; +import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder; +import org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter; +import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore; +import org.apache.parquet.internal.filter2.columnindex.RowRanges; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; import org.junit.After; import org.junit.AfterClass; @@ -650,4 +660,75 @@ public void testFilteringWithProjection() throws IOException { false, true)); } + + @Test + public void testValidMetadata() throws Exception { + String testColumnName = "test_column"; + long testRowCount = 100L; + Binary testMinValue = Binary.fromString("a"); + Binary testMaxValue = Binary.fromString("z"); + Binary filterValue = Binary.fromString(""); + + OffsetIndex offsetIndex = createValidOffsetIndex(); + ColumnIndex columnIndex = createValidColumnIndex(testColumnName, testMinValue, testMaxValue); + + MockColumnIndexStore store = new MockColumnIndexStore(columnIndex, offsetIndex); + RowRanges result = ColumnIndexFilter.calculateRowRanges( + FilterCompat.get(gtEq(binaryColumn(testColumnName), filterValue)), + store, + Collections.singleton(ColumnPath.fromDotString(testColumnName)), + testRowCount); + + assertEquals("Should return all rows for this filter", testRowCount, result.rowCount()); + assertEquals("Should have single range", 1, result.getRanges().size()); + assertEquals("Range should start at 0", 0L, result.getRanges().get(0).from); + assertEquals( + "Range should end at last row", + testRowCount - 1, + result.getRanges().get(0).to); + } + + private OffsetIndex createValidOffsetIndex() { + OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder(); + builder.add(1000L, 100, 0L, Optional.empty()); + builder.add(1100L, 100, 50L, Optional.empty()); + return builder.build(); + } + + private ColumnIndex createValidColumnIndex(String columnName, Binary minValue, Binary maxValue) { + MessageType schema = Types.buildMessage() + .required(PrimitiveType.PrimitiveTypeName.BINARY) + .named(columnName) + .named("test_schema"); + PrimitiveType primitiveType = schema.getColumns().get(0).getPrimitiveType(); + ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(primitiveType, Integer.MAX_VALUE); + + for (int i = 0; i < 2; i++) { + BinaryStatistics stats = new BinaryStatistics(); + stats.updateStats(minValue); + stats.updateStats(maxValue); + builder.add(stats); + } + return builder.build(); + } + + private static class MockColumnIndexStore implements ColumnIndexStore { + private final ColumnIndex columnIndex; + private final OffsetIndex offsetIndex; + + public MockColumnIndexStore(ColumnIndex columnIndex, OffsetIndex offsetIndex) { + this.columnIndex = columnIndex; + this.offsetIndex = offsetIndex; + } + + @Override + public ColumnIndex getColumnIndex(ColumnPath column) { + return columnIndex; + } + + @Override + public OffsetIndex getOffsetIndex(ColumnPath column) { + return offsetIndex; + } + } } From 3f6336d89ad00674569f88ac840e1bdb3a836601 Mon Sep 17 00:00:00 2001 From: arnavb Date: Wed, 24 Sep 2025 13:50:46 +0000 Subject: [PATCH 2/3] Update --- .../internal/filter2/columnindex/ColumnIndexFilter.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java index 2232149085..fd26e54d7f 100644 --- a/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java +++ b/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java @@ -192,7 +192,7 @@ private RowRanges applyPredicate( return allRows(); } - if (!isValidMetadata(ci, oi, columnPath)) { + if (!isValidIndexSize(ci, oi, columnPath)) { return allRows(); } @@ -234,10 +234,7 @@ public RowRanges visit(Not not) { * @param columnPath the column path for error reporting * @return true if metadata is valid and safe to use, false if corrupt and should be ignored */ - private static boolean isValidMetadata(ColumnIndex columnIndex, OffsetIndex offsetIndex, ColumnPath columnPath) { - if (columnIndex == null || offsetIndex == null) { - return true; - } + private static boolean isValidIndexSize(ColumnIndex columnIndex, OffsetIndex offsetIndex, ColumnPath columnPath) { int columnIndexSize = columnIndex.getMinValues().size(); int offsetIndexSize = offsetIndex.getPageCount(); From 33c11291668313734c5ebe7e6a72b91cdbb78719 Mon Sep 17 00:00:00 2001 From: arnavb Date: Thu, 25 Sep 2025 11:32:54 +0000 Subject: [PATCH 3/3] update --- .../hadoop/TestColumnIndexFiltering.java | 81 ------------------- 1 file changed, 81 deletions(-) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java index a2558f4910..154dd6f5c5 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java @@ -58,7 +58,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.function.Predicate; @@ -70,7 +69,6 @@ import org.apache.parquet.bytes.TrackingByteBufferAllocator; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.ParquetProperties.WriterVersion; -import org.apache.parquet.column.statistics.BinaryStatistics; import org.apache.parquet.crypto.ColumnEncryptionProperties; import org.apache.parquet.crypto.DecryptionKeyRetrieverMock; import org.apache.parquet.crypto.FileDecryptionProperties; @@ -88,16 +86,8 @@ import org.apache.parquet.hadoop.example.ExampleParquetWriter; import org.apache.parquet.hadoop.example.GroupReadSupport; import org.apache.parquet.hadoop.metadata.ColumnPath; -import org.apache.parquet.internal.column.columnindex.ColumnIndex; -import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder; -import org.apache.parquet.internal.column.columnindex.OffsetIndex; -import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder; -import org.apache.parquet.internal.filter2.columnindex.ColumnIndexFilter; -import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore; -import org.apache.parquet.internal.filter2.columnindex.RowRanges; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Types; import org.junit.After; import org.junit.AfterClass; @@ -660,75 +650,4 @@ public void testFilteringWithProjection() throws IOException { false, true)); } - - @Test - public void testValidMetadata() throws Exception { - String testColumnName = "test_column"; - long testRowCount = 100L; - Binary testMinValue = Binary.fromString("a"); - Binary testMaxValue = Binary.fromString("z"); - Binary filterValue = Binary.fromString(""); - - OffsetIndex offsetIndex = createValidOffsetIndex(); - ColumnIndex columnIndex = createValidColumnIndex(testColumnName, testMinValue, testMaxValue); - - MockColumnIndexStore store = new MockColumnIndexStore(columnIndex, offsetIndex); - RowRanges result = ColumnIndexFilter.calculateRowRanges( - FilterCompat.get(gtEq(binaryColumn(testColumnName), filterValue)), - store, - Collections.singleton(ColumnPath.fromDotString(testColumnName)), - testRowCount); - - assertEquals("Should return all rows for this filter", testRowCount, result.rowCount()); - assertEquals("Should have single range", 1, result.getRanges().size()); - assertEquals("Range should start at 0", 0L, result.getRanges().get(0).from); - assertEquals( - "Range should end at last row", - testRowCount - 1, - result.getRanges().get(0).to); - } - - private OffsetIndex createValidOffsetIndex() { - OffsetIndexBuilder builder = OffsetIndexBuilder.getBuilder(); - builder.add(1000L, 100, 0L, Optional.empty()); - builder.add(1100L, 100, 50L, Optional.empty()); - return builder.build(); - } - - private ColumnIndex createValidColumnIndex(String columnName, Binary minValue, Binary maxValue) { - MessageType schema = Types.buildMessage() - .required(PrimitiveType.PrimitiveTypeName.BINARY) - .named(columnName) - .named("test_schema"); - PrimitiveType primitiveType = schema.getColumns().get(0).getPrimitiveType(); - ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(primitiveType, Integer.MAX_VALUE); - - for (int i = 0; i < 2; i++) { - BinaryStatistics stats = new BinaryStatistics(); - stats.updateStats(minValue); - stats.updateStats(maxValue); - builder.add(stats); - } - return builder.build(); - } - - private static class MockColumnIndexStore implements ColumnIndexStore { - private final ColumnIndex columnIndex; - private final OffsetIndex offsetIndex; - - public MockColumnIndexStore(ColumnIndex columnIndex, OffsetIndex offsetIndex) { - this.columnIndex = columnIndex; - this.offsetIndex = offsetIndex; - } - - @Override - public ColumnIndex getColumnIndex(ColumnPath column) { - return columnIndex; - } - - @Override - public OffsetIndex getOffsetIndex(ColumnPath column) { - return offsetIndex; - } - } }