diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
index f8a5b00071..8ca531c6df 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
@@ -36,6 +36,7 @@
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.text.TextStringBuilder;
+import org.apache.hadoop.fs.Path;
 import org.apache.parquet.cli.BaseCommand;
 import org.apache.parquet.cli.rawpages.RawPagesReader;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -46,9 +47,14 @@
 import org.apache.parquet.column.page.Page;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.Util;
 import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.slf4j.Logger;
@@ -107,7 +113,7 @@ public int run() throws IOException {
                 reader.getRowGroups().get(0).getColumns().get(0).getCodec();
         // accumulate formatted lines to print by column
         Map<String, List<String>> formatted = Maps.newLinkedHashMap();
-        PageFormatter formatter = new PageFormatter();
+        PageFormatter formatter = new PageFormatter(reader);
         PageReadStore pageStore;
         int rowGroupNum = 0;
         while ((pageStore = reader.readNextRowGroup()) != null) {
@@ -118,7 +124,7 @@ public int run() throws IOException {
                     formatted.put(columnName(descriptor), lines);
                 }
 
-                formatter.setContext(rowGroupNum, columns.get(descriptor), codec);
+                formatter.setContext(rowGroupNum, columns.get(descriptor), codec, descriptor);
 
                 PageReader pages = pageStore.getPageReader(descriptor);
                 DictionaryPage dict = pages.readDictionaryPage();
@@ -155,10 +161,16 @@ public List<String> getExamples() {
     }
 
     private class PageFormatter implements DataPage.Visitor<String> {
+        private final ParquetFileReader reader;
         private int rowGroupNum;
         private int pageNum;
        private PrimitiveType type;
         private String shortCodec;
+        private ColumnDescriptor currentColumn;
+
+        PageFormatter(ParquetFileReader reader) {
+            this.reader = reader;
+        }
 
         String getHeader() {
             return String.format(
@@ -166,11 +178,12 @@ String getHeader() {
                     "page", "type", "enc", "count", "avg size", "size", "rows", "nulls", "min / max");
         }
 
-        void setContext(int rowGroupNum, PrimitiveType type, CompressionCodecName codec) {
+        void setContext(int rowGroupNum, PrimitiveType type, CompressionCodecName codec, ColumnDescriptor column) {
             this.rowGroupNum = rowGroupNum;
             this.pageNum = 0;
             this.type = type;
             this.shortCodec = shortCodec(codec);
+            this.currentColumn = column;
         }
 
         String format(Page page) {
@@ -184,6 +197,37 @@ String format(Page page) {
             return formatted;
         }
 
+        private long getPageCompressedSize() {
+            long compressedSize = 0;
+            try (SeekableInputStream input = HadoopInputFile.fromPath(new Path(reader.getFile()), getConf())
+                    .newStream()) {
+                BlockMetaData rowGroup = reader.getRowGroups().get(rowGroupNum);
+                int columnIndex =
+                        reader.getFileMetaData().getSchema().getColumns().indexOf(currentColumn);
+                ColumnChunkMetaData columnChunk = rowGroup.getColumns().get(columnIndex);
+
+                long startOffset = columnChunk.hasDictionaryPage()
+                        ? columnChunk.getDictionaryPageOffset()
+                        : columnChunk.getFirstDataPageOffset();
+                input.seek(startOffset);
+                long endPos = startOffset + columnChunk.getTotalSize();
+                int currentPageIndex = 0;
+
+                while (input.getPos() < endPos) {
+                    PageHeader pageHeader = Util.readPageHeader(input);
+                    if (currentPageIndex == pageNum) {
+                        compressedSize = pageHeader.getCompressed_page_size();
+                        break;
+                    }
+                    input.skip(pageHeader.getCompressed_page_size());
+                    currentPageIndex++;
+                }
+                return compressedSize;
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
         private String printDictionaryPage(DictionaryPage dict) {
             // TODO: the compressed size of a dictionary page is lost in Parquet
             dict.getUncompressedSize();
@@ -212,7 +256,7 @@ private String printDictionaryPage(DictionaryPage dict) {
         @Override
         public String visit(DataPageV1 page) {
             String enc = encodingAsString(page.getValueEncoding(), false);
-            long totalSize = page.getCompressedSize();
+            long totalSize = getPageCompressedSize();
             int count = page.getValueCount();
             String numNulls = page.getStatistics().isNumNullsSet()
                     ? Long.toString(page.getStatistics().getNumNulls())
@@ -237,7 +281,7 @@ public String visit(DataPageV1 page) {
         @Override
         public String visit(DataPageV2 page) {
             String enc = encodingAsString(page.getDataEncoding(), false);
-            long totalSize = page.getCompressedSize();
+            long totalSize = getPageCompressedSize();
             int count = page.getValueCount();
             int numRows = page.getRowCount();
             int numNulls = page.getNullCount();
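
For context, the sketch below is not part of the patch; it shows the same page-header scan that getPageCompressedSize() performs, as a minimal standalone program. It seeks to the start of the first column chunk of the first row group and reads each Thrift page header with Util.readPageHeader to report per-page compressed sizes. The class name PageSizeScan and the path example.parquet are placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.format.PageHeader;
    import org.apache.parquet.format.Util;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
    import org.apache.parquet.hadoop.util.HadoopInputFile;
    import org.apache.parquet.io.SeekableInputStream;

    public class PageSizeScan {
        public static void main(String[] args) throws Exception {
            // Placeholder input file; substitute a real Parquet file path.
            HadoopInputFile file = HadoopInputFile.fromPath(new Path("example.parquet"), new Configuration());
            try (ParquetFileReader reader = ParquetFileReader.open(file);
                    SeekableInputStream input = file.newStream()) {
                // First column chunk of the first row group, as in the patch's lookup by index.
                ColumnChunkMetaData chunk = reader.getRowGroups().get(0).getColumns().get(0);
                // The chunk starts at the dictionary page if present, otherwise at the first data page.
                long start = chunk.hasDictionaryPage()
                        ? chunk.getDictionaryPageOffset()
                        : chunk.getFirstDataPageOffset();
                long end = start + chunk.getTotalSize();
                input.seek(start);
                int pageIndex = 0;
                while (input.getPos() < end) {
                    // Each page is preceded by a Thrift-encoded header carrying its compressed size.
                    PageHeader header = Util.readPageHeader(input);
                    System.out.println("page " + pageIndex++ + ": "
                            + header.getCompressed_page_size() + " bytes (compressed)");
                    input.skip(header.getCompressed_page_size());
                }
            }
        }
    }

The patch applies this scan per visited page, counting headers until it reaches the current page number, because DataPageV1/DataPageV2 objects handed to the visitor no longer expose the on-disk compressed size directly.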