diff --git a/parquet-cli/pom.xml b/parquet-cli/pom.xml
index 1eab1c162e..033896cf50 100644
--- a/parquet-cli/pom.xml
+++ b/parquet-cli/pom.xml
@@ -85,6 +85,28 @@
       <artifactId>parquet-avro</artifactId>
       <version>${project.version}</version>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-protobuf</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>3.25.6</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-protobuf</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-format-structures</artifactId>
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
index b30c9432d6..4ba843c6bf 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
@@ -35,6 +35,7 @@
 import java.security.AccessController;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileReader;
@@ -55,13 +56,56 @@
 import org.apache.parquet.cli.util.GetClassLoader;
 import org.apache.parquet.cli.util.Schemas;
 import org.apache.parquet.cli.util.SeekableFSDataInputStream;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.slf4j.Logger;
 
 public abstract class BaseCommand implements Command, Configurable {
   private static final String RESOURCE_URI_SCHEME = "resource";
   private static final String STDIN_AS_SOURCE = "stdin";
+  public static final String PARQUET_CLI_ENABLE_GROUP_READER = "parquet.enable.simple-reader";
+
+  /**
+   * Note for dev: For legacy reasons, parquet-cli used the Avro schema reader, which
+   * breaks for files generated through proto. This logic is in place to auto-detect such
+   * cases and route the request to the simple (Group) reader instead of Avro.
+   */
+  private boolean isProtobufStyleSchema(String source) throws IOException {
+    try (ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source))) {
+      Map<String, String> metadata = reader.getFooter().getFileMetaData().getKeyValueMetaData();
+      return metadata != null && metadata.containsKey("parquet.proto.class");
+    }
+  }
+
+  // Util to convert a ParquetReader into an Iterable
+  private static <T> Iterable<T> asIterable(final ParquetReader<T> reader) {
+    return () -> new Iterator<T>() {
+      private T next = advance();
+
+      private T advance() {
+        try {
+          return reader.read();
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public boolean hasNext() {
+        return next != null;
+      }
+
+      @Override
+      public T next() {
+        T current = next;
+        next = advance();
+        return current;
+      }
+    };
+  }
 
   protected final Logger console;
@@ -320,6 +364,15 @@ protected <D> Iterable<D> openDataFile(final String source, Schema projection) t
     Formats.Format format = Formats.detectFormat(open(source));
     switch (format) {
       case PARQUET:
+        boolean isProtobufStyle = isProtobufStyleSchema(source);
+        boolean useGroupReader = getConf().getBoolean(PARQUET_CLI_ENABLE_GROUP_READER, false);
+        if (isProtobufStyle || useGroupReader) {
+          final ParquetReader<Group> grp = ParquetReader.builder(
+                  new GroupReadSupport(), qualifiedPath(source))
+              .withConf(getConf())
+              .build();
+          return (Iterable<D>) asIterable(grp);
+        }
         Configuration conf = new Configuration(getConf());
         // TODO: add these to the reader builder
         AvroReadSupport.setRequestedProjection(conf, projection);
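The detection above amounts to a cheap footer probe: parquet-protobuf stamps the writer class into the file's key-value metadata under `parquet.proto.class`, so no data pages need to be read. A minimal standalone sketch of that probe, using only calls the patch itself relies on (the class name and the argument-based path are illustrative, not part of this change):

```java
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;

// Illustrative sketch: open only the footer and check the key-value
// metadata for the marker that parquet-protobuf writes.
public class ProtoDetectSketch {
  public static void main(String[] args) throws IOException {
    try (ParquetFileReader reader =
        ParquetFileReader.open(new Configuration(), new Path(args[0]))) {
      Map<String, String> kv =
          reader.getFooter().getFileMetaData().getKeyValueMetaData();
      boolean protoStyle = kv != null && kv.containsKey("parquet.proto.class");
      System.out.println(protoStyle
          ? "proto-written by " + kv.get("parquet.proto.class")
          : "not proto-written");
    }
  }
}
```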
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java b/parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java
index ef877d149c..bf5baeff9f 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java
@@ -82,7 +82,8 @@ public static Schema fromParquet(Configuration conf, URI location) throws IOExce
     if (schemaString != null) {
       return new Schema.Parser().parse(schemaString);
     } else {
-      return new AvroSchemaConverter().convert(footer.getFileMetaData().getSchema());
+      return new AvroSchemaConverter(conf)
+          .convert(footer.getFileMetaData().getSchema());
     }
   }
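The one-argument constructor matters because `AvroSchemaConverter` reads its conversion knobs from the `Configuration`; the no-argument form pins them to defaults, silently ignoring whatever the caller configured. A hedged sketch of the difference — the schema string and the flag value are examples only, not values this patch sets:

```java
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

// Illustrative sketch: conversion flags live on the Configuration, so
// fromParquet() now honors them instead of always using defaults.
public class ConvertWithConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Example knob only; this flag tunes how LIST types are converted.
    conf.setBoolean("parquet.avro.add-list-element-records", false);

    MessageType parquetSchema = MessageTypeParser.parseMessageType(
        "message example { required int32 id; optional binary name (UTF8); }");
    Schema avro = new AvroSchemaConverter(conf).convert(parquetSchema);
    System.out.println(avro.toString(true));
  }
}
```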
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
index b5d092901d..4a781886a3 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
@@ -18,10 +18,14 @@
  */
 package org.apache.parquet.cli.commands;
 
+import com.google.protobuf.Message;
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.proto.ProtoParquetWriter;
+import org.apache.parquet.proto.test.TestProtobuf;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -63,4 +67,44 @@ public void testCatCommandWithInvalidColumn() throws IOException {
     command.setConf(new Configuration());
     command.run();
   }
+
+  @Test
+  public void testCatCommandProtoParquetAutoDetected() throws Exception {
+    File protoFile = new File(getTempFolder(), "proto_someevent.parquet");
+    writeProtoParquet(protoFile);
+
+    CatCommand cmd = new CatCommand(createLogger(), 0);
+    cmd.sourceFiles = Arrays.asList(protoFile.getAbsolutePath());
+    cmd.setConf(new Configuration());
+
+    int result = cmd.run();
+    Assert.assertEquals(0, result);
+  }
+
+  @Test
+  public void testCatCommandWithSimpleReaderConfig() throws Exception {
+    File regularFile = parquetFile();
+
+    Configuration conf = new Configuration();
+    conf.setBoolean("parquet.enable.simple-reader", true);
+
+    CatCommand cmd = new CatCommand(createLogger(), 5);
+    cmd.sourceFiles = Arrays.asList(regularFile.getAbsolutePath());
+    cmd.setConf(conf);
+
+    int result = cmd.run();
+    Assert.assertEquals(0, result);
+  }
+
+  private static void writeProtoParquet(File file) throws Exception {
+    TestProtobuf.RepeatedIntMessage.Builder b = TestProtobuf.RepeatedIntMessage.newBuilder()
+        .addRepeatedInt(1)
+        .addRepeatedInt(2)
+        .addRepeatedInt(3);
+
+    try (ProtoParquetWriter<Message> w =
+        new ProtoParquetWriter<>(new Path(file.getAbsolutePath()), TestProtobuf.RepeatedIntMessage.class)) {
+      w.write(b.build());
+    }
+  }
+}
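The tests only assert a zero exit code; to see what the group-reader fallback actually emits, the same path can be reproduced outside the CLI. A minimal sketch under the same assumptions as the patch (the class name and argument-based file path are illustrative):

```java
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

// Illustrative sketch: read any parquet file (including proto-written ones)
// as generic Groups, bypassing Avro schema conversion entirely.
public class GroupCatSketch {
  public static void main(String[] args) throws Exception {
    try (ParquetReader<Group> reader =
        ParquetReader.builder(new GroupReadSupport(), new Path(args[0])).build()) {
      for (Group g = reader.read(); g != null; g = reader.read()) {
        System.out.println(g); // Group.toString() prints one field per line
      }
    }
  }
}
```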