
Commit 40b20ee

update
1 parent 8c84dd0 commit 40b20ee

File tree

1 file changed: +49 −5 lines

parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java

Lines changed: 49 additions & 5 deletions
@@ -36,6 +36,7 @@
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.text.TextStringBuilder;
+import org.apache.hadoop.fs.Path;
 import org.apache.parquet.cli.BaseCommand;
 import org.apache.parquet.cli.rawpages.RawPagesReader;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -46,9 +47,14 @@
 import org.apache.parquet.column.page.Page;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.Util;
 import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.slf4j.Logger;
@@ -107,7 +113,7 @@ public int run() throws IOException {
         reader.getRowGroups().get(0).getColumns().get(0).getCodec();
     // accumulate formatted lines to print by column
     Map<String, List<String>> formatted = Maps.newLinkedHashMap();
-    PageFormatter formatter = new PageFormatter();
+    PageFormatter formatter = new PageFormatter(reader);
     PageReadStore pageStore;
     int rowGroupNum = 0;
     while ((pageStore = reader.readNextRowGroup()) != null) {
@@ -118,7 +124,7 @@ public int run() throws IOException {
           formatted.put(columnName(descriptor), lines);
         }

-        formatter.setContext(rowGroupNum, columns.get(descriptor), codec);
+        formatter.setContext(rowGroupNum, columns.get(descriptor), codec, descriptor);
         PageReader pages = pageStore.getPageReader(descriptor);

         DictionaryPage dict = pages.readDictionaryPage();
@@ -155,22 +161,29 @@ public List<String> getExamples() {
   }

   private class PageFormatter implements DataPage.Visitor<String> {
+    private final ParquetFileReader reader;
     private int rowGroupNum;
     private int pageNum;
     private PrimitiveType type;
     private String shortCodec;
+    private ColumnDescriptor currentColumn;
+
+    PageFormatter(ParquetFileReader reader) {
+      this.reader = reader;
+    }

     String getHeader() {
       return String.format(
           " %-6s %-5s %-4s %-7s %-10s %-10s %-8s %-7s %s",
           "page", "type", "enc", "count", "avg size", "size", "rows", "nulls", "min / max");
     }

-    void setContext(int rowGroupNum, PrimitiveType type, CompressionCodecName codec) {
+    void setContext(int rowGroupNum, PrimitiveType type, CompressionCodecName codec, ColumnDescriptor column) {
       this.rowGroupNum = rowGroupNum;
       this.pageNum = 0;
       this.type = type;
       this.shortCodec = shortCodec(codec);
+      this.currentColumn = column;
     }

     String format(Page page) {
@@ -184,6 +197,37 @@ String format(Page page) {
       return formatted;
     }

+    private long getPageCompressedSize() {
+      long compressedSize = 0;
+      try (SeekableInputStream input = HadoopInputFile.fromPath(new Path(reader.getFile()), getConf())
+          .newStream()) {
+        BlockMetaData rowGroup = reader.getRowGroups().get(rowGroupNum);
+        int columnIndex =
+            reader.getFileMetaData().getSchema().getColumns().indexOf(currentColumn);
+        ColumnChunkMetaData columnChunk = rowGroup.getColumns().get(columnIndex);
+
+        long startOffset = columnChunk.hasDictionaryPage()
+            ? columnChunk.getDictionaryPageOffset()
+            : columnChunk.getFirstDataPageOffset();
+        input.seek(startOffset);
+        long endPos = startOffset + columnChunk.getTotalSize();
+        int currentPageIndex = 0;
+
+        while (input.getPos() < endPos) {
+          PageHeader pageHeader = Util.readPageHeader(input);
+          if (currentPageIndex == pageNum) {
+            compressedSize = pageHeader.getCompressed_page_size();
+            break;
+          }
+          input.skip(pageHeader.getCompressed_page_size());
+          currentPageIndex++;
+        }
+        return compressedSize;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
     private String printDictionaryPage(DictionaryPage dict) {
       // TODO: the compressed size of a dictionary page is lost in Parquet
       dict.getUncompressedSize();
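
Note (not part of the diff): the new getPageCompressedSize() helper re-opens the file and walks the Thrift-encoded page headers of the current column chunk until it reaches the page being formatted, presumably because the on-disk compressed size from the header is wanted rather than the value carried on the in-memory page object. Below is a minimal standalone sketch of the same header walk, under the assumptions that the file path comes in as the first program argument, that only the first column chunk of the first row group is inspected, and that the class and variable names are illustrative.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.Util;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.SeekableInputStream;

public class PageSizeScan {
  public static void main(String[] args) throws IOException {
    HadoopInputFile file = HadoopInputFile.fromPath(new Path(args[0]), new Configuration());
    try (ParquetFileReader reader = ParquetFileReader.open(file);
        SeekableInputStream in = file.newStream()) {
      // First column chunk of the first row group, purely for illustration.
      ColumnChunkMetaData chunk = reader.getRowGroups().get(0).getColumns().get(0);
      long start = chunk.hasDictionaryPage()
          ? chunk.getDictionaryPageOffset()
          : chunk.getFirstDataPageOffset();
      long end = start + chunk.getTotalSize();
      in.seek(start);
      int page = 0;
      while (in.getPos() < end) {
        // Each page starts with a Thrift-encoded PageHeader; reading it advances
        // the stream to the first byte of the page body.
        PageHeader header = Util.readPageHeader(in);
        System.out.printf("page %d: type=%s, compressed=%d bytes%n",
            page++, header.getType(), header.getCompressed_page_size());
        // Skip the page body to land on the next header.
        in.skip(header.getCompressed_page_size());
      }
    }
  }
}
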
@@ -212,7 +256,7 @@ private String printDictionaryPage(DictionaryPage dict) {
     @Override
     public String visit(DataPageV1 page) {
       String enc = encodingAsString(page.getValueEncoding(), false);
-      long totalSize = page.getCompressedSize();
+      long totalSize = getPageCompressedSize();
       int count = page.getValueCount();
       String numNulls = page.getStatistics().isNumNullsSet()
           ? Long.toString(page.getStatistics().getNumNulls())
@@ -237,7 +281,7 @@ public String visit(DataPageV1 page) {
     @Override
     public String visit(DataPageV2 page) {
       String enc = encodingAsString(page.getDataEncoding(), false);
-      long totalSize = page.getCompressedSize();
+      long totalSize = getPageCompressedSize();
       int count = page.getValueCount();
       int numRows = page.getRowCount();
       int numNulls = page.getNullCount();
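
Usage note (also not part of the diff): ShowPagesCommand appears to back the `pages` command in parquet-cli, so after this change an invocation along the lines of `parquet pages <file>` should fill the "size" column for data pages with the compressed size read back from the on-disk page header instead of the value returned by page.getCompressedSize().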
