diff --git a/parquet-cli/pom.xml b/parquet-cli/pom.xml
index 1eab1c162e..033896cf50 100644
--- a/parquet-cli/pom.xml
+++ b/parquet-cli/pom.xml
@@ -85,6 +85,28 @@
      <artifactId>parquet-avro</artifactId>
      <version>${project.version}</version>
    </dependency>
+
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-protobuf</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>3.25.6</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-protobuf</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-format-structures</artifactId>
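
The tests classifier pulls in parquet-protobuf's generated test messages (such as org.apache.parquet.proto.test.TestProtobuf) used by the new CatCommandTest below, and protobuf-java is needed at test scope to compile against com.google.protobuf.Message. A one-line sketch of what the test jar provides (the value 42 is illustrative):

    TestProtobuf.RepeatedIntMessage msg =
        TestProtobuf.RepeatedIntMessage.newBuilder().addRepeatedInt(42).build();
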
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
index b30c9432d6..4ba843c6bf 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
@@ -35,6 +35,7 @@
import java.security.AccessController;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
@@ -55,13 +56,56 @@
import org.apache.parquet.cli.util.GetClassLoader;
import org.apache.parquet.cli.util.Schemas;
import org.apache.parquet.cli.util.SeekableFSDataInputStream;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.slf4j.Logger;
public abstract class BaseCommand implements Command, Configurable {
private static final String RESOURCE_URI_SCHEME = "resource";
private static final String STDIN_AS_SOURCE = "stdin";
+ public static final String PARQUET_CLI_ENABLE_GROUP_READER = "parquet.enable.simple-reader";
+
+  /**
+   * Note for developers: for legacy reasons, parquet-cli uses the Avro schema reader, which
+   * breaks for files generated through parquet-protobuf. This logic auto-detects such cases
+   * and routes the request to the simple Group reader instead of Avro.
+   */
+  private boolean isProtobufStyleSchema(String source) throws IOException {
+    try (ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source))) {
+      Map<String, String> metadata = reader.getFooter().getFileMetaData().getKeyValueMetaData();
+      return metadata != null && metadata.containsKey("parquet.proto.class");
+    }
+  }
+
+  // Utility to convert a ParquetReader<T> into an Iterable<T>
+  private static <T> Iterable<T> asIterable(final ParquetReader<T> reader) {
+    return () -> new Iterator<T>() {
+      private T next = advance();
+
+      private T advance() {
+        try {
+          return reader.read();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public boolean hasNext() {
+        return next != null;
+      }
+
+      @Override
+      public T next() {
+        T current = next;
+        next = advance();
+        return current;
+      }
+    };
+  }
protected final Logger console;
@@ -320,6 +364,15 @@ protected <D> Iterable<D> openDataFile(final String source, Schema projection) t
Formats.Format format = Formats.detectFormat(open(source));
switch (format) {
case PARQUET:
+        boolean isProtobufStyle = isProtobufStyleSchema(source);
+        boolean useGroupReader = getConf().getBoolean(PARQUET_CLI_ENABLE_GROUP_READER, false);
+        if (isProtobufStyle || useGroupReader) {
+          final ParquetReader<Group> grp = ParquetReader.builder(
+                  new GroupReadSupport(), qualifiedPath(source))
+              .withConf(getConf())
+              .build();
+          return (Iterable<D>) asIterable(grp);
+        }
Configuration conf = new Configuration(getConf());
// TODO: add these to the reader builder
AvroReadSupport.setRequestedProjection(conf, projection);
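
For context, the fallback added above reads records as untyped Group objects via GroupReadSupport, sidestepping the Avro schema conversion entirely. A self-contained sketch of that read path (the file name is illustrative, not from the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.example.GroupReadSupport;

    public class GroupReadSketch {
      public static void main(String[] args) throws Exception {
        // Read every record as a Group; no Avro schema is derived from the file.
        try (ParquetReader<Group> reader = ParquetReader.builder(
                new GroupReadSupport(), new Path("events.parquet"))
            .withConf(new Configuration())
            .build()) {
          Group g;
          while ((g = reader.read()) != null) {
            System.out.println(g); // Group.toString() prints one field per line
          }
        }
      }
    }

Setting parquet.enable.simple-reader to true forces this path even for files that are not proto-derived.
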
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java b/parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java
index ef877d149c..bf5baeff9f 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java
@@ -82,7 +82,8 @@ public static Schema fromParquet(Configuration conf, URI location) throws IOExce
if (schemaString != null) {
return new Schema.Parser().parse(schemaString);
} else {
- return new AvroSchemaConverter().convert(footer.getFileMetaData().getSchema());
+ return new AvroSchemaConverter(conf)
+ .convert(footer.getFileMetaData().getSchema());
}
}
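
Passing the Configuration through matters because AvroSchemaConverter reads its conversion flags from it. A minimal sketch of why a caller might care, assuming the parquet.avro.readInt96AsFixed key from parquet-avro applies to the file at hand (the URI is illustrative):

    Configuration conf = new Configuration();
    // INT96 timestamps have no Avro equivalent; this flag converts them to fixed(12)
    // instead of failing the conversion.
    conf.setBoolean("parquet.avro.readInt96AsFixed", true);
    Schema schema = Schemas.fromParquet(conf, URI.create("file:///tmp/events.parquet"));
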
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
index b5d092901d..4a781886a3 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
@@ -18,10 +18,14 @@
*/
package org.apache.parquet.cli.commands;
+import com.google.protobuf.Message;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.proto.ProtoParquetWriter;
+import org.apache.parquet.proto.test.TestProtobuf;
import org.junit.Assert;
import org.junit.Test;
@@ -63,4 +67,44 @@ public void testCatCommandWithInvalidColumn() throws IOException {
command.setConf(new Configuration());
command.run();
}
+
+ @Test
+ public void testCatCommandProtoParquetAutoDetected() throws Exception {
+ File protoFile = new File(getTempFolder(), "proto_someevent.parquet");
+ writeProtoParquet(protoFile);
+
+ CatCommand cmd = new CatCommand(createLogger(), 0);
+ cmd.sourceFiles = Arrays.asList(protoFile.getAbsolutePath());
+ cmd.setConf(new Configuration());
+
+ int result = cmd.run();
+ Assert.assertEquals(0, result);
+ }
+
+ @Test
+ public void testCatCommandWithSimpleReaderConfig() throws Exception {
+ File regularFile = parquetFile();
+
+ Configuration conf = new Configuration();
+ conf.setBoolean("parquet.enable.simple-reader", true);
+
+ CatCommand cmd = new CatCommand(createLogger(), 5);
+ cmd.sourceFiles = Arrays.asList(regularFile.getAbsolutePath());
+ cmd.setConf(conf);
+
+ int result = cmd.run();
+ Assert.assertEquals(0, result);
+ }
+
+ private static void writeProtoParquet(File file) throws Exception {
+ TestProtobuf.RepeatedIntMessage.Builder b = TestProtobuf.RepeatedIntMessage.newBuilder()
+ .addRepeatedInt(1)
+ .addRepeatedInt(2)
+ .addRepeatedInt(3);
+
+    try (ProtoParquetWriter<Message> w =
+        new ProtoParquetWriter<>(new Path(file.getAbsolutePath()), TestProtobuf.RepeatedIntMessage.class)) {
+ w.write(b.build());
+ }
+ }
}
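
The auto-detection keys off the parquet.proto.class entry that parquet-protobuf writes into the file footer. A quick standalone check for whether a file would trigger the Group-reader fallback (method body only; imports as in BaseCommand above, file name illustrative):

    try (ParquetFileReader r = ParquetFileReader.open(new Configuration(), new Path("events.parquet"))) {
      Map<String, String> kv = r.getFooter().getFileMetaData().getKeyValueMetaData();
      System.out.println("proto class: " + kv.get("parquet.proto.class")); // null for non-proto files
    }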