From f327623cfe9bd8a578cca0aecbabc32aa84dafb7 Mon Sep 17 00:00:00 2001 From: arnavb Date: Wed, 20 Aug 2025 15:50:36 +0000 Subject: [PATCH 1/6] update --- .../parquet/avro/AvroSchemaConverter.java | 101 ++++++++++++++-- .../parquet/avro/TestAvroSchemaConverter.java | 110 ++++++++++++++++++ 2 files changed, 199 insertions(+), 12 deletions(-) diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java index 8e5a58df86..25ce457b10 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -51,6 +51,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -82,12 +83,26 @@ public class AvroSchemaConverter { public static final String ADD_LIST_ELEMENT_RECORDS = "parquet.avro.add-list-element-records"; private static final boolean ADD_LIST_ELEMENT_RECORDS_DEFAULT = true; + public static final String AVRO_MAX_RECURSION = "parquet.avro.max-recursion"; + private static final int AVRO_MAX_RECURSION_DEFAULT = 10; private final boolean assumeRepeatedIsListElement; private final boolean writeOldListStructure; private final boolean writeParquetUUID; private final boolean readInt96AsFixed; private final Set pathsToInt96; + private final int maxRecursion; + + /** + * Sets the maximum recursion depth for recursive schemas. + * + * @param config The hadoop configuration to be updated. + * @param maxRecursion The maximum recursion depth schemas are allowed to go before terminating + * with an UnsupportedOperationException instead of their actual schema. 
+ */ + public static void setMaxRecursion(Configuration config, int maxRecursion) { + config.setInt(AVRO_MAX_RECURSION, maxRecursion); + } public AvroSchemaConverter() { this(ADD_LIST_ELEMENT_RECORDS_DEFAULT, READ_INT96_AS_FIXED_DEFAULT); @@ -106,6 +121,7 @@ public AvroSchemaConverter() { this.writeParquetUUID = WRITE_PARQUET_UUID_DEFAULT; this.readInt96AsFixed = readInt96AsFixed; this.pathsToInt96 = Collections.emptySet(); + this.maxRecursion = AVRO_MAX_RECURSION_DEFAULT; } public AvroSchemaConverter(Configuration conf) { @@ -118,6 +134,7 @@ public AvroSchemaConverter(ParquetConfiguration conf) { this.writeParquetUUID = conf.getBoolean(WRITE_PARQUET_UUID, WRITE_PARQUET_UUID_DEFAULT); this.readInt96AsFixed = conf.getBoolean(READ_INT96_AS_FIXED, READ_INT96_AS_FIXED_DEFAULT); this.pathsToInt96 = new HashSet<>(Arrays.asList(conf.getStrings(WRITE_FIXED_AS_INT96, new String[0]))); + this.maxRecursion = conf.getInt(AVRO_MAX_RECURSION, AVRO_MAX_RECURSION_DEFAULT); } /** @@ -150,16 +167,23 @@ public MessageType convert(Schema avroSchema) { if (!avroSchema.getType().equals(Schema.Type.RECORD)) { throw new IllegalArgumentException("Avro schema must be a record."); } - return new MessageType(avroSchema.getFullName(), convertFields(avroSchema.getFields(), "")); + return new MessageType( + avroSchema.getFullName(), + convertFields(avroSchema.getFields(), "", new IdentityHashMap())); } private List convertFields(List fields, String schemaPath) { + return convertFields(fields, schemaPath, new IdentityHashMap()); + } + + private List convertFields( + List fields, String schemaPath, IdentityHashMap seenSchemas) { List types = new ArrayList(); for (Schema.Field field : fields) { if (field.schema().getType().equals(Schema.Type.NULL)) { continue; // Avro nulls are not encoded, unless they are null unions } - types.add(convertField(field, appendPath(schemaPath, field.name()))); + types.add(convertField(field, appendPath(schemaPath, field.name()), seenSchemas)); } return types; } @@ 
-168,11 +192,37 @@ private Type convertField(String fieldName, Schema schema, String schemaPath) { return convertField(fieldName, schema, Type.Repetition.REQUIRED, schemaPath); } + private Type convertField( + String fieldName, Schema schema, String schemaPath, IdentityHashMap seenSchemas) { + return convertField(fieldName, schema, Type.Repetition.REQUIRED, schemaPath, seenSchemas); + } + @SuppressWarnings("deprecation") private Type convertField(String fieldName, Schema schema, Type.Repetition repetition, String schemaPath) { - Types.PrimitiveBuilder builder; + return convertField(fieldName, schema, repetition, schemaPath, new IdentityHashMap()); + } + + @SuppressWarnings("deprecation") + private Type convertField( + String fieldName, + Schema schema, + Type.Repetition repetition, + String schemaPath, + IdentityHashMap seenSchemas) { Schema.Type type = schema.getType(); LogicalType logicalType = schema.getLogicalType(); + + if (type.equals(Schema.Type.RECORD) || type.equals(Schema.Type.ENUM) || type.equals(Schema.Type.FIXED)) { + Integer depth = seenSchemas.get(schema); + if (depth != null && depth >= maxRecursion) { + throw new UnsupportedOperationException("Recursive Avro schemas are not supported by parquet-avro: " + + schema.getFullName() + " (exceeded maximum recursion depth of " + maxRecursion + ")"); + } + seenSchemas = new IdentityHashMap<>(seenSchemas); + seenSchemas.put(schema, depth == null ? 
1 : depth + 1); + } + + Types.PrimitiveBuilder builder; if (type.equals(Schema.Type.BOOLEAN)) { builder = Types.primitive(BOOLEAN, repetition); } else if (type.equals(Schema.Type.INT)) { @@ -195,21 +245,24 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet builder = Types.primitive(BINARY, repetition).as(stringType()); } } else if (type.equals(Schema.Type.RECORD)) { - return new GroupType(repetition, fieldName, convertFields(schema.getFields(), schemaPath)); + return new GroupType(repetition, fieldName, convertFields(schema.getFields(), schemaPath, seenSchemas)); } else if (type.equals(Schema.Type.ENUM)) { builder = Types.primitive(BINARY, repetition).as(enumType()); } else if (type.equals(Schema.Type.ARRAY)) { if (writeOldListStructure) { return ConversionPatterns.listType( - repetition, fieldName, convertField("array", schema.getElementType(), REPEATED, schemaPath)); + repetition, + fieldName, + convertField("array", schema.getElementType(), REPEATED, schemaPath, seenSchemas)); } else { return ConversionPatterns.listOfElements( repetition, fieldName, - convertField(AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType(), schemaPath)); + convertField( + AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType(), schemaPath, seenSchemas)); } } else if (type.equals(Schema.Type.MAP)) { - Type valType = convertField("value", schema.getValueType(), schemaPath); + Type valType = convertField("value", schema.getValueType(), schemaPath, seenSchemas); // avro map key type is always string return ConversionPatterns.stringKeyMapType(repetition, fieldName, valType); } else if (type.equals(Schema.Type.FIXED)) { @@ -223,7 +276,7 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition).length(schema.getFixedSize()); } } else if (type.equals(Schema.Type.UNION)) { - return convertUnion(fieldName, schema, repetition, schemaPath); + return 
convertUnion(fieldName, schema, repetition, schemaPath, seenSchemas); } else { throw new UnsupportedOperationException("Cannot convert Avro type " + type); } @@ -246,6 +299,15 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet } private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition, String schemaPath) { + return convertUnion(fieldName, schema, repetition, schemaPath, new IdentityHashMap()); + } + + private Type convertUnion( + String fieldName, + Schema schema, + Type.Repetition repetition, + String schemaPath, + IdentityHashMap seenSchemas) { List nonNullSchemas = new ArrayList(schema.getTypes().size()); // Found any schemas in the union? Required for the edge case, where the union contains only a single type. boolean foundNullSchema = false; @@ -267,20 +329,31 @@ private Type convertUnion(String fieldName, Schema schema, Type.Repetition repet case 1: return foundNullSchema - ? convertField(fieldName, nonNullSchemas.get(0), repetition, schemaPath) - : convertUnionToGroupType(fieldName, repetition, nonNullSchemas, schemaPath); + ? 
convertField(fieldName, nonNullSchemas.get(0), repetition, schemaPath, seenSchemas) + : convertUnionToGroupType(fieldName, repetition, nonNullSchemas, schemaPath, seenSchemas); default: // complex union type - return convertUnionToGroupType(fieldName, repetition, nonNullSchemas, schemaPath); + return convertUnionToGroupType(fieldName, repetition, nonNullSchemas, schemaPath, seenSchemas); } } private Type convertUnionToGroupType( String fieldName, Type.Repetition repetition, List nonNullSchemas, String schemaPath) { + return convertUnionToGroupType( + fieldName, repetition, nonNullSchemas, schemaPath, new IdentityHashMap()); + } + + private Type convertUnionToGroupType( + String fieldName, + Type.Repetition repetition, + List nonNullSchemas, + String schemaPath, + IdentityHashMap seenSchemas) { List unionTypes = new ArrayList(nonNullSchemas.size()); int index = 0; for (Schema childSchema : nonNullSchemas) { - unionTypes.add(convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL, schemaPath)); + unionTypes.add( + convertField("member" + index++, childSchema, Type.Repetition.OPTIONAL, schemaPath, seenSchemas)); } return new GroupType(repetition, fieldName, unionTypes); } @@ -289,6 +362,10 @@ private Type convertField(Schema.Field field, String schemaPath) { return convertField(field.name(), field.schema(), schemaPath); } + private Type convertField(Schema.Field field, String schemaPath, IdentityHashMap seenSchemas) { + return convertField(field.name(), field.schema(), schemaPath, seenSchemas); + } + public Schema convert(MessageType parquetSchema) { return convertFields(parquetSchema.getName(), parquetSchema.getFields(), new HashMap<>()); } diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java index 077e9cecd5..bc36bddd97 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java +++ 
b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java @@ -965,6 +965,116 @@ public void testAvroFixed12AsParquetInt96Type() throws Exception { () -> new AvroSchemaConverter(conf).convert(schema)); } + @Test + public void testRecursiveSchemaThrowsException() { + String recursiveSchemaJson = "{" + + "\"type\": \"record\", \"name\": \"Node\", \"fields\": [" + + " {\"name\": \"value\", \"type\": \"int\"}," + + " {\"name\": \"children\", \"type\": [" + + " \"null\", {" + + " \"type\": \"array\", \"items\": [\"null\", \"Node\"]" + + " }" + + " ], \"default\": null}" + + "]}"; + + Schema recursiveSchema = new Schema.Parser().parse(recursiveSchemaJson); + + assertThrows( + "Recursive Avro schema should throw UnsupportedOperationException", + UnsupportedOperationException.class, + () -> new AvroSchemaConverter().convert(recursiveSchema)); + } + + @Test + public void testRecursiveSchemaFromGitHubIssue() { + String issueSchemaJson = "{" + + "\"type\": \"record\", \"name\": \"ObjXX\", \"fields\": [" + + " {\"name\": \"id\", \"type\": [\"null\", \"long\"], \"default\": null}," + + " {\"name\": \"struct_add_list\", \"type\": [\"null\", {" + + " \"type\": \"array\", \"items\": [\"null\", {" + + " \"type\": \"record\", \"name\": \"ObjStructAdd\", \"fields\": [" + + " {\"name\": \"name\", \"type\": [\"null\", \"string\"], \"default\": null}," + + " {\"name\": \"fld_list\", \"type\": [\"null\", {" + + " \"type\": \"array\", \"items\": [\"null\", {" + + " \"type\": \"record\", \"name\": \"ObjStructAddFld\", \"fields\": [" + + " {\"name\": \"name\", \"type\": [\"null\", \"string\"], \"default\": null}," + + " {\"name\": \"ref_val\", \"type\": [\"null\", \"ObjStructAdd\"], \"default\": null}" + + " ]" + + " }]" + + " }], \"default\": null}" + + " ]" + + " }]" + + " }], \"default\": null}," + + " {\"name\": \"kafka_timestamp\", \"type\": {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}" + + "]}"; + + Schema issueSchema = new 
Schema.Parser().parse(issueSchemaJson); + + assertThrows( + "Schema should throw UnsupportedOperationException", + UnsupportedOperationException.class, + () -> new AvroSchemaConverter().convert(issueSchema)); + } + + @Test + public void testRecursiveSchemaErrorMessage() { + String recursiveSchemaJson = "{" + + "\"type\": \"record\", \"name\": \"TestRecord\", \"fields\": [" + + " {\"name\": \"self\", \"type\": [\"null\", \"TestRecord\"], \"default\": null}" + + "]}"; + + Schema recursiveSchema = new Schema.Parser().parse(recursiveSchemaJson); + + try { + new AvroSchemaConverter().convert(recursiveSchema); + Assert.fail("Expected UnsupportedOperationException"); + } catch (UnsupportedOperationException e) { + String message = e.getMessage(); + Assert.assertTrue( + "Error message should mention recursion", + message.contains("Recursive Avro schemas are not supported")); + Assert.assertTrue("Error message should mention schema name", message.contains("TestRecord")); + Assert.assertTrue( + "Error message should mention max recursion depth", message.contains("maximum recursion depth")); + } + } + + @Test + public void testConfigurableMaxRecursion() { + String recursiveSchemaJson = "{" + + "\"type\": \"record\", \"name\": \"Node\", \"fields\": [" + + " {\"name\": \"child\", \"type\": [\"null\", \"Node\"], \"default\": null}" + + "]}"; + + Schema recursiveSchema = new Schema.Parser().parse(recursiveSchemaJson); + Configuration conf = new Configuration(); + + AvroSchemaConverter.setMaxRecursion(conf, 1); + assertThrows( + "Should fail with max recursion 1", + UnsupportedOperationException.class, + () -> new AvroSchemaConverter(conf).convert(recursiveSchema)); + + AvroSchemaConverter.setMaxRecursion(conf, 5); + assertThrows( + "Should fail with max recursion 5", + UnsupportedOperationException.class, + () -> new AvroSchemaConverter(conf).convert(recursiveSchema)); + } + + @Test + public void testDeeplyNestedNonRecursiveSchema() { + Schema level3 = record("Level3", 
field("value", primitive(STRING))); + Schema level2 = record("Level2", field("level3", level3)); + Schema level1 = record("Level1", field("level2", level2)); + Schema rootSchema = record("Root", field("level1", level1)); + + AvroSchemaConverter converter = new AvroSchemaConverter(); + MessageType result = converter.convert(rootSchema); + Assert.assertNotNull("Non-recursive deep schema should convert successfully", result); + Assert.assertEquals("Root schema name should be preserved", "Root", result.getName()); + } + public static Schema optional(Schema original) { return Schema.createUnion(Lists.newArrayList(Schema.create(Schema.Type.NULL), original)); } From 6e6b4aa5be46da687cb32fe4e221c518293ce369 Mon Sep 17 00:00:00 2001 From: arnavb Date: Thu, 28 Aug 2025 08:07:26 +0000 Subject: [PATCH 2/6] update --- .../parquet/avro/AvroSchemaConverter.java | 26 +++--------- .../parquet/avro/TestAvroSchemaConverter.java | 40 +++---------------- 2 files changed, 10 insertions(+), 56 deletions(-) diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java index 25ce457b10..eaa5a25746 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -83,26 +83,12 @@ public class AvroSchemaConverter { public static final String ADD_LIST_ELEMENT_RECORDS = "parquet.avro.add-list-element-records"; private static final boolean ADD_LIST_ELEMENT_RECORDS_DEFAULT = true; - public static final String AVRO_MAX_RECURSION = "parquet.avro.max-recursion"; - private static final int AVRO_MAX_RECURSION_DEFAULT = 10; private final boolean assumeRepeatedIsListElement; private final boolean writeOldListStructure; private final boolean writeParquetUUID; private final boolean readInt96AsFixed; private final Set pathsToInt96; - private final int maxRecursion; - - /** - * Sets the maximum 
recursion depth for recursive schemas. - * - * @param config The hadoop configuration to be updated. - * @param maxRecursion The maximum recursion depth schemas are allowed to go before terminating - * with an UnsupportedOperationException instead of their actual schema. - */ - public static void setMaxRecursion(Configuration config, int maxRecursion) { - config.setInt(AVRO_MAX_RECURSION, maxRecursion); - } public AvroSchemaConverter() { this(ADD_LIST_ELEMENT_RECORDS_DEFAULT, READ_INT96_AS_FIXED_DEFAULT); @@ -121,7 +107,6 @@ public AvroSchemaConverter() { this.writeParquetUUID = WRITE_PARQUET_UUID_DEFAULT; this.readInt96AsFixed = readInt96AsFixed; this.pathsToInt96 = Collections.emptySet(); - this.maxRecursion = AVRO_MAX_RECURSION_DEFAULT; } public AvroSchemaConverter(Configuration conf) { @@ -134,7 +119,6 @@ public AvroSchemaConverter(ParquetConfiguration conf) { this.writeParquetUUID = conf.getBoolean(WRITE_PARQUET_UUID, WRITE_PARQUET_UUID_DEFAULT); this.readInt96AsFixed = conf.getBoolean(READ_INT96_AS_FIXED, READ_INT96_AS_FIXED_DEFAULT); this.pathsToInt96 = new HashSet<>(Arrays.asList(conf.getStrings(WRITE_FIXED_AS_INT96, new String[0]))); - this.maxRecursion = conf.getInt(AVRO_MAX_RECURSION, AVRO_MAX_RECURSION_DEFAULT); } /** @@ -213,13 +197,13 @@ private Type convertField( LogicalType logicalType = schema.getLogicalType(); if (type.equals(Schema.Type.RECORD) || type.equals(Schema.Type.ENUM) || type.equals(Schema.Type.FIXED)) { - Integer depth = seenSchemas.get(schema); - if (depth != null && depth >= maxRecursion) { - throw new UnsupportedOperationException("Recursive Avro schemas are not supported by parquet-avro: " - + schema.getFullName() + " (exceeded maximum recursion depth of " + maxRecursion + ")"); + // If this schema has already been seen in the current branch, we have a recursion loop + if (seenSchemas.containsKey(schema)) { + throw new UnsupportedOperationException( + "Recursive Avro schemas are not supported by parquet-avro: " + 
schema.getFullName()); } seenSchemas = new IdentityHashMap<>(seenSchemas); - seenSchemas.put(schema, depth == null ? 1 : depth + 1); + seenSchemas.put(schema, 1); } Types.PrimitiveBuilder builder; diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java index bc36bddd97..d54cd4310a 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java @@ -980,7 +980,7 @@ public void testRecursiveSchemaThrowsException() { Schema recursiveSchema = new Schema.Parser().parse(recursiveSchemaJson); assertThrows( - "Recursive Avro schema should throw UnsupportedOperationException", + "Recursive Avro schema should throw UnsupportedOperationException for cycles", UnsupportedOperationException.class, () -> new AvroSchemaConverter().convert(recursiveSchema)); } @@ -1011,7 +1011,7 @@ public void testRecursiveSchemaFromGitHubIssue() { Schema issueSchema = new Schema.Parser().parse(issueSchemaJson); assertThrows( - "Schema should throw UnsupportedOperationException", + "Schema should throw UnsupportedOperationException for cycles", UnsupportedOperationException.class, () -> new AvroSchemaConverter().convert(issueSchema)); } @@ -1025,41 +1025,11 @@ public void testRecursiveSchemaErrorMessage() { Schema recursiveSchema = new Schema.Parser().parse(recursiveSchemaJson); - try { - new AvroSchemaConverter().convert(recursiveSchema); - Assert.fail("Expected UnsupportedOperationException"); - } catch (UnsupportedOperationException e) { - String message = e.getMessage(); - Assert.assertTrue( - "Error message should mention recursion", - message.contains("Recursive Avro schemas are not supported")); - Assert.assertTrue("Error message should mention schema name", message.contains("TestRecord")); - Assert.assertTrue( - "Error message should mention max recursion depth", 
message.contains("maximum recursion depth")); - } - } - - @Test - public void testConfigurableMaxRecursion() { - String recursiveSchemaJson = "{" - + "\"type\": \"record\", \"name\": \"Node\", \"fields\": [" - + " {\"name\": \"child\", \"type\": [\"null\", \"Node\"], \"default\": null}" - + "]}"; - - Schema recursiveSchema = new Schema.Parser().parse(recursiveSchemaJson); - Configuration conf = new Configuration(); - - AvroSchemaConverter.setMaxRecursion(conf, 1); + // With our cycle detection fix, this should throw UnsupportedOperationException assertThrows( - "Should fail with max recursion 1", + "Recursive schema should throw UnsupportedOperationException with clear error message", UnsupportedOperationException.class, - () -> new AvroSchemaConverter(conf).convert(recursiveSchema)); - - AvroSchemaConverter.setMaxRecursion(conf, 5); - assertThrows( - "Should fail with max recursion 5", - UnsupportedOperationException.class, - () -> new AvroSchemaConverter(conf).convert(recursiveSchema)); + () -> new AvroSchemaConverter().convert(recursiveSchema)); } @Test From 368f834420ff8c32ee6744474c792f1cbada3ccc Mon Sep 17 00:00:00 2001 From: arnavb Date: Thu, 28 Aug 2025 09:23:23 +0000 Subject: [PATCH 3/6] update --- .../parquet/avro/AvroSchemaConverter.java | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java index eaa5a25746..16ede5eda0 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -153,15 +153,15 @@ public MessageType convert(Schema avroSchema) { } return new MessageType( avroSchema.getFullName(), - convertFields(avroSchema.getFields(), "", new IdentityHashMap())); + convertFields(avroSchema.getFields(), "", new HashSet())); } private List convertFields(List 
fields, String schemaPath) { - return convertFields(fields, schemaPath, new IdentityHashMap()); + return convertFields(fields, schemaPath, new HashSet()); } private List convertFields( - List fields, String schemaPath, IdentityHashMap seenSchemas) { + List fields, String schemaPath, Set seenSchemas) { List types = new ArrayList(); for (Schema.Field field : fields) { if (field.schema().getType().equals(Schema.Type.NULL)) { @@ -177,13 +177,13 @@ private Type convertField(String fieldName, Schema schema, String schemaPath) { } private Type convertField( - String fieldName, Schema schema, String schemaPath, IdentityHashMap seenSchemas) { + String fieldName, Schema schema, String schemaPath, Set seenSchemas) { return convertField(fieldName, schema, Type.Repetition.REQUIRED, schemaPath, seenSchemas); } @SuppressWarnings("deprecation") private Type convertField(String fieldName, Schema schema, Type.Repetition repetition, String schemaPath) { - return convertField(fieldName, schema, repetition, schemaPath, new IdentityHashMap()); + return convertField(fieldName, schema, repetition, schemaPath, new HashSet()); } @SuppressWarnings("deprecation") @@ -192,18 +192,18 @@ private Type convertField( Schema schema, Type.Repetition repetition, String schemaPath, - IdentityHashMap seenSchemas) { + Set seenSchemas) { Schema.Type type = schema.getType(); LogicalType logicalType = schema.getLogicalType(); if (type.equals(Schema.Type.RECORD) || type.equals(Schema.Type.ENUM) || type.equals(Schema.Type.FIXED)) { // If this schema has already been seen in the current branch, we have a recursion loop - if (seenSchemas.containsKey(schema)) { + if (seenSchemas.contains(schema)) { throw new UnsupportedOperationException( "Recursive Avro schemas are not supported by parquet-avro: " + schema.getFullName()); } - seenSchemas = new IdentityHashMap<>(seenSchemas); - seenSchemas.put(schema, 1); + seenSchemas = new HashSet<>(seenSchemas); + seenSchemas.add(schema); } Types.PrimitiveBuilder builder; @@ 
-283,7 +283,7 @@ private Type convertField( } private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition, String schemaPath) { - return convertUnion(fieldName, schema, repetition, schemaPath, new IdentityHashMap()); + return convertUnion(fieldName, schema, repetition, schemaPath, new HashSet()); } private Type convertUnion( @@ -291,7 +291,7 @@ private Type convertUnion( Schema schema, Type.Repetition repetition, String schemaPath, - IdentityHashMap seenSchemas) { + Set seenSchemas) { List nonNullSchemas = new ArrayList(schema.getTypes().size()); // Found any schemas in the union? Required for the edge case, where the union contains only a single type. boolean foundNullSchema = false; @@ -324,7 +324,7 @@ private Type convertUnion( private Type convertUnionToGroupType( String fieldName, Type.Repetition repetition, List nonNullSchemas, String schemaPath) { return convertUnionToGroupType( - fieldName, repetition, nonNullSchemas, schemaPath, new IdentityHashMap()); + fieldName, repetition, nonNullSchemas, schemaPath, new HashSet()); } private Type convertUnionToGroupType( @@ -332,7 +332,7 @@ private Type convertUnionToGroupType( Type.Repetition repetition, List nonNullSchemas, String schemaPath, - IdentityHashMap seenSchemas) { + Set seenSchemas) { List unionTypes = new ArrayList(nonNullSchemas.size()); int index = 0; for (Schema childSchema : nonNullSchemas) { @@ -346,7 +346,7 @@ private Type convertField(Schema.Field field, String schemaPath) { return convertField(field.name(), field.schema(), schemaPath); } - private Type convertField(Schema.Field field, String schemaPath, IdentityHashMap seenSchemas) { + private Type convertField(Schema.Field field, String schemaPath, Set seenSchemas) { return convertField(field.name(), field.schema(), schemaPath, seenSchemas); } From 4dab752afd7abc4624f829918680e3f13c2e525a Mon Sep 17 00:00:00 2001 From: arnavb Date: Thu, 28 Aug 2025 09:50:06 +0000 Subject: [PATCH 4/6] lint --- 
.../parquet/avro/AvroSchemaConverter.java | 25 +++++-------------- 1 file changed, 6 insertions(+), 19 deletions(-) diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java index 16ede5eda0..bfe8ce52b2 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -51,7 +51,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -152,16 +151,14 @@ public MessageType convert(Schema avroSchema) { throw new IllegalArgumentException("Avro schema must be a record."); } return new MessageType( - avroSchema.getFullName(), - convertFields(avroSchema.getFields(), "", new HashSet())); + avroSchema.getFullName(), convertFields(avroSchema.getFields(), "", new HashSet())); } private List convertFields(List fields, String schemaPath) { return convertFields(fields, schemaPath, new HashSet()); } - private List convertFields( - List fields, String schemaPath, Set seenSchemas) { + private List convertFields(List fields, String schemaPath, Set seenSchemas) { List types = new ArrayList(); for (Schema.Field field : fields) { if (field.schema().getType().equals(Schema.Type.NULL)) { @@ -176,8 +173,7 @@ private Type convertField(String fieldName, Schema schema, String schemaPath) { return convertField(fieldName, schema, Type.Repetition.REQUIRED, schemaPath); } - private Type convertField( - String fieldName, Schema schema, String schemaPath, Set seenSchemas) { + private Type convertField(String fieldName, Schema schema, String schemaPath, Set seenSchemas) { return convertField(fieldName, schema, Type.Repetition.REQUIRED, schemaPath, seenSchemas); } @@ -188,11 +184,7 @@ private Type convertField(String fieldName, Schema schema, 
Type.Repetition repet @SuppressWarnings("deprecation") private Type convertField( - String fieldName, - Schema schema, - Type.Repetition repetition, - String schemaPath, - Set seenSchemas) { + String fieldName, Schema schema, Type.Repetition repetition, String schemaPath, Set seenSchemas) { Schema.Type type = schema.getType(); LogicalType logicalType = schema.getLogicalType(); @@ -287,11 +279,7 @@ private Type convertUnion(String fieldName, Schema schema, Type.Repetition repet } private Type convertUnion( - String fieldName, - Schema schema, - Type.Repetition repetition, - String schemaPath, - Set seenSchemas) { + String fieldName, Schema schema, Type.Repetition repetition, String schemaPath, Set seenSchemas) { List nonNullSchemas = new ArrayList(schema.getTypes().size()); // Found any schemas in the union? Required for the edge case, where the union contains only a single type. boolean foundNullSchema = false; @@ -323,8 +311,7 @@ private Type convertUnion( private Type convertUnionToGroupType( String fieldName, Type.Repetition repetition, List nonNullSchemas, String schemaPath) { - return convertUnionToGroupType( - fieldName, repetition, nonNullSchemas, schemaPath, new HashSet()); + return convertUnionToGroupType(fieldName, repetition, nonNullSchemas, schemaPath, new HashSet()); } private Type convertUnionToGroupType( From 9f901f44d934669f7480f2553ea25e9b2eca1302 Mon Sep 17 00:00:00 2001 From: arnavb Date: Sun, 31 Aug 2025 07:17:17 +0000 Subject: [PATCH 5/6] update --- .../parquet/avro/AvroSchemaConverter.java | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java index bfe8ce52b2..3feee01221 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -51,6 +51,7 @@ 
import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -151,14 +152,14 @@ public MessageType convert(Schema avroSchema) { throw new IllegalArgumentException("Avro schema must be a record."); } return new MessageType( - avroSchema.getFullName(), convertFields(avroSchema.getFields(), "", new HashSet())); + avroSchema.getFullName(), convertFields(avroSchema.getFields(), "", new IdentityHashMap())); } private List convertFields(List fields, String schemaPath) { - return convertFields(fields, schemaPath, new HashSet()); + return convertFields(fields, schemaPath, new IdentityHashMap()); } - private List convertFields(List fields, String schemaPath, Set seenSchemas) { + private List convertFields(List fields, String schemaPath, IdentityHashMap seenSchemas) { List types = new ArrayList(); for (Schema.Field field : fields) { if (field.schema().getType().equals(Schema.Type.NULL)) { @@ -173,29 +174,29 @@ private Type convertField(String fieldName, Schema schema, String schemaPath) { return convertField(fieldName, schema, Type.Repetition.REQUIRED, schemaPath); } - private Type convertField(String fieldName, Schema schema, String schemaPath, Set seenSchemas) { + private Type convertField(String fieldName, Schema schema, String schemaPath, IdentityHashMap seenSchemas) { return convertField(fieldName, schema, Type.Repetition.REQUIRED, schemaPath, seenSchemas); } @SuppressWarnings("deprecation") private Type convertField(String fieldName, Schema schema, Type.Repetition repetition, String schemaPath) { - return convertField(fieldName, schema, repetition, schemaPath, new HashSet()); + return convertField(fieldName, schema, repetition, schemaPath, new IdentityHashMap()); } @SuppressWarnings("deprecation") private Type convertField( - String fieldName, Schema schema, Type.Repetition repetition, String schemaPath, Set seenSchemas) { + String 
fieldName, Schema schema, Type.Repetition repetition, String schemaPath, IdentityHashMap seenSchemas) { Schema.Type type = schema.getType(); LogicalType logicalType = schema.getLogicalType(); if (type.equals(Schema.Type.RECORD) || type.equals(Schema.Type.ENUM) || type.equals(Schema.Type.FIXED)) { // If this schema has already been seen in the current branch, we have a recursion loop - if (seenSchemas.contains(schema)) { + if (seenSchemas.containsKey(schema)) { throw new UnsupportedOperationException( "Recursive Avro schemas are not supported by parquet-avro: " + schema.getFullName()); } - seenSchemas = new HashSet<>(seenSchemas); - seenSchemas.add(schema); + seenSchemas = new IdentityHashMap<>(seenSchemas); + seenSchemas.put(schema, null); } Types.PrimitiveBuilder builder; @@ -275,11 +276,11 @@ private Type convertField( } private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition, String schemaPath) { - return convertUnion(fieldName, schema, repetition, schemaPath, new HashSet()); + return convertUnion(fieldName, schema, repetition, schemaPath, new IdentityHashMap()); } private Type convertUnion( - String fieldName, Schema schema, Type.Repetition repetition, String schemaPath, Set seenSchemas) { + String fieldName, Schema schema, Type.Repetition repetition, String schemaPath, IdentityHashMap seenSchemas) { List nonNullSchemas = new ArrayList(schema.getTypes().size()); // Found any schemas in the union? Required for the edge case, where the union contains only a single type. 
boolean foundNullSchema = false; @@ -311,7 +312,7 @@ private Type convertUnion( private Type convertUnionToGroupType( String fieldName, Type.Repetition repetition, List nonNullSchemas, String schemaPath) { - return convertUnionToGroupType(fieldName, repetition, nonNullSchemas, schemaPath, new HashSet()); + return convertUnionToGroupType(fieldName, repetition, nonNullSchemas, schemaPath, new IdentityHashMap()); } private Type convertUnionToGroupType( @@ -319,7 +320,7 @@ private Type convertUnionToGroupType( Type.Repetition repetition, List nonNullSchemas, String schemaPath, - Set seenSchemas) { + IdentityHashMap seenSchemas) { List unionTypes = new ArrayList(nonNullSchemas.size()); int index = 0; for (Schema childSchema : nonNullSchemas) { @@ -333,7 +334,7 @@ private Type convertField(Schema.Field field, String schemaPath) { return convertField(field.name(), field.schema(), schemaPath); } - private Type convertField(Schema.Field field, String schemaPath, Set seenSchemas) { + private Type convertField(Schema.Field field, String schemaPath, IdentityHashMap seenSchemas) { return convertField(field.name(), field.schema(), schemaPath, seenSchemas); } From 3a69f541abb0e14ae1d2d080b7a3e72fa1f71ec0 Mon Sep 17 00:00:00 2001 From: arnavb Date: Sun, 31 Aug 2025 15:01:13 +0000 Subject: [PATCH 6/6] lint --- .../parquet/avro/AvroSchemaConverter.java | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java index 3feee01221..58e9c2e198 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -152,14 +152,16 @@ public MessageType convert(Schema avroSchema) { throw new IllegalArgumentException("Avro schema must be a record."); } return new MessageType( - avroSchema.getFullName(), 
convertFields(avroSchema.getFields(), "", new IdentityHashMap())); + avroSchema.getFullName(), + convertFields(avroSchema.getFields(), "", new IdentityHashMap())); } private List convertFields(List fields, String schemaPath) { return convertFields(fields, schemaPath, new IdentityHashMap()); } - private List convertFields(List fields, String schemaPath, IdentityHashMap seenSchemas) { + private List convertFields( + List fields, String schemaPath, IdentityHashMap seenSchemas) { List types = new ArrayList(); for (Schema.Field field : fields) { if (field.schema().getType().equals(Schema.Type.NULL)) { @@ -174,7 +176,8 @@ private Type convertField(String fieldName, Schema schema, String schemaPath) { return convertField(fieldName, schema, Type.Repetition.REQUIRED, schemaPath); } - private Type convertField(String fieldName, Schema schema, String schemaPath, IdentityHashMap seenSchemas) { + private Type convertField( + String fieldName, Schema schema, String schemaPath, IdentityHashMap seenSchemas) { return convertField(fieldName, schema, Type.Repetition.REQUIRED, schemaPath, seenSchemas); } @@ -185,7 +188,11 @@ private Type convertField(String fieldName, Schema schema, Type.Repetition repet @SuppressWarnings("deprecation") private Type convertField( - String fieldName, Schema schema, Type.Repetition repetition, String schemaPath, IdentityHashMap seenSchemas) { + String fieldName, + Schema schema, + Type.Repetition repetition, + String schemaPath, + IdentityHashMap seenSchemas) { Schema.Type type = schema.getType(); LogicalType logicalType = schema.getLogicalType(); @@ -280,7 +287,11 @@ private Type convertUnion(String fieldName, Schema schema, Type.Repetition repet } private Type convertUnion( - String fieldName, Schema schema, Type.Repetition repetition, String schemaPath, IdentityHashMap seenSchemas) { + String fieldName, + Schema schema, + Type.Repetition repetition, + String schemaPath, + IdentityHashMap seenSchemas) { List nonNullSchemas = new 
ArrayList(schema.getTypes().size()); // Found any schemas in the union? Required for the edge case, where the union contains only a single type. boolean foundNullSchema = false; @@ -312,7 +323,8 @@ private Type convertUnion( private Type convertUnionToGroupType( String fieldName, Type.Repetition repetition, List nonNullSchemas, String schemaPath) { - return convertUnionToGroupType(fieldName, repetition, nonNullSchemas, schemaPath, new IdentityHashMap()); + return convertUnionToGroupType( + fieldName, repetition, nonNullSchemas, schemaPath, new IdentityHashMap()); } private Type convertUnionToGroupType(