22 changes: 22 additions & 0 deletions parquet-cli/pom.xml
@@ -85,6 +85,28 @@
<artifactId>parquet-avro</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Protobuf dependencies for CLI Tests -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-protobuf</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.25.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-protobuf</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
Comment on lines +97 to +108

Contributor
By adding these dependencies in test scope only, wouldn't they be missing at command-line execution?
There are two ways we can use the CLI. One bundles the "normal"-scoped dependencies for the Hadoop env, and the other bundles the "provided" scope as well for standalone use. I don't think these deps will be added to either one.

Member Author
@ArnavBalyan Sep 4, 2025
That's a great catch, thanks! Moved to ${deps.scope}, which can be enabled via the profile being used.
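
For context, a minimal sketch of what such a profile-driven scope could look like in the pom. The ${deps.scope} property name comes from the reply above; the profile id and default value here are hypothetical:

    <properties>
      <!-- hypothetical default: bundle the deps for the Hadoop-env CLI -->
      <deps.scope>compile</deps.scope>
    </properties>

    <profiles>
      <profile>
        <!-- hypothetical profile id: the standalone build marks shared deps as provided -->
        <id>standalone</id>
        <properties>
          <deps.scope>provided</deps.scope>
        </properties>
      </profile>
    </profiles>

The protobuf dependencies above would then declare <scope>${deps.scope}</scope> instead of <scope>test</scope>.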


<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-format-structures</artifactId>
53 changes: 53 additions & 0 deletions parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
@@ -35,6 +35,7 @@
import java.security.AccessController;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
@@ -55,13 +56,56 @@
import org.apache.parquet.cli.util.GetClassLoader;
import org.apache.parquet.cli.util.Schemas;
import org.apache.parquet.cli.util.SeekableFSDataInputStream;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.slf4j.Logger;

public abstract class BaseCommand implements Command, Configurable {

private static final String RESOURCE_URI_SCHEME = "resource";
private static final String STDIN_AS_SOURCE = "stdin";
public static final String PARQUET_CLI_ENABLE_GROUP_READER = "parquet.enable.simple-reader";

/**
 * Note for dev: for legacy reasons, parquet-cli uses the Avro schema reader, which
 * breaks for files generated through proto. This check auto-detects such files (via the
 * "parquet.proto.class" footer metadata) and routes the request to the simple Group
 * reader instead of Avro.
 */
private boolean isProtobufStyleSchema(String source) throws IOException {
try (ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source))) {
Map<String, String> metadata = reader.getFooter().getFileMetaData().getKeyValueMetaData();
return metadata != null && metadata.containsKey("parquet.proto.class");
}
}

// Utility to expose a ParquetReader as a single-pass Iterable; reads lazily and wraps IOExceptions
private static <T> Iterable<T> asIterable(final ParquetReader<T> reader) {
return () -> new Iterator<T>() {
private T next = advance();

private T advance() {
try {
return reader.read();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public boolean hasNext() {
return next != null;
}

@Override
public T next() {
T current = next;
next = advance();
return current;
}
};
}

protected final Logger console;

@@ -320,6 +364,15 @@ protected <D> Iterable<D> openDataFile(final String source, Schema projection) t
Formats.Format format = Formats.detectFormat(open(source));
switch (format) {
case PARQUET:
boolean isProtobufStyle = isProtobufStyleSchema(source);
boolean useGroupReader = getConf().getBoolean(PARQUET_CLI_ENABLE_GROUP_READER, false);
if (isProtobufStyle || useGroupReader) {
final ParquetReader<Group> grp = ParquetReader.<Group>builder(
new GroupReadSupport(), qualifiedPath(source))
.withConf(getConf())
.build();
return (Iterable<D>) asIterable(grp);
}
Configuration conf = new Configuration(getConf());
// TODO: add these to the reader builder
AvroReadSupport.setRequestedProjection(conf, projection);
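As a usage note for the new branch above — a minimal sketch, mirroring what the tests below do: the Group reader can also be forced for any Parquet file via the public config key, not only for auto-detected proto files.

    Configuration conf = new Configuration();
    // force the simple Group reader even for files not written through proto
    conf.setBoolean(BaseCommand.PARQUET_CLI_ENABLE_GROUP_READER, true); // "parquet.enable.simple-reader"
    command.setConf(conf);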
parquet-cli/src/main/java/org/apache/parquet/cli/util/Schemas.java
@@ -82,7 +82,8 @@ public static Schema fromParquet(Configuration conf, URI location) throws IOExce
if (schemaString != null) {
return new Schema.Parser().parse(schemaString);
} else {
return new AvroSchemaConverter().convert(footer.getFileMetaData().getSchema());
return new AvroSchemaConverter(conf)
.convert(footer.getFileMetaData().getSchema());
}
}

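The one-line change above matters because the no-arg AvroSchemaConverter ignores the caller's configuration, so converter options were silently dropped whenever no Avro schema was stored in the footer. A minimal sketch of the effect, assuming the standard parquet-avro key parquet.avro.readInt96AsFixed:

    Configuration conf = new Configuration();
    // honored only now that the converter is constructed with conf
    conf.setBoolean("parquet.avro.readInt96AsFixed", true);
    Schema avroSchema = Schemas.fromParquet(conf, location);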
parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
@@ -18,10 +18,14 @@
*/
package org.apache.parquet.cli.commands;

import com.google.protobuf.Message;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.proto.ProtoParquetWriter;
import org.apache.parquet.proto.test.TestProtobuf;
import org.junit.Assert;
import org.junit.Test;

@@ -63,4 +67,44 @@ public void testCatCommandWithInvalidColumn() throws IOException {
command.setConf(new Configuration());
command.run();
}

@Test
public void testCatCommandProtoParquetAutoDetected() throws Exception {
File protoFile = new File(getTempFolder(), "proto_someevent.parquet");
writeProtoParquet(protoFile);

CatCommand cmd = new CatCommand(createLogger(), 0);
cmd.sourceFiles = Arrays.asList(protoFile.getAbsolutePath());
cmd.setConf(new Configuration());

int result = cmd.run();
Assert.assertEquals(0, result);
}

@Test
public void testCatCommandWithSimpleReaderConfig() throws Exception {
File regularFile = parquetFile();

Configuration conf = new Configuration();
conf.setBoolean("parquet.enable.simple-reader", true);

CatCommand cmd = new CatCommand(createLogger(), 5);
cmd.sourceFiles = Arrays.asList(regularFile.getAbsolutePath());
cmd.setConf(conf);

int result = cmd.run();
Assert.assertEquals(0, result);
}

private static void writeProtoParquet(File file) throws Exception {
TestProtobuf.RepeatedIntMessage.Builder b = TestProtobuf.RepeatedIntMessage.newBuilder()
.addRepeatedInt(1)
.addRepeatedInt(2)
.addRepeatedInt(3);

try (ProtoParquetWriter<Message> w =
new ProtoParquetWriter<>(new Path(file.getAbsolutePath()), TestProtobuf.RepeatedIntMessage.class)) {
w.write(b.build());
}
}
}