diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java
index 4fb57ee407..e783815747 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexFilterUtils.java
@@ -18,6 +18,9 @@
  */
 package org.apache.parquet.hadoop;
 
+import static org.apache.parquet.hadoop.ParquetFileReaderMetrics.PagesIncluded;
+import static org.apache.parquet.hadoop.ParquetFileReaderMetrics.PagesSkipped;
+
 import it.unimi.dsi.fastutil.ints.IntArrayList;
 import it.unimi.dsi.fastutil.ints.IntList;
 import java.util.ArrayList;
@@ -129,14 +132,30 @@ public String toString() {
   /*
    * Returns the filtered offset index containing only the pages which are overlapping with rowRanges.
    */
-  static OffsetIndex filterOffsetIndex(OffsetIndex offsetIndex, RowRanges rowRanges, long totalRowCount) {
+  static OffsetIndex filterOffsetIndex(
+      OffsetIndex offsetIndex,
+      RowRanges rowRanges,
+      long totalRowCount,
+      org.apache.parquet.ParquetReadOptions options) {
     IntList indexMap = new IntArrayList();
+    int pagesIncluded = 0;
+    int pagesSkipped = 0;
     for (int i = 0, n = offsetIndex.getPageCount(); i < n; ++i) {
       long from = offsetIndex.getFirstRowIndex(i);
       if (rowRanges.isOverlapping(from, offsetIndex.getLastRowIndex(i, totalRowCount))) {
         indexMap.add(i);
+        pagesIncluded++;
+      } else {
+        pagesSkipped++;
       }
     }
+
+    if (options != null && options.getMetricsCallback() != null) {
+      final ParquetMetricsCallback metricsCallback = options.getMetricsCallback();
+      metricsCallback.setValueInt(PagesIncluded.name(), pagesIncluded);
+      metricsCallback.setValueInt(PagesSkipped.name(), pagesSkipped);
+    }
+
     return new FilteredOffsetIndex(offsetIndex, indexMap.toIntArray());
   }
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 2ef39f7804..551b1bf6c7 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -1423,7 +1423,8 @@ private ColumnChunkPageReadStore internalReadFilteredRowGroup(
       if (columnDescriptor != null) {
         OffsetIndex offsetIndex = ciStore.getOffsetIndex(mc.getPath());
 
-        OffsetIndex filteredOffsetIndex = filterOffsetIndex(offsetIndex, rowRanges, block.getRowCount());
+        OffsetIndex filteredOffsetIndex =
+            filterOffsetIndex(offsetIndex, rowRanges, block.getRowCount(), options);
         for (OffsetRange range : calculateOffsetRanges(filteredOffsetIndex, mc, offsetIndex.getOffset(0))) {
           BenchmarkCounter.incrementTotalBytes(range.getLength());
           long startingPos = range.getOffset();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReaderMetrics.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReaderMetrics.java
index 737e6abb96..6c4b7fa3bb 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReaderMetrics.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReaderMetrics.java
@@ -27,7 +27,9 @@ public enum ParquetFileReaderMetrics {
   ReadThroughput("read throughput when reading Parquet file from storage (MB/sec)"),
   DecompressTime("time spent in block decompression"),
   DecompressSize("decompressed data size (MB)"),
-  DecompressThroughput("block decompression throughput (MB/sec)");
+  DecompressThroughput("block decompression throughput (MB/sec)"),
+  PagesIncluded("pages included due to column index filtering"),
+  PagesSkipped("pages skipped due to column index filtering");
 
   private final String desc;
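For context on consuming the new counters (not part of the patch above): a minimal sketch of a metrics sink, assuming ParquetMetricsCallback exposes the setValueInt(String, int) method called from filterOffsetIndex plus similarly shaped overloads for the other value types. Apart from setValueInt and getMetricsCallback(), which appear in the patch, the overload names and the CountingMetricsCallback class are assumptions for illustration only.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.parquet.hadoop.ParquetMetricsCallback;

    // Hypothetical sink: remembers the latest value reported under each metric name,
    // e.g. "PagesIncluded" / "PagesSkipped" emitted by filterOffsetIndex().
    public class CountingMetricsCallback implements ParquetMetricsCallback {
      private final Map<String, Number> values = new ConcurrentHashMap<>();

      @Override
      public void setValueInt(String name, int value) { // the overload used by this patch
        values.put(name, value);
      }

      // The overloads below are assumed to exist on the interface; they are not
      // exercised by this patch.
      @Override
      public void setValueLong(String name, long value) {
        values.put(name, value);
      }

      @Override
      public void setValueFloat(String name, float value) {
        values.put(name, value);
      }

      @Override
      public void setValueDouble(String name, double value) {
        values.put(name, value);
      }

      @Override
      public void setDuration(String name, long value) {
        values.put(name, value);
      }

      public Number get(String name) {
        return values.get(name);
      }
    }

Wiring such a sink into a read goes through ParquetReadOptions (the patch only shows the getMetricsCallback() accessor); the matching builder setter is not shown here, so its exact name is not assumed.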