diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java
index 285e8f50f9..bba732a8b0 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CatCommand.java
@@ -27,12 +27,14 @@ import com.google.common.io.Closeables;
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
 import org.apache.parquet.cli.BaseCommand;
 import org.apache.parquet.cli.util.Expressions;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.slf4j.Logger;
 
 @Parameters(commandDescription = "Print the first N records from a file")
 public class CatCommand extends BaseCommand {
@@ -60,41 +62,90 @@ public CatCommand(Logger console, long defaultNumRecords) {
   public int run() throws IOException {
     Preconditions.checkArgument(
         sourceFiles != null && !sourceFiles.isEmpty(), "Missing file name");
 
-    // Ensure all source files have the columns specified first
-    Map<String, Schema> schemas = new HashMap<>();
     for (String source : sourceFiles) {
-      Schema schema = getAvroSchema(source);
-      schemas.put(source, Expressions.filterSchema(schema, columns));
+      try {
+        runWithAvroSchema(source);
+      } catch (SchemaParseException e) {
+        console.debug(
+            "Avro schema conversion failed for {}, falling back to Group reader: {}",
+            source,
+            e.getMessage());
+        runWithGroupReader(source);
+      }
     }
 
-    for (String source : sourceFiles) {
-      Schema projection = schemas.get(source);
-      Iterable<Object> reader = openDataFile(source, projection);
-      boolean threw = true;
-      long count = 0;
-      try {
-        for (Object record : reader) {
-          if (numRecords > 0 && count >= numRecords) {
-            break;
-          }
-          if (columns == null || columns.size() != 1) {
-            console.info(String.valueOf(record));
-          } else {
-            console.info(String.valueOf(select(projection, record, columns.get(0))));
-          }
-          count += 1;
+    return 0;
+  }
+
+  private void runWithAvroSchema(String source) throws IOException {
+    Schema schema = getAvroSchema(source);
+    Schema projection = Expressions.filterSchema(schema, columns);
+
+    Iterable<Object> reader = openDataFile(source, projection);
+    boolean threw = true;
+    long count = 0;
+    try {
+      for (Object record : reader) {
+        if (numRecords > 0 && count >= numRecords) {
+          break;
         }
-        threw = false;
-      } catch (RuntimeException e) {
-        throw new RuntimeException("Failed on record " + count + " in file " + source, e);
-      } finally {
-        if (reader instanceof Closeable) {
-          Closeables.close((Closeable) reader, threw);
+        if (columns == null || columns.size() != 1) {
+          console.info(String.valueOf(record));
+        } else {
+          console.info(String.valueOf(select(projection, record, columns.get(0))));
         }
+        count += 1;
+      }
+      threw = false;
+    } catch (RuntimeException e) {
+      throw new RuntimeException("Failed on record " + count + " in file " + source, e);
+    } finally {
+      if (reader instanceof Closeable) {
+        Closeables.close((Closeable) reader, threw);
      }
     }
+  }
 
-    return 0;
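+  // Fallback used when the file's Parquet schema cannot be converted to an Avro schema
+  // (for example, field names containing hyphens): read and print records with the example Group API.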
+  private void runWithGroupReader(String source) throws IOException {
+    ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), qualifiedPath(source))
+        .withConf(getConf())
+        .build();
+
+    boolean threw = true;
+    long count = 0;
+    try {
+      for (Group record = reader.read(); record != null; record = reader.read()) {
+        if (numRecords > 0 && count >= numRecords) {
+          break;
+        }
+
+        if (columns == null) {
+          console.info(record.toString());
+        } else {
+          StringBuilder sb = new StringBuilder();
+          for (int i = 0; i < columns.size(); i++) {
+            String columnName = columns.get(i);
+            try {
+              Object value =
+                  record.getValueToString(record.getType().getFieldIndex(columnName), 0);
+              if (i > 0) sb.append(", ");
+              sb.append(columnName).append(": ").append(value);
+            } catch (Exception e) {
+              console.warn("Column '{}' not found in file {}", columnName, source);
+            }
+          }
+          if (sb.length() > 0) {
+            console.info(sb.toString());
+          }
+        }
+        count += 1;
+      }
+      threw = false;
+    } catch (RuntimeException e) {
+      throw new RuntimeException("Failed on record " + count + " in file " + source, e);
+    } finally {
+      Closeables.close(reader, threw);
+    }
   }
 
   @Override
diff --git a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
index 4a781886a3..b8aa4ac13e 100644
--- a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
+++ b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/CatCommandTest.java
@@ -24,8 +24,15 @@ import java.util.Arrays;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.proto.ProtoParquetWriter;
 import org.apache.parquet.proto.test.TestProtobuf;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class CatCommandTest extends ParquetFileTest {
@@ -96,6 +103,57 @@ public void testCatCommandWithSimpleReaderConfig() throws Exception {
     Assert.assertEquals(0, result);
   }
 
+  @Test
+  public void testCatCommandWithHyphenatedFieldNames() throws Exception {
+    File hyphenFile = new File(getTempFolder(), "hyphenated_fields.parquet");
+    writeParquetWithHyphenatedFields(hyphenFile);
+
+    CatCommand cmd = new CatCommand(createLogger(), 1);
+    cmd.sourceFiles = Arrays.asList(hyphenFile.getAbsolutePath());
+    cmd.setConf(new Configuration());
+
+    int result = cmd.run();
+    Assert.assertEquals(0, result);
+  }
+
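+  // Writes a Parquet file via the example Group API whose field names are not valid
+  // Avro identifiers (they contain hyphens), so CatCommand must fall back to the Group reader.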
+  private static void writeParquetWithHyphenatedFields(File file) throws IOException {
+    MessageType schema = Types.buildMessage()
+        .required(PrimitiveTypeName.INT32)
+        .named("order_id")
+        .required(PrimitiveTypeName.BINARY)
+        .named("customer-name")
+        .required(PrimitiveTypeName.BINARY)
+        .named("product-category")
+        .required(PrimitiveTypeName.DOUBLE)
+        .named("sale-amount")
+        .required(PrimitiveTypeName.BINARY)
+        .named("region")
+        .named("SalesRecord");
+
+    SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path(file.getAbsolutePath()))
+        .withType(schema)
+        .build()) {
+
+      Group record1 = factory.newGroup()
+          .append("order_id", 1001)
+          .append("customer-name", "John Smith")
+          .append("product-category", "Electronics")
+          .append("sale-amount", 299.99)
+          .append("region", "North");
+      writer.write(record1);
+
+      Group record2 = factory.newGroup()
+          .append("order_id", 1002)
+          .append("customer-name", "Jane Doe")
+          .append("product-category", "Home-Garden")
+          .append("sale-amount", 149.50)
+          .append("region", "South");
+      writer.write(record2);
+    }
+  }
+
   private static void writeProtoParquet(File file) throws Exception {
     TestProtobuf.RepeatedIntMessage.Builder b = TestProtobuf.RepeatedIntMessage.newBuilder()
         .addRepeatedInt(1)