From bc298fae6c8aee9197f3bb110759c34667bb95ac Mon Sep 17 00:00:00 2001 From: arnavb Date: Thu, 28 Aug 2025 15:20:06 +0000 Subject: [PATCH 1/7] update --- parquet-cli/pom.xml | 13 +++++ .../org/apache/parquet/cli/BaseCommand.java | 51 +++++++++++++++++++ .../parquet/cli/commands/CatCommandTest.java | 42 +++++++++++++++ 3 files changed, 106 insertions(+) diff --git a/parquet-cli/pom.xml b/parquet-cli/pom.xml index 1eab1c162e..ec43c3c200 100644 --- a/parquet-cli/pom.xml +++ b/parquet-cli/pom.xml @@ -85,6 +85,19 @@ parquet-avro ${project.version} + + org.apache.parquet + parquet-protobuf + ${project.version} + test + + + org.apache.parquet + parquet-protobuf + ${project.version} + tests + test + org.apache.parquet parquet-format-structures diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java index b30c9432d6..94e280447c 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java @@ -35,6 +35,7 @@ import java.security.AccessController; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import org.apache.avro.Schema; import org.apache.avro.file.DataFileReader; @@ -56,6 +57,10 @@ import org.apache.parquet.cli.util.Schemas; import org.apache.parquet.cli.util.SeekableFSDataInputStream; import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.schema.MessageType; import org.slf4j.Logger; public abstract class BaseCommand implements Command, Configurable { @@ -63,6 +68,45 @@ public abstract class BaseCommand implements Command, Configurable { private static final String RESOURCE_URI_SCHEME = "resource"; private static final String STDIN_AS_SOURCE = "stdin"; + /** + * Note for dev: Due to legancy reasons, parquet-cli used the avro schema reader which + * breaks for files generated through proto. This logic is in place to auto-detect such cases + * and route the request to simple reader instead of avro. + */ + private boolean isProtobufStyleSchema(String source) throws IOException { + try (ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source))) { + Map metadata = reader.getFooter().getFileMetaData().getKeyValueMetaData(); + return metadata != null && metadata.containsKey("parquet.proto.class"); + } + } + + // Util to convert ParquetReader to Iterable + private static Iterable asIterable(final ParquetReader reader) { + return () -> new Iterator() { + private T next = advance(); + + private T advance() { + try { + return reader.read(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean hasNext() { + return next != null; + } + + @Override + public T next() { + T current = next; + next = advance(); + return current; + } + }; + } + protected final Logger console; private Configuration conf = null; @@ -320,6 +364,13 @@ protected Iterable openDataFile(final String source, Schema projection) t Formats.Format format = Formats.detectFormat(open(source)); switch (format) { case PARQUET: + boolean isProtobufStyle = isProtobufStyleSchema(source); + if (isProtobufStyle) { + final ParquetReader grp = ParquetReader.builder(new GroupReadSupport(), qualifiedPath(source)) + .withConf(getConf()) + .build(); + return (Iterable) asIterable(grp); + } Configuration conf = new Configuration(getConf()); // TODO: add these to the reader builder AvroReadSupport.setRequestedProjection(conf, projection); diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java index b5d092901d..12a7026218 100644 --- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java +++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java @@ -22,6 +22,12 @@ import java.io.IOException; import java.util.Arrays; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.proto.test.TestProtobuf; +import org.apache.parquet.proto.ProtoParquetWriter; +import org.apache.parquet.proto.ProtoWriteSupport; +import com.google.protobuf.Message; +import org.apache.parquet.cli.BaseCommand; import org.junit.Assert; import org.junit.Test; @@ -63,4 +69,40 @@ public void testCatCommandWithInvalidColumn() throws IOException { command.setConf(new Configuration()); command.run(); } + + @Test + public void testCatCommandProtoParquetAutoDetected() throws Exception { + File protoFile = new File(getTempFolder(), "proto_someevent.parquet"); + writeProtoParquet(protoFile); + + CatCommand cmd = new CatCommand(createLogger(), 0); + cmd.sourceFiles = Arrays.asList(protoFile.getAbsolutePath()); + cmd.setConf(new Configuration()); + + int result = cmd.run(); + Assert.assertEquals(0, result); + } + + @Test + public void testCatCommandProtoParquetSucceedsWithAutoDetection() throws Exception { + File protoFile = new File(getTempFolder(), "proto_someevent.parquet"); + writeProtoParquet(protoFile); + + CatCommand cmd = new CatCommand(createLogger(), 0); + cmd.sourceFiles = Arrays.asList(protoFile.getAbsolutePath()); + cmd.setConf(new Configuration()); + + int result = cmd.run(); + Assert.assertEquals(0, result); + } + + private static void writeProtoParquet(File file) throws Exception { + TestProtobuf.RepeatedIntMessage.Builder b = TestProtobuf.RepeatedIntMessage.newBuilder() + .addRepeatedInt(1).addRepeatedInt(2).addRepeatedInt(3); + + try (ProtoParquetWriter w = + new ProtoParquetWriter<>(new Path(file.getAbsolutePath()), TestProtobuf.RepeatedIntMessage.class)) { + w.write(b.build()); + } + } } From f01b69d86842fdc47cdbe335bd0f30064de0dc11 Mon Sep 17 00:00:00 2001 From: arnavb Date: Thu, 28 Aug 2025 15:36:04 +0000 Subject: [PATCH 2/7] lint --- .../java/org/apache/parquet/cli/BaseCommand.java | 8 ++++---- .../parquet/cli/commands/CatCommandTest.java | 16 ++++++++-------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java index 94e280447c..cd1c4e50f4 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java @@ -56,11 +56,10 @@ import org.apache.parquet.cli.util.GetClassLoader; import org.apache.parquet.cli.util.Schemas; import org.apache.parquet.cli.util.SeekableFSDataInputStream; -import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.example.data.Group; -import org.apache.parquet.hadoop.example.GroupReadSupport; import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.schema.MessageType; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.example.GroupReadSupport; import org.slf4j.Logger; public abstract class BaseCommand implements Command, Configurable { @@ -366,7 +365,8 @@ protected Iterable openDataFile(final String source, Schema projection) t case PARQUET: boolean isProtobufStyle = isProtobufStyleSchema(source); if (isProtobufStyle) { - final ParquetReader grp = ParquetReader.builder(new GroupReadSupport(), qualifiedPath(source)) + final ParquetReader grp = ParquetReader.builder( + new GroupReadSupport(), qualifiedPath(source)) .withConf(getConf()) .build(); return (Iterable) asIterable(grp); diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java index 12a7026218..8ca0ba1937 100644 --- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java +++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java @@ -18,16 +18,14 @@ */ package org.apache.parquet.cli.commands; +import com.google.protobuf.Message; import java.io.File; import java.io.IOException; import java.util.Arrays; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.parquet.proto.test.TestProtobuf; import org.apache.parquet.proto.ProtoParquetWriter; -import org.apache.parquet.proto.ProtoWriteSupport; -import com.google.protobuf.Message; -import org.apache.parquet.cli.BaseCommand; +import org.apache.parquet.proto.test.TestProtobuf; import org.junit.Assert; import org.junit.Test; @@ -97,11 +95,13 @@ public void testCatCommandProtoParquetSucceedsWithAutoDetection() throws Excepti } private static void writeProtoParquet(File file) throws Exception { - TestProtobuf.RepeatedIntMessage.Builder b = TestProtobuf.RepeatedIntMessage.newBuilder() - .addRepeatedInt(1).addRepeatedInt(2).addRepeatedInt(3); + TestProtobuf.RepeatedIntMessage.Builder b = TestProtobuf.RepeatedIntMessage.newBuilder() + .addRepeatedInt(1) + .addRepeatedInt(2) + .addRepeatedInt(3); - try (ProtoParquetWriter w = - new ProtoParquetWriter<>(new Path(file.getAbsolutePath()), TestProtobuf.RepeatedIntMessage.class)) { + try (ProtoParquetWriter w = + new ProtoParquetWriter<>(new Path(file.getAbsolutePath()), TestProtobuf.RepeatedIntMessage.class)) { w.write(b.build()); } } From 3c2040dc4ce13a7d06676399f4693c147cfb1a82 Mon Sep 17 00:00:00 2001 From: arnavb Date: Thu, 28 Aug 2025 15:45:36 +0000 Subject: [PATCH 3/7] deps --- parquet-cli/pom.xml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/parquet-cli/pom.xml b/parquet-cli/pom.xml index ec43c3c200..e576c3ec16 100644 --- a/parquet-cli/pom.xml +++ b/parquet-cli/pom.xml @@ -98,6 +98,13 @@ tests test + + + com.google.protobuf + protobuf-java + 3.25.6 + test + org.apache.parquet parquet-format-structures From 26444c8475155ad2f9ab5b8389c234c588fc576a Mon Sep 17 00:00:00 2001 From: arnavb Date: Thu, 28 Aug 2025 18:58:10 +0000 Subject: [PATCH 4/7] check build --- parquet-cli/pom.xml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/parquet-cli/pom.xml b/parquet-cli/pom.xml index e576c3ec16..75183afcc5 100644 --- a/parquet-cli/pom.xml +++ b/parquet-cli/pom.xml @@ -85,12 +85,6 @@ parquet-avro ${project.version} - - org.apache.parquet - parquet-protobuf - ${project.version} - test - org.apache.parquet parquet-protobuf @@ -105,6 +99,12 @@ 3.25.6 test + + org.apache.parquet + parquet-protobuf + ${project.version} + test + org.apache.parquet parquet-format-structures From 817731c82bcfc6e64877d9170b956d1507c17f47 Mon Sep 17 00:00:00 2001 From: arnavb Date: Thu, 4 Sep 2025 16:56:11 +0000 Subject: [PATCH 5/7] comments --- parquet-cli/pom.xml | 4 ++-- .../src/main/java/org/apache/parquet/cli/BaseCommand.java | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/parquet-cli/pom.xml b/parquet-cli/pom.xml index 75183afcc5..8fab8261a8 100644 --- a/parquet-cli/pom.xml +++ b/parquet-cli/pom.xml @@ -97,13 +97,13 @@ com.google.protobuf protobuf-java 3.25.6 - test + ${deps.scope} org.apache.parquet parquet-protobuf ${project.version} - test + ${deps.scope} org.apache.parquet diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java index cd1c4e50f4..0cfdbd302b 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java @@ -66,6 +66,7 @@ public abstract class BaseCommand implements Command, Configurable { private static final String RESOURCE_URI_SCHEME = "resource"; private static final String STDIN_AS_SOURCE = "stdin"; + public static final String PARQUET_CLI_ENABLE_GROUP_READER = "parquet.enable.fallback-simple-reader"; /** * Note for dev: Due to legancy reasons, parquet-cli used the avro schema reader which @@ -364,7 +365,8 @@ protected Iterable openDataFile(final String source, Schema projection) t switch (format) { case PARQUET: boolean isProtobufStyle = isProtobufStyleSchema(source); - if (isProtobufStyle) { + boolean useGroupReader = getConf().getBoolean(PARQUET_CLI_ENABLE_GROUP_READER, false); + if (isProtobufStyle || useGroupReader) { final ParquetReader grp = ParquetReader.builder( new GroupReadSupport(), qualifiedPath(source)) .withConf(getConf()) From 8daa6f53649c4a599184a7198214d8f748dbe521 Mon Sep 17 00:00:00 2001 From: arnavb Date: Thu, 4 Sep 2025 16:59:37 +0000 Subject: [PATCH 6/7] update --- .../src/main/java/org/apache/parquet/cli/BaseCommand.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java index 0cfdbd302b..4ba843c6bf 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java @@ -66,7 +66,7 @@ public abstract class BaseCommand implements Command, Configurable { private static final String RESOURCE_URI_SCHEME = "resource"; private static final String STDIN_AS_SOURCE = "stdin"; - public static final String PARQUET_CLI_ENABLE_GROUP_READER = "parquet.enable.fallback-simple-reader"; + public static final String PARQUET_CLI_ENABLE_GROUP_READER = "parquet.enable.simple-reader"; /** * Note for dev: Due to legancy reasons, parquet-cli used the avro schema reader which From eb0b5301855974ce8f3a44e0162145b8517a9967 Mon Sep 17 00:00:00 2001 From: arnavb Date: Fri, 5 Sep 2025 10:35:02 +0000 Subject: [PATCH 7/7] comments --- parquet-cli/pom.xml | 8 +++++--- .../java/org/apache/parquet/cli/util/Schemas.java | 3 ++- .../parquet/cli/commands/CatCommandTest.java | 14 ++++++++------ 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/parquet-cli/pom.xml b/parquet-cli/pom.xml index 8fab8261a8..033896cf50 100644 --- a/parquet-cli/pom.xml +++ b/parquet-cli/pom.xml @@ -85,6 +85,8 @@ parquet-avro ${project.version} + + org.apache.parquet parquet-protobuf @@ -92,19 +94,19 @@ tests test - com.google.protobuf protobuf-java 3.25.6 - ${deps.scope} + test org.apache.parquet parquet-protobuf ${project.version} - ${deps.scope} + test + org.apache.parquet parquet-format-structures diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java b/parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java index ef877d149c..bf5baeff9f 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java @@ -82,7 +82,8 @@ public static Schema fromParquet(Configuration conf, URI location) throws IOExce if (schemaString != null) { return new Schema.Parser().parse(schemaString); } else { - return new AvroSchemaConverter().convert(footer.getFileMetaData().getSchema()); + return new AvroSchemaConverter(conf) + .convert(footer.getFileMetaData().getSchema()); } } diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java index 8ca0ba1937..4a781886a3 100644 --- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java +++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java @@ -82,13 +82,15 @@ public void testCatCommandProtoParquetAutoDetected() throws Exception { } @Test - public void testCatCommandProtoParquetSucceedsWithAutoDetection() throws Exception { - File protoFile = new File(getTempFolder(), "proto_someevent.parquet"); - writeProtoParquet(protoFile); + public void testCatCommandWithSimpleReaderConfig() throws Exception { + File regularFile = parquetFile(); - CatCommand cmd = new CatCommand(createLogger(), 0); - cmd.sourceFiles = Arrays.asList(protoFile.getAbsolutePath()); - cmd.setConf(new Configuration()); + Configuration conf = new Configuration(); + conf.setBoolean("parquet.enable.simple-reader", true); + + CatCommand cmd = new CatCommand(createLogger(), 5); + cmd.sourceFiles = Arrays.asList(regularFile.getAbsolutePath()); + cmd.setConf(conf); int result = cmd.run(); Assert.assertEquals(0, result);