@@ -27,12 +27,14 @@
 import com.google.common.io.Closeables;
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
 import org.apache.parquet.cli.BaseCommand;
 import org.apache.parquet.cli.util.Expressions;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
 import org.slf4j.Logger;
 
 @Parameters(commandDescription = "Print the first N records from a file")
@@ -60,41 +62,90 @@ public CatCommand(Logger console, long defaultNumRecords) {
   public int run() throws IOException {
     Preconditions.checkArgument(sourceFiles != null && !sourceFiles.isEmpty(), "Missing file name");
 
-    // Ensure all source files have the columns specified first
-    Map<String, Schema> schemas = new HashMap<>();
     for (String source : sourceFiles) {
-      Schema schema = getAvroSchema(source);
-      schemas.put(source, Expressions.filterSchema(schema, columns));
+      try {
+        runWithAvroSchema(source);
+      } catch (SchemaParseException e) {
+        console.debug(
+            "Avro schema conversion failed for {}, falling back to Group reader: {}",
+            source,
+            e.getMessage());
+        runWithGroupReader(source);
+      }
     }
 
-    for (String source : sourceFiles) {
-      Schema projection = schemas.get(source);
-      Iterable<Object> reader = openDataFile(source, projection);
-      boolean threw = true;
-      long count = 0;
-      try {
-        for (Object record : reader) {
-          if (numRecords > 0 && count >= numRecords) {
-            break;
-          }
-          if (columns == null || columns.size() != 1) {
-            console.info(String.valueOf(record));
-          } else {
-            console.info(String.valueOf(select(projection, record, columns.get(0))));
-          }
-          count += 1;
-        }
-        threw = false;
-      } catch (RuntimeException e) {
-        throw new RuntimeException("Failed on record " + count + " in file " + source, e);
-      } finally {
-        if (reader instanceof Closeable) {
-          Closeables.close((Closeable) reader, threw);
-        }
-      }
-    }
-
-    return 0;
+    return 0;
   }
 
+  private void runWithAvroSchema(String source) throws IOException {
+    Schema schema = getAvroSchema(source);
+    Schema projection = Expressions.filterSchema(schema, columns);
+
+    Iterable<Object> reader = openDataFile(source, projection);
+    boolean threw = true;
+    long count = 0;
+    try {
+      for (Object record : reader) {
+        if (numRecords > 0 && count >= numRecords) {
+          break;
+        }
+        if (columns == null || columns.size() != 1) {
+          console.info(String.valueOf(record));
+        } else {
+          console.info(String.valueOf(select(projection, record, columns.get(0))));
+        }
+        count += 1;
+      }
+      threw = false;
+    } catch (RuntimeException e) {
+      throw new RuntimeException("Failed on record " + count + " in file " + source, e);
+    } finally {
+      if (reader instanceof Closeable) {
+        Closeables.close((Closeable) reader, threw);
+      }
+    }
+  }
+
+  private void runWithGroupReader(String source) throws IOException {
+    ParquetReader<Group> reader = ParquetReader.<Group>builder(new GroupReadSupport(), qualifiedPath(source))
+        .withConf(getConf())
+        .build();
+
+    boolean threw = true;
+    long count = 0;
+    try {
+      for (Group record = reader.read(); record != null; record = reader.read()) {
+        if (numRecords > 0 && count >= numRecords) {
+          break;
+        }
+
+        if (columns == null) {
+          console.info(record.toString());
+        } else {
+          StringBuilder sb = new StringBuilder();
+          for (int i = 0; i < columns.size(); i++) {
+            String columnName = columns.get(i);
+            try {
+              Object value =
+                  record.getValueToString(record.getType().getFieldIndex(columnName), 0);
+              if (i > 0) sb.append(", ");
+              sb.append(columnName).append(": ").append(value);
+            } catch (Exception e) {
+              console.warn("Column '{}' not found in file {}", columnName, source);
+            }
+          }
+          if (sb.length() > 0) {
+            console.info(sb.toString());
+          }
+        }
+        count += 1;
+      }
+      threw = false;
+    } catch (RuntimeException e) {
+      throw new RuntimeException("Failed on record " + count + " in file " + source, e);
+    } finally {
+      Closeables.close(reader, threw);
+    }
+  }

@Override
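Note on the fallback in run(): Avro restricts record and field names to letters, digits, and underscores, so converting the schema of a Parquet file whose field names contain hyphens (e.g. "customer-name") throws SchemaParseException before any records are read, and that is what routes such files to the Group reader. A minimal standalone sketch of that failure mode, assuming only the Avro library on the classpath (the schema literal and class name are illustrative, not taken from this PR):

import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;

public class HyphenatedNameDemo {
  public static void main(String[] args) {
    // "customer-name" violates Avro's name rule [A-Za-z_][A-Za-z0-9_]*,
    // so the parser rejects the schema up front.
    String json = "{\"type\": \"record\", \"name\": \"SalesRecord\", \"fields\": "
        + "[{\"name\": \"customer-name\", \"type\": \"string\"}]}";
    try {
      new Schema.Parser().parse(json);
      System.out.println("unexpectedly parsed");
    } catch (SchemaParseException e) {
      // Expected, e.g.: "Illegal character in: customer-name"
      System.out.println("rejected: " + e.getMessage());
    }
  }
}

The second file in the diff below adds a regression test that writes a Parquet file with hyphenated field names and runs CatCommand over it.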
@@ -24,8 +24,15 @@
 import java.util.Arrays;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.proto.ProtoParquetWriter;
 import org.apache.parquet.proto.test.TestProtobuf;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Types;
 import org.junit.Assert;
 import org.junit.Test;

@@ -96,6 +103,57 @@ public void testCatCommandWithSimpleReaderConfig() throws Exception {
     Assert.assertEquals(0, result);
   }
 
+  @Test
+  public void testCatCommandWithHyphenatedFieldNames() throws Exception {
+    File hyphenFile = new File(getTempFolder(), "hyphenated_fields.parquet");
+    writeParquetWithHyphenatedFields(hyphenFile);
+
+    CatCommand cmd = new CatCommand(createLogger(), 1);
+    cmd.sourceFiles = Arrays.asList(hyphenFile.getAbsolutePath());
+    cmd.setConf(new Configuration());
+
+    int result = cmd.run();
+    Assert.assertEquals(0, result);
+  }
+
+  private static void writeParquetWithHyphenatedFields(File file) throws IOException {
+    MessageType schema = Types.buildMessage()
+        .required(PrimitiveTypeName.INT32)
+        .named("order_id")
+        .required(PrimitiveTypeName.BINARY)
+        .named("customer-name")
+        .required(PrimitiveTypeName.BINARY)
+        .named("product-category")
+        .required(PrimitiveTypeName.DOUBLE)
+        .named("sale-amount")
+        .required(PrimitiveTypeName.BINARY)
+        .named("region")
+        .named("SalesRecord");
+
+    SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path(file.getAbsolutePath()))
+        .withType(schema)
+        .build()) {
+
+      Group record1 = factory.newGroup()
+          .append("order_id", 1001)
+          .append("customer-name", "John Smith")
+          .append("product-category", "Electronics")
+          .append("sale-amount", 299.99)
+          .append("region", "North");
+      writer.write(record1);
+
+      Group record2 = factory.newGroup()
+          .append("order_id", 1002)
+          .append("customer-name", "Jane Doe")
+          .append("product-category", "Home-Garden")
+          .append("sale-amount", 149.50)
+          .append("region", "South");
+      writer.write(record2);
+    }
+  }
+
   private static void writeProtoParquet(File file) throws Exception {
     TestProtobuf.RepeatedIntMessage.Builder b = TestProtobuf.RepeatedIntMessage.newBuilder()
         .addRepeatedInt(1)
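A possible follow-up test, sketched here rather than taken from this PR: selecting hyphenated columns exercises the Group fallback's per-column output, which prints "name: value" pairs. This assumes CatCommand's columns parameter is assignable from tests the same way sourceFiles is above, which is a hypothetical, not something the diff shows.

  @Test
  public void testCatCommandSelectingHyphenatedColumns() throws Exception {
    File hyphenFile = new File(getTempFolder(), "hyphenated_fields.parquet");
    writeParquetWithHyphenatedFields(hyphenFile);

    CatCommand cmd = new CatCommand(createLogger(), 2);
    cmd.sourceFiles = Arrays.asList(hyphenFile.getAbsolutePath());
    // Hypothetical: assumes the 'columns' field is test-visible like 'sourceFiles'.
    cmd.columns = Arrays.asList("customer-name", "sale-amount");
    cmd.setConf(new Configuration());

    // Via the Group fallback, each record should print e.g.
    // "customer-name: John Smith, sale-amount: 299.99".
    Assert.assertEquals(0, cmd.run());
  }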