@@ -36,6 +36,7 @@
import java.util.List;
import java.util.Map;
import org.apache.commons.text.TextStringBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.cli.BaseCommand;
import org.apache.parquet.cli.rawpages.RawPagesReader;
import org.apache.parquet.column.ColumnDescriptor;
@@ -46,9 +47,14 @@
import org.apache.parquet.column.page.Page;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.Util;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.slf4j.Logger;
@@ -107,7 +113,7 @@ public int run() throws IOException {
reader.getRowGroups().get(0).getColumns().get(0).getCodec();
// accumulate formatted lines to print by column
Map<String, List<String>> formatted = Maps.newLinkedHashMap();
PageFormatter formatter = new PageFormatter();
PageFormatter formatter = new PageFormatter(reader);
PageReadStore pageStore;
int rowGroupNum = 0;
while ((pageStore = reader.readNextRowGroup()) != null) {
@@ -118,7 +124,7 @@ public int run() throws IOException {
formatted.put(columnName(descriptor), lines);
}

formatter.setContext(rowGroupNum, columns.get(descriptor), codec);
formatter.setContext(rowGroupNum, columns.get(descriptor), codec, descriptor);
PageReader pages = pageStore.getPageReader(descriptor);

DictionaryPage dict = pages.readDictionaryPage();
@@ -155,22 +161,29 @@ public List<String> getExamples() {
}

private class PageFormatter implements DataPage.Visitor<String> {
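// file reader kept so the formatter can re-read page headers from disk when reporting compressed page sizes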
private final ParquetFileReader reader;
private int rowGroupNum;
private int pageNum;
private PrimitiveType type;
private String shortCodec;
private ColumnDescriptor currentColumn;

PageFormatter(ParquetFileReader reader) {
this.reader = reader;
}

String getHeader() {
return String.format(
" %-6s %-5s %-4s %-7s %-10s %-10s %-8s %-7s %s",
"page", "type", "enc", "count", "avg size", "size", "rows", "nulls", "min / max");
}

void setContext(int rowGroupNum, PrimitiveType type, CompressionCodecName codec) {
void setContext(int rowGroupNum, PrimitiveType type, CompressionCodecName codec, ColumnDescriptor column) {
this.rowGroupNum = rowGroupNum;
this.pageNum = 0;
this.type = type;
this.shortCodec = shortCodec(codec);
this.currentColumn = column;
}

String format(Page page) {
@@ -184,6 +197,37 @@ String format(Page page) {
return formatted;
}

private long getPageCompressedSize() {
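// Re-read this column chunk's page headers from the file to recover the on-disk
// (compressed) size of the page at index pageNum. The DataPage handed to the
// visitor has already been decompressed by the read path, so its
// getCompressedSize() does not reliably reflect what was written to disk.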
long compressedSize = 0;
try (SeekableInputStream input = HadoopInputFile.fromPath(new Path(reader.getFile()), getConf())
.newStream()) {
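// locate this column's chunk in the current row group's footer metadata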
BlockMetaData rowGroup = reader.getRowGroups().get(rowGroupNum);
int columnIndex =
reader.getFileMetaData().getSchema().getColumns().indexOf(currentColumn);
ColumnChunkMetaData columnChunk = rowGroup.getColumns().get(columnIndex);

long startOffset = columnChunk.hasDictionaryPage()
? columnChunk.getDictionaryPageOffset()
: columnChunk.getFirstDataPageOffset();
input.seek(startOffset);
long endPos = startOffset + columnChunk.getTotalSize();
int currentPageIndex = 0;

while (input.getPos() < endPos) {
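// read the Thrift page header; this advances the stream past the header bytes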
PageHeader pageHeader = Util.readPageHeader(input);
if (currentPageIndex == pageNum) {
compressedSize = pageHeader.getCompressed_page_size();
break;
}
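// not the page we are looking for: skip its body and move to the next header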
input.skip(pageHeader.getCompressed_page_size());
currentPageIndex++;
}
return compressedSize;
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private String printDictionaryPage(DictionaryPage dict) {
// TODO: the compressed size of a dictionary page is lost in Parquet
dict.getUncompressedSize();
@@ -212,7 +256,7 @@ private String printDictionaryPage(DictionaryPage dict) {
@Override
public String visit(DataPageV1 page) {
String enc = encodingAsString(page.getValueEncoding(), false);
long totalSize = page.getCompressedSize();
long totalSize = getPageCompressedSize();
int count = page.getValueCount();
String numNulls = page.getStatistics().isNumNullsSet()
? Long.toString(page.getStatistics().getNumNulls())
@@ -237,7 +281,7 @@ public String visit(DataPageV1 page) {
@Override
public String visit(DataPageV2 page) {
String enc = encodingAsString(page.getDataEncoding(), false);
long totalSize = page.getCompressedSize();
long totalSize = getPageCompressedSize();
int count = page.getValueCount();
int numRows = page.getRowCount();
int numNulls = page.getNullCount();