From e501f3515408b5352ff13f4e72b37c8dbdc57fa1 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Mon, 9 Feb 2026 09:23:39 +0100 Subject: [PATCH 1/7] #723 Add 'generate_corrupted_fields' as a parameter and part of Spark Schema. --- .../cobol/parser/common/Constants.scala | 3 ++ .../reader/parameters/CobolParameters.scala | 4 +- .../parameters/CobolParametersParser.scala | 3 ++ .../reader/parameters/ReaderParameters.scala | 2 + .../cobol/reader/schema/CobolSchema.scala | 10 +++-- .../spark/cobol/schema/CobolSchema.scala | 19 ++++++++- .../cobrix/spark/cobol/CobolSchemaSpec.scala | 39 ++++++++++++++----- 7 files changed, 64 insertions(+), 16 deletions(-) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala index 950dec281..7f18fd3af 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala @@ -64,6 +64,9 @@ object Constants { val recordIdField = "Record_Id" val recordByteLength = "Record_Byte_Length" val recordBytes = "Record_Bytes" + val corruptedFieldsField = "_corrupted_fields" + val fieldNameColumn = "field_name" + val rawValueColumn = "raw_value" // Non-terminals val nonTerminalsPostfix = "_NT" diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala index f8331e716..e9bab4ff8 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala @@ -17,9 +17,9 @@ package za.co.absa.cobrix.cobol.reader.parameters import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPointFormat -import za.co.absa.cobrix.cobol.parser.policies.{CommentPolicy, FillerNamingPolicy, MetadataPolicy} import za.co.absa.cobrix.cobol.parser.policies.DebugFieldsPolicy.DebugFieldsPolicy import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmingPolicy +import za.co.absa.cobrix.cobol.parser.policies.{CommentPolicy, FillerNamingPolicy, MetadataPolicy} import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy @@ -47,6 +47,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten * @param variableLengthParams VariableLengthParameters containing the specifications for the consumption of variable-length Cobol records. * @param variableSizeOccurs If true, OCCURS DEPENDING ON data size will depend on the number of elements * @param generateRecordBytes Generate 'record_bytes' field containing raw bytes of the original record + * @param generateCorruptFields Generate '_corrupt_fields' field for fields that haven't converted successfully * @param schemaRetentionPolicy A copybook usually has a root group struct element that acts like a rowtag in XML. 
This can be retained in Spark schema or can be collapsed * @param stringTrimmingPolicy Specify if and how strings should be trimmed when parsed * @param isDisplayAlwaysString If true, all fields having DISPLAY format will remain strings and won't be converted to numbers @@ -87,6 +88,7 @@ case class CobolParameters( variableLengthParams: Option[VariableLengthParameters], variableSizeOccurs: Boolean, generateRecordBytes: Boolean, + generateCorruptFields: Boolean, schemaRetentionPolicy: SchemaRetentionPolicy, stringTrimmingPolicy: StringTrimmingPolicy, isDisplayAlwaysString: Boolean, diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala index e563d6650..c504662ff 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala @@ -66,6 +66,7 @@ object CobolParametersParser extends Logging { // Schema transformation parameters val PARAM_GENERATE_RECORD_ID = "generate_record_id" val PARAM_GENERATE_RECORD_BYTES = "generate_record_bytes" + val PARAM_CORRUPTED_FIELDS = "generate_corrupted_fields" val PARAM_SCHEMA_RETENTION_POLICY = "schema_retention_policy" val PARAM_GROUP_FILLERS = "drop_group_fillers" val PARAM_VALUE_FILLERS = "drop_value_fillers" @@ -286,6 +287,7 @@ object CobolParametersParser extends Logging { variableLengthParams, params.getOrElse(PARAM_VARIABLE_SIZE_OCCURS, "false").toBoolean, params.getOrElse(PARAM_GENERATE_RECORD_BYTES, "false").toBoolean, + params.getOrElse(PARAM_CORRUPTED_FIELDS, "false").toBoolean, schemaRetentionPolicy, stringTrimmingPolicy, params.getOrElse(PARAM_DISPLAY_PIC_ALWAYS_STRING, "false").toBoolean, @@ -431,6 +433,7 @@ object CobolParametersParser extends Logging { fileEndOffset = varLenParams.fileEndOffset, generateRecordId = varLenParams.generateRecordId, generateRecordBytes = parameters.generateRecordBytes, + generateCorruptFields = parameters.generateCorruptFields, schemaPolicy = parameters.schemaRetentionPolicy, stringTrimmingPolicy = parameters.stringTrimmingPolicy, isDisplayAlwaysString = parameters.isDisplayAlwaysString, diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala index 01148faec..4a527e8a4 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala @@ -58,6 +58,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten * @param fileEndOffset A number of bytes to skip at the end of each file * @param generateRecordId If true, a record id field will be prepended to each record. * @param generateRecordBytes Generate 'record_bytes' field containing raw bytes of the original record + * @param generateCorruptFields Generate '_corrupted_fields' field for fields that could not be decoded successfully * @param schemaPolicy Specifies a policy to transform the input schema. The default policy is to keep the schema exactly as it is in the copybook. * @param stringTrimmingPolicy Specifies if and how strings should be trimmed when parsed. 
* @param isDisplayAlwaysString If true, all fields having DISPLAY format will remain strings and won't be converted to numbers. @@ -111,6 +112,7 @@ case class ReaderParameters( fileEndOffset: Int = 0, generateRecordId: Boolean = false, generateRecordBytes: Boolean = false, + generateCorruptFields: Boolean = false, schemaPolicy: SchemaRetentionPolicy = SchemaRetentionPolicy.CollapseRoot, stringTrimmingPolicy: StringTrimmingPolicy = StringTrimmingPolicy.TrimBoth, isDisplayAlwaysString: Boolean = false, diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala index c3f0f374c..5180e7778 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala @@ -17,16 +17,15 @@ package za.co.absa.cobrix.cobol.reader.schema import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage - -import java.time.ZonedDateTime -import java.time.format.DateTimeFormatter -import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser} import za.co.absa.cobrix.cobol.parser.encoding.{ASCII, EBCDIC} import za.co.absa.cobrix.cobol.parser.policies.MetadataPolicy +import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser} import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy import java.nio.charset.{Charset, StandardCharsets} +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter import scala.collection.immutable.HashMap @@ -40,6 +39,7 @@ import scala.collection.immutable.HashMap * @param strictIntegralPrecision If true, Cobrix will not generate short/integer/long Spark data types, and always use decimal(n) with the exact precision that matches the copybook. * @param generateRecordId If true, a record id field will be prepended to the beginning of the schema. * @param generateRecordBytes If true, a record bytes field will be appended to the beginning of the schema. + * @param generateCorruptedFields If true, a corrupted record field will be appended to the end of the schema. * @param inputFileNameField If non-empty, a source file name will be prepended to the beginning of the schema. 
* @param generateSegIdFieldsCnt A number of segment ID levels to generate * @param segmentIdProvidedPrefix A prefix for each segment id levels to make segment ids globally unique (by default the current timestamp will be used) @@ -52,6 +52,7 @@ class CobolSchema(val copybook: Copybook, val inputFileNameField: String, val generateRecordId: Boolean, val generateRecordBytes: Boolean, + val generateCorruptedFields: Boolean, val generateSegIdFieldsCnt: Int = 0, val segmentIdProvidedPrefix: String = "", val metadataPolicy: MetadataPolicy = MetadataPolicy.Basic) extends Serializable { @@ -143,6 +144,7 @@ object CobolSchema { readerParameters.inputFileNameColumn, readerParameters.generateRecordId, readerParameters.generateRecordBytes, + readerParameters.generateCorruptFields, segIdFieldCount, segmentIdPrefix, readerParameters.metadataPolicy diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala index c781b6419..c5d441a19 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala @@ -24,11 +24,11 @@ import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, COMP1, COMP2, import za.co.absa.cobrix.cobol.parser.common.Constants import za.co.absa.cobrix.cobol.parser.encoding.RAW import za.co.absa.cobrix.cobol.parser.policies.MetadataPolicy +import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.getReaderProperties import za.co.absa.cobrix.cobol.reader.parameters.{CobolParametersParser, Parameters} import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy import za.co.absa.cobrix.cobol.reader.schema.{CobolSchema => CobolReaderSchema} -import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.getReaderProperties import za.co.absa.cobrix.spark.cobol.parameters.MetadataFields.{MAX_ELEMENTS, MAX_LENGTH, MIN_ELEMENTS} import scala.collection.mutable @@ -44,6 +44,7 @@ import scala.collection.mutable.ArrayBuffer * @param strictIntegralPrecision If true, Cobrix will not generate short/integer/long Spark data types, and always use decimal(n) with the exact precision that matches the copybook. * @param generateRecordId If true, a record id field will be prepended to the beginning of the schema. * @param generateRecordBytes If true, a record bytes field will be appended to the beginning of the schema. + * @param generateCorruptedFields If true, a corrupted record field will be appended to the end of the schema. * @param inputFileNameField If non-empty, a source file name will be prepended to the beginning of the schema. 
* @param generateSegIdFieldsCnt A number of segment ID levels to generate * @param segmentIdProvidedPrefix A prefix for each segment id levels to make segment ids globally unique (by default the current timestamp will be used) @@ -56,6 +57,7 @@ class CobolSchema(copybook: Copybook, inputFileNameField: String = "", generateRecordId: Boolean = false, generateRecordBytes: Boolean = false, + generateCorruptedFields: Boolean = false, generateSegIdFieldsCnt: Int = 0, segmentIdProvidedPrefix: String = "", metadataPolicy: MetadataPolicy = MetadataPolicy.Basic) @@ -66,6 +68,7 @@ class CobolSchema(copybook: Copybook, inputFileNameField, generateRecordId, generateRecordBytes, + generateCorruptedFields, generateSegIdFieldsCnt, segmentIdProvidedPrefix ) with Logging with Serializable { @@ -126,7 +129,18 @@ class CobolSchema(copybook: Copybook, recordsWithRecordBytes } - StructType(recordsWithRecordId) + val recordsWithCorruptedFields = if (generateCorruptedFields) { + recordsWithRecordId :+ StructField(Constants.corruptedFieldsField, ArrayType(StructType( + Seq( + StructField(Constants.fieldNameColumn, StringType, nullable = false), + StructField(Constants.rawValueColumn, BinaryType, nullable = false) + ) + ), containsNull = false), nullable = false) + } else { + recordsWithRecordId + } + + StructType(recordsWithCorruptedFields) } private [cobrix] def getMaximumSegmentIdLength(segmentIdProvidedPrefix: String): Int = { @@ -309,6 +323,7 @@ object CobolSchema { schema.inputFileNameField, schema.generateRecordId, schema.generateRecordBytes, + schema.generateCorruptedFields, schema.generateSegIdFieldsCnt, schema.segmentIdPrefix, schema.metadataPolicy diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala index b25117b4b..81f73e9c9 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala @@ -16,7 +16,7 @@ package za.co.absa.cobrix.spark.cobol -import org.apache.spark.sql.types.{ArrayType, DecimalType, IntegerType, LongType, StringType, StructType} +import org.apache.spark.sql.types._ import org.scalatest.wordspec.AnyWordSpec import org.slf4j.{Logger, LoggerFactory} import za.co.absa.cobrix.cobol.parser.CopybookParser @@ -272,7 +272,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { | | |-- STR_FLD: string (nullable = true) |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", true, false, 2) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", true, false, false, 2) val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -290,7 +290,28 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { | | |-- STR_FLD: string (nullable = true) |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", false, true, 2) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", false, true, false, 2) + val actualSchema = cobolSchema.getSparkSchema.treeString + + assertEqualsMultiline(actualSchema, 
expectedSchema) + } + + "multi-segment keep-original with corrupted record generation" in { + val expectedSchema = + """root + | |-- Seg_Id0: string (nullable = true) + | |-- Seg_Id1: string (nullable = true) + | |-- STRUCT1: struct (nullable = true) + | | |-- IntValue: integer (nullable = true) + | |-- STRUCT2: struct (nullable = true) + | | |-- STR_FLD: string (nullable = true) + | |-- _corrupted_fields: array (nullable = false) + | | |-- element: struct (containsNull = false) + | | | |-- field_name: string (nullable = false) + | | | |-- raw_value: binary (nullable = false) + |""".stripMargin.replaceAll("[\\r\\n]", "\n") + val parsedSchema = CopybookParser.parseTree(copyBook) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", false, false, true, 2) val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -311,7 +332,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { | | |-- STR_FLD: string (nullable = true) |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", true, true, 2) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", true, true, false, 2) val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -332,7 +353,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { | | |-- STR_FLD: string (nullable = true) |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, true, "", true, true, 2) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, true, "", true, true, false, 2) val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -349,7 +370,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { | | |-- STR_FLD: string (nullable = true) |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", false, false, 2) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", false, false, false, 2) val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -367,7 +388,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { | |-- STR_FLD: string (nullable = true) |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", true, false, 2) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", true, false, false, 2) val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -382,7 +403,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { | |-- STR_FLD: string (nullable = true) |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, 
SchemaRetentionPolicy.CollapseRoot, false, false, "", false, false, 2) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", false, false, false, 2) val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -464,7 +485,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { | |-- NUM3: decimal(9,8) (nullable = true)""".stripMargin val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, true, false, "", false, false, 2) + val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, true, false, "", false, false, false, 2) val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) From 0f546e0640c9f1cfc5f9ee2eca722157631e630a Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 10 Feb 2026 08:47:10 +0100 Subject: [PATCH 2/7] #723 Add support for generating corrupted fields in record extraction - Introduced `generate_corrupted_fields` option to append a `_corrupted_fields` array to the schema, containing field names and raw values of fields that could not be decoded. - Updated relevant methods in `RecordExtractors` to handle corrupted fields. - Enhanced documentation and README to reflect the new feature. --- README.md | 29 ++-- .../cobrix/cobol/parser/ast/Primitive.scala | 28 +++- .../extractors/record/CorruptedField.scala | 22 +++ .../extractors/record/RecordExtractors.scala | 52 +++++-- .../iterator/FixedLenNestedRowIterator.scala | 2 +- .../iterator/VarLenNestedIterator.scala | 5 +- .../parameters/CobolParametersParser.scala | 4 + .../builder/SparkCobolOptionsBuilder.scala | 13 +- .../Test39CorruptedFieldsSpec.scala | 127 ++++++++++++++++++ 9 files changed, 251 insertions(+), 31 deletions(-) create mode 100644 cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/CorruptedField.scala create mode 100644 spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test39CorruptedFieldsSpec.scala diff --git a/README.md b/README.md index 66bd71e24..30704f10c 100644 --- a/README.md +++ b/README.md @@ -597,6 +597,11 @@ root |-- Record_Bytes: binary (nullable = false) ``` +You can generate `_corrupted_fields` that will contain original binary values of fields Cobrix was unable to decode: +``` +.option("generate_corrupted_fields", "true") +``` + ### Locality optimization for variable-length records parsing Variable-length records depend on headers to have their length calculated, which makes it hard to achieve parallelism while parsing. @@ -1557,6 +1562,7 @@ The output looks like this: | .option("non_terminals", "GROUP1,GROUP2") | Specifies groups to also be added to the schema as string fields. When this option is specified, the reader will add one extra data field after each matching group containing the string data for the group. | | .option("generate_record_id", false) | Generate autoincremental 'File_Id', 'Record_Id' and 'Record_Byte_Length' fields. This is used for processing record order dependent data. | | .option("generate_record_bytes", false) | Generate 'Record_Bytes', the binary field that contains raw contents of the original unparsed records. | +| .option("generate_corrupted_fields", false) | Generate `_corrupted_fields` field that contains values of fields Cobrix was unable to decode. 
| | .option("with_input_file_name_col", "file_name") | Generates a column containing input file name for each record (Similar to Spark SQL `input_file_name()` function). The column name is specified by the value of the option. This option only works for variable record length files. For fixed record length and ASCII files use `input_file_name()`. | | .option("metadata", "basic") | Specifies wat kind of metadata to include in the Spark schema: `false`, `basic`(default), or `extended` (PIC, usage, etc). | | .option("debug", "hex") | If specified, each primitive field will be accompanied by a debug field containing raw bytes from the source file. Possible values: `none` (default), `hex`, `binary`, `string` (ASCII only). The legacy value `true` is supported and will generate debug fields in HEX. | @@ -1945,19 +1951,26 @@ at org.apache.hadoop.io.nativeio.NativeIO$POSIX.getStat(NativeIO.java:608) A: Update hadoop dll to version 3.2.2 or newer. ## Changelog +- #### 2.9.8 will be released soon. + - [#723](https://github.com/AbsaOSS/cobrix/pull/723) Added the option to generate `corrupted_fields` field that contains field names and raw values + of fields that Cobrix couldn't decode. + ```scala + .option("generate_corrupted_fields", "true") + ``` + - #### 2.9.7 released 29 January 2026. - - [#816](https://github.com/AbsaOSS/cobrix/pull/816) Fixed the reliance on log4j libraries in the classpath. Cobrix can now be run on clusters that do not use Log4j for logging. + - [#816](https://github.com/AbsaOSS/cobrix/pull/816) Fixed the reliance on log4j libraries in the classpath. Cobrix can now be run on clusters that do not use Log4j for logging. - #### 2.9.6 released 7 January 2026. - - [#813](https://github.com/AbsaOSS/cobrix/pull/813) Fixed compatibility of the relaxed sign overpunching. Allow numbers - with overpunched sign in unsigned numbers and allow multiple digits when overpunched sign when `strict_sign_overpunching = true`. + - [#813](https://github.com/AbsaOSS/cobrix/pull/813) Fixed compatibility of the relaxed sign overpunching. Allow numbers + with overpunched sign in unsigned numbers and allow multiple digits when overpunched sign when `strict_sign_overpunching = true`. - #### 2.9.5 released 22 December 2025. - - [#809](https://github.com/AbsaOSS/cobrix/pull/809) Add support for reading compressed EBCDIC files. All compression - supported by Hadoop (.gz, .bz2, etc) are also supported by Cobrix because Cobrix uses Hadoop compressed streams for - reading such files. - - [#811](https://github.com/AbsaOSS/cobrix/pull/811) Add read properties hash code as index key to avoid false cache. - This makes index caching safe to use by default, so index caching is now turned on by default. + - [#809](https://github.com/AbsaOSS/cobrix/pull/809) Add support for reading compressed EBCDIC files. All compression + supported by Hadoop (.gz, .bz2, etc) are also supported by Cobrix because Cobrix uses Hadoop compressed streams for + reading such files. + - [#811](https://github.com/AbsaOSS/cobrix/pull/811) Add read properties hash code as index key to avoid false cache. + This makes index caching safe to use by default, so index caching is now turned on by default. - #### 2.9.4 released 26 November 2025. - [#805](https://github.com/AbsaOSS/cobrix/pull/805) Added the option to cache VRL indexes for better performance when same files are processed multiple times. 
```scala diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala index 08e61ca13..d857632bb 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala @@ -90,7 +90,6 @@ case class Primitive( copy(dependingOnHandlers = newDependingOnHandlers)(parent) } - /** Returns the binary size in bits for the field */ def getBinarySizeBytes: Int = { dataType match { @@ -110,6 +109,29 @@ case class Primitive( * @param record A record in a binary format represented as a vector of bits */ def decodeTypeValue(itOffset: Int, record: Array[Byte]): Any = { + val bytes = getRawValue(itOffset, record) + + if (bytes == null) null else decode(bytes) + } + + def isNull(itOffset: Int, record: Array[Byte]): Boolean = { + val bytes = getRawValue(itOffset, record) + + if (bytes == null) { + true + } else { + var i = 0 + while (i < bytes.length) { + if (bytes(i) != 0) { + return false + } + i += 1 + } + true + } + } + + def getRawValue(itOffset: Int, record: Array[Byte]): Array[Byte] = { val bytesCount = binaryProperties.dataSize val idx = itOffset @@ -132,9 +154,7 @@ case class Primitive( } else { bytesCount } - val bytes = java.util.Arrays.copyOfRange(record, idx, idx + bytesToCopy) - - decode(bytes) + java.util.Arrays.copyOfRange(record, idx, idx + bytesToCopy) } } diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/CorruptedField.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/CorruptedField.scala new file mode 100644 index 000000000..aa2038fd1 --- /dev/null +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/CorruptedField.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.cobrix.cobol.reader.extractors.record + +case class CorruptedField( + fieldName: String, + rawValue: Array[Byte] + ) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala index efda1f4d3..d7708ce3d 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala @@ -17,15 +17,18 @@ package za.co.absa.cobrix.cobol.reader.extractors.record import za.co.absa.cobrix.cobol.parser.CopybookParser.CopybookAST +import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, COMP9} import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive, Statement} import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy +import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.reflect.ClassTag object RecordExtractors { + private val corruptedFieldsGroup = getCorruptedFieldsGroup /** * This method extracts a record from the specified array of bytes. The copybook for the record needs to be already parsed. @@ -37,6 +40,7 @@ object RecordExtractors { * @param variableLengthOccurs If true, OCCURS DEPENDING ON data size will depend on the number of elements. * @param generateRecordId If true, a record id field will be added as the first field of the record. * @param generateRecordBytes If true, a record bytes field will be added at the beginning of each record. + * @param generateCorruptedFields If true, a corrupted record field will be appended to the end of the schema. * @param segmentLevelIds Segment ids to put to the extracted record if id generation it turned on. * @param fileId A file id to be put to the extractor record if generateRecordId == true. * @param recordId The record id to be saved to the record id field. 
@@ -55,6 +59,7 @@ object RecordExtractors { variableLengthOccurs: Boolean = false, generateRecordId: Boolean = false, generateRecordBytes: Boolean = false, + generateCorruptedFields: Boolean = false, segmentLevelIds: List[String] = Nil, fileId: Int = 0, recordId: Long = 0, @@ -64,6 +69,7 @@ object RecordExtractors { handler: RecordHandler[T] ): Seq[Any] = { val dependFields = scala.collection.mutable.HashMap.empty[String, Either[Int, String]] + val corruptedFields = new ListBuffer[CorruptedField] val isAstFlat = ast.children.exists(_.isInstanceOf[Primitive]) @@ -103,6 +109,9 @@ object RecordExtractors { var j = 0 while (i < actualSize) { val value = s.decodeTypeValue(offset, data) + if (value == null && generateCorruptedFields && !s.isNull(offset, data)) { + corruptedFields += CorruptedField(s"${field.name}[$i]", s.getRawValue(offset,data)) + } offset += s.binaryProperties.dataSize values(j) = value i += 1 @@ -127,6 +136,9 @@ object RecordExtractors { } case st: Primitive => val value = st.decodeTypeValue(useOffset, data) + if (value == null && generateCorruptedFields && !st.isNull(useOffset, data)) { + corruptedFields += CorruptedField(field.name, st.getRawValue(useOffset,data)) + } if (value != null && st.isDependee) { val intStringVal: Either[Int, String] = value match { case v: Int => Left(v) @@ -199,7 +211,7 @@ object RecordExtractors { policy } - applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes, segmentLevelIds, fileId, recordId, data.length, data, generateInputFileField, inputFileName, handler) + applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes, generateCorruptedFields, segmentLevelIds, fileId, recordId, data.length, data, generateInputFileField, inputFileName, corruptedFields.toSeq, handler) } /** @@ -419,7 +431,7 @@ object RecordExtractors { policy } - applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes = false, Nil, fileId, recordId, recordLength, Array.empty[Byte], generateInputFileField = generateInputFileField, inputFileName, handler) + applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes = false, generateCorruptedFields = false, Nil, fileId, recordId, recordLength, Array.empty[Byte], generateInputFileField = generateInputFileField, inputFileName, Seq.empty, handler) } /** @@ -435,15 +447,16 @@ object RecordExtractors { * Combinations of the listed transformations are supported. *

* - * @param ast The parsed copybook - * @param records The array of [[T]] object for each Group of the copybook - * @param generateRecordId If true a record id field will be added as the first field of the record. - * @param generateRecordBytes If true a record bytes field will be added at the beginning of the record. - * @param fileId The file id to be saved to the file id field - * @param recordId The record id to be saved to the record id field - * @param recordByteLength The length of the record - * @param generateInputFileField if true, a field containing input file name will be generated - * @param inputFileName An input file name to put if its generation is needed + * @param ast The parsed copybook + * @param records The array of [[T]] object for each Group of the copybook + * @param generateRecordId If true a record id field will be added as the first field of the record. + * @param generateRecordBytes If true a record bytes field will be added at the beginning of the record. + * @param generateCorruptedFields If true, a corrupted record field will be appended to the end of the schema. + * @param fileId The file id to be saved to the file id field + * @param recordId The record id to be saved to the record id field + * @param recordByteLength The length of the record + * @param generateInputFileField if true, a field containing input file name will be generated + * @param inputFileName An input file name to put if its generation is needed * @return A [[T]] object corresponding to the record schema */ private def applyRecordPostProcessing[T]( @@ -452,6 +465,7 @@ object RecordExtractors { policy: SchemaRetentionPolicy, generateRecordId: Boolean, generateRecordBytes: Boolean, + generateCorruptedFields: Boolean, segmentLevelIds: List[String], fileId: Int, recordId: Long, @@ -459,6 +473,7 @@ object RecordExtractors { recordBytes: Array[Byte], generateInputFileField: Boolean, inputFileName: String, + corruptedFields: Seq[CorruptedField], handler: RecordHandler[T] ): Seq[Any] = { val outputRecords = new ListBuffer[Any] @@ -491,7 +506,22 @@ object RecordExtractors { // Return rows as the original sequence of groups records.foreach(record => outputRecords.append(record)) } + + if (generateCorruptedFields) { + val corruptedFieldsRaw = corruptedFields.map(d => handler.create(Array[Any](d.fieldName, d.rawValue), corruptedFieldsGroup)) + outputRecords.append(corruptedFieldsRaw) + } + // toList() is a constant time operation, and List implements immutable Seq, which is exactly what is needed here. 
outputRecords.toList } + + private def getCorruptedFieldsGroup: Group = { + val corruptedFieldsInGroup = new mutable.ArrayBuffer[Statement] + + corruptedFieldsInGroup += Primitive(15, "field_name", "field_name", 0, AlphaNumeric("X(50)", 50), decode = null, encode = null)(None) + corruptedFieldsInGroup += Primitive(15, "raw_value", "raw_value", 0, AlphaNumeric("X(50)", 50, compact = Some(COMP9())), decode = null, encode = null)(None) + + Group(10, "_corrupted_fields", "_corrupted_fields", 0, children = corruptedFieldsInGroup )(None) + } } diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala index 5e9afe222..8da001b6a 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala @@ -19,7 +19,6 @@ package za.co.absa.cobrix.cobol.reader.iterator import za.co.absa.cobrix.cobol.internal.Logging import za.co.absa.cobrix.cobol.reader.extractors.record.{RecordExtractors, RecordHandler} import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters -import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy import za.co.absa.cobrix.cobol.reader.schema.CobolSchema import za.co.absa.cobrix.cobol.reader.validator.ReaderParametersValidator @@ -91,6 +90,7 @@ class FixedLenNestedRowIterator[T: ClassTag]( readerProperties.schemaPolicy, readerProperties.variableSizeOccurs, generateRecordBytes = readerProperties.generateRecordBytes, + generateCorruptedFields = readerProperties.generateCorruptFields, activeSegmentRedefine = activeSegmentRedefine, handler = handler ) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenNestedIterator.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenNestedIterator.scala index 49e72a80c..0f9a231e1 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenNestedIterator.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenNestedIterator.scala @@ -18,10 +18,10 @@ package za.co.absa.cobrix.cobol.reader.iterator import za.co.absa.cobrix.cobol.parser.Copybook import za.co.absa.cobrix.cobol.parser.headerparsers.RecordHeaderParser -import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor +import za.co.absa.cobrix.cobol.reader.extractors.record.{RecordExtractors, RecordHandler} +import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters import za.co.absa.cobrix.cobol.reader.stream.SimpleStream -import za.co.absa.cobrix.cobol.reader.extractors.record.{RecordHandler, RecordExtractors} import scala.collection.immutable.HashMap import scala.collection.mutable.ListBuffer @@ -99,6 +99,7 @@ final class VarLenNestedIterator[T: ClassTag](cobolSchema: Copybook, readerProperties.variableSizeOccurs, readerProperties.generateRecordId, readerProperties.generateRecordBytes, + readerProperties.generateCorruptFields, segmentLevelIds, fileId, rawRecordIterator.getRecordIndex, diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala index c504662ff..f87314ab5 100644 --- 
a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala @@ -895,6 +895,10 @@ object CobolParametersParser extends Logging { throw new IllegalArgumentException(s"Option '$PARAM_GENERATE_RECORD_BYTES' cannot be used with 'segment-children:*' " + "since hierarchical records are composites of more than one raw record.") } + if (params.contains(PARAM_CORRUPTED_FIELDS)) { + throw new IllegalArgumentException(s"Option '$PARAM_CORRUPTED_FIELDS' cannot be used with 'segment-children:*' " + + "at the moment.") + } } if (!isRecordSequence && recordFormat != AsciiText && recordFormat != CobrixAsciiText && params.contains(PARAM_INPUT_FILE_COLUMN)) { val recordSequenceCondition = s"one of this holds: '$PARAM_RECORD_FORMAT' = V or '$PARAM_RECORD_FORMAT' = VB or '$PARAM_IS_RECORD_SEQUENCE' = true" + diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala index 0692a5878..14c81e3c3 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala @@ -67,11 +67,14 @@ class SparkCobolOptionsBuilder(copybookContent: String)(implicit spark: SparkSes .filter(array => array.nonEmpty && array.length >= minimumRecordLength && array.length <= maximumRecordLength) .map(array => { val record = RecordExtractors.extractRecord[GenericRow](cobolSchema.getCobolSchema.ast, - array, - 0, - schemaRetentionPolicy, - generateRecordBytes = readerParams.generateRecordBytes, - handler = recordHandler) + array, + 0, + schemaRetentionPolicy, + variableLengthOccurs = readerParams.variableSizeOccurs, + generateRecordId = readerParams.generateRecordId, + generateRecordBytes = readerParams.generateRecordBytes, + generateCorruptedFields = readerParams.generateCorruptFields, + handler = recordHandler) Row.fromSeq(record) }) diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test39CorruptedFieldsSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test39CorruptedFieldsSpec.scala new file mode 100644 index 000000000..27c99c4f1 --- /dev/null +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test39CorruptedFieldsSpec.scala @@ -0,0 +1,127 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.cobrix.spark.cobol.source.integration + +import org.apache.spark.sql.DataFrame +import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.PARAM_CORRUPTED_FIELDS +import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase +import za.co.absa.cobrix.spark.cobol.source.fixtures.{BinaryFileFixture, TextComparisonFixture} +import za.co.absa.cobrix.spark.cobol.utils.SparkUtils + +class Test39CorruptedFieldsSpec extends AnyWordSpec with SparkTestBase with BinaryFileFixture with TextComparisonFixture { + private val copybook = + """ 01 R. + 03 ID PIC 9(1). + 03 F1 PIC X(1). + 03 F2 PIC S9(1) COMP-3. + 03 F3 PIC 9(3) COMP. + 03 F4 PIC 9(1) OCCURS 3. + """ + + private val data = Array( + 0xF1, 0x40, 0x5C, 0x00, 0x06, 0xF1, 0xF2, 0xF3, // Record OK, no errors + 0xF2, 0xF1, 0xD3, 0x00, 0x05, 0xF4, 0xF5, 0xF6, // COMP-3 error + 0xF3, 0x00, 0x3C, 0xF1, 0x06, 0xF7, 0xF8, 0xF9, // COMP invalid value (negative) + 0xF4, 0xC1, 0x4C, 0x00, 0xA0, 0xC1, 0xF5, 0xA3 // Errors in array + ).map(_.toByte) + + "Corrupted fields record generation" should { + "work when the option is turned on" in { + val expectedSchema = + """root + | |-- ID: integer (nullable = true) + | |-- F1: string (nullable = true) + | |-- F2: integer (nullable = true) + | |-- F3: integer (nullable = true) + | |-- F4: array (nullable = true) + | | |-- element: integer (containsNull = true) + | |-- _corrupted_fields: array (nullable = false) + | | |-- element: struct (containsNull = false) + | | | |-- field_name: string (nullable = false) + | | | |-- raw_value: binary (nullable = false) + |""".stripMargin + + val expectedData = + """[ { + | "ID" : 1, + | "F1" : "", + | "F2" : 5, + | "F3" : 6, + | "F4" : [ 1, 2, 3 ], + | "_corrupted_fields" : [ ] + |}, { + | "ID" : 2, + | "F1" : "1", + | "F3" : 5, + | "F4" : [ 4, 5, 6 ], + | "_corrupted_fields" : [ { + | "field_name" : "F2", + | "raw_value" : "0w==" + | } ] + |}, { + | "ID" : 3, + | "F2" : 3, + | "F3" : 61702, + | "F4" : [ 7, 8, 9 ], + | "_corrupted_fields" : [ ] + |}, { + | "ID" : 4, + | "F1" : "A", + | "F2" : 4, + | "F3" : 160, + | "F4" : [ null, 5, null ], + | "_corrupted_fields" : [ { + | "field_name" : "F4[0]", + | "raw_value" : "wQ==" + | }, { + | "field_name" : "F4[2]", + | "raw_value" : "ow==" + | } ] + |} ] + |""".stripMargin + + withTempBinFile("corrupted_fields1", ".dat", data) { tmpFileName => + val df = getDataFrame(tmpFileName, Map("generate_corrupted_fields" -> "true")) + + val actualSchema = df.schema.treeString + compareTextVertical(actualSchema, expectedSchema) + + val actualData = SparkUtils.convertDataFrameToPrettyJSON(df.orderBy("ID"), 10) + + compareTextVertical(actualData, expectedData) + } + } + + "throw an exception when working with a hierarchical data" in { + val ex = intercept[IllegalArgumentException] { + getDataFrame("/tmp/dummy", Map("generate_corrupted_fields" -> "true", "segment-children:0" -> "COMPANY => DEPT,CUSTOMER")) + } + + assert(ex.getMessage.contains(s"Option '$PARAM_CORRUPTED_FIELDS' cannot be used with 'segment-children:*'")) + } + } + + private def getDataFrame(inputPath: String, extraOptions: Map[String, String] = Map.empty[String, String]): DataFrame = { + spark + .read + .format("cobol") + .option("copybook_contents", copybook) + .options(extraOptions) + .load(inputPath) + } +} From bb08a9ce5967a8c0228939b2c76aec48f6626649 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 10 Feb 2026 10:30:23 +0100 Subject: [PATCH 3/7] #723 Make 0x40 considered empty and 
not corrupted when checking for null fields, improve performance of generating corrupted fields slightly. --- .../cobrix/cobol/parser/ast/Primitive.scala | 55 +++++++++++++++---- .../extractors/record/RecordExtractors.scala | 4 +- ....scala => Test41CorruptedFieldsSpec.scala} | 10 +++- 3 files changed, 54 insertions(+), 15 deletions(-) rename spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/{Test39CorruptedFieldsSpec.scala => Test41CorruptedFieldsSpec.scala} (92%) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala index d857632bb..34ca9b903 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala @@ -114,23 +114,56 @@ case class Primitive( if (bytes == null) null else decode(bytes) } - def isNull(itOffset: Int, record: Array[Byte]): Boolean = { - val bytes = getRawValue(itOffset, record) + /** + * Checks if a value extracted from a given binary record at a specified offset is considered empty. + * A value is considered empty if it contains only null bytes or bytes equal to 0x40. + * + * @param itOffset The offset within the binary record where the value starts. + * @param record The binary record represented as an array of bytes. + * @return `true` if the value is empty, otherwise `false`. + */ + def isEmpty(itOffset: Int, record: Array[Byte]): Boolean = { + val bytesCount = binaryProperties.dataSize + val idx = itOffset + + if (isString) { + // The length of a string can be smaller for varchar fields at the end of a record + if (idx > record.length) { + return true + } + } else { + // Non-string field size should exactly fix the required bytes + if (idx + bytesCount > record.length) { + return true + } + } - if (bytes == null) { - true + // Determine the actual number of bytes to copy based on the record size. + // Varchar fields can be trimmed by the record size. + val endIndex = if (idx + bytesCount > record.length) { + record.length } else { - var i = 0 - while (i < bytes.length) { - if (bytes(i) != 0) { - return false - } - i += 1 + idx + bytesCount + } + var i = idx + while (i < endIndex) { + if (record(i) != 0 && record(i) != 0x40) { + return false } - true + i += 1 } + true } + /** + * Extracts a raw byte array representation of a value from a binary record + * based on the specified offset and the field's properties. + * + * @param itOffset The offset within the binary record where the value starts. + * @param record The binary record represented as an array of bytes. + * @return An array of bytes representing the value, or `null` if the offset + * or size is invalid for the given binary record. 
+ */ def getRawValue(itOffset: Int, record: Array[Byte]): Array[Byte] = { val bytesCount = binaryProperties.dataSize val idx = itOffset diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala index d7708ce3d..4d26892d5 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala @@ -109,7 +109,7 @@ object RecordExtractors { var j = 0 while (i < actualSize) { val value = s.decodeTypeValue(offset, data) - if (value == null && generateCorruptedFields && !s.isNull(offset, data)) { + if (value == null && generateCorruptedFields && !s.isEmpty(offset, data)) { corruptedFields += CorruptedField(s"${field.name}[$i]", s.getRawValue(offset,data)) } offset += s.binaryProperties.dataSize @@ -136,7 +136,7 @@ object RecordExtractors { } case st: Primitive => val value = st.decodeTypeValue(useOffset, data) - if (value == null && generateCorruptedFields && !st.isNull(useOffset, data)) { + if (value == null && generateCorruptedFields && !st.isEmpty(useOffset, data)) { corruptedFields += CorruptedField(field.name, st.getRawValue(useOffset,data)) } if (value != null && st.isDependee) { diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test39CorruptedFieldsSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptedFieldsSpec.scala similarity index 92% rename from spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test39CorruptedFieldsSpec.scala rename to spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptedFieldsSpec.scala index 27c99c4f1..4cab7323a 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test39CorruptedFieldsSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptedFieldsSpec.scala @@ -23,7 +23,7 @@ import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase import za.co.absa.cobrix.spark.cobol.source.fixtures.{BinaryFileFixture, TextComparisonFixture} import za.co.absa.cobrix.spark.cobol.utils.SparkUtils -class Test39CorruptedFieldsSpec extends AnyWordSpec with SparkTestBase with BinaryFileFixture with TextComparisonFixture { +class Test41CorruptedFieldsSpec extends AnyWordSpec with SparkTestBase with BinaryFileFixture with TextComparisonFixture { private val copybook = """ 01 R. 03 ID PIC 9(1). 
@@ -37,7 +37,8 @@ class Test39CorruptedFieldsSpec extends AnyWordSpec with SparkTestBase with Bina 0xF1, 0x40, 0x5C, 0x00, 0x06, 0xF1, 0xF2, 0xF3, // Record OK, no errors 0xF2, 0xF1, 0xD3, 0x00, 0x05, 0xF4, 0xF5, 0xF6, // COMP-3 error 0xF3, 0x00, 0x3C, 0xF1, 0x06, 0xF7, 0xF8, 0xF9, // COMP invalid value (negative) - 0xF4, 0xC1, 0x4C, 0x00, 0xA0, 0xC1, 0xF5, 0xA3 // Errors in array + 0xF4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40, 0xF0, // Null values okay + 0xF5, 0xC1, 0x4C, 0x00, 0xA0, 0xC1, 0xF5, 0xA3 // Errors in array ).map(_.toByte) "Corrupted fields record generation" should { @@ -81,6 +82,11 @@ class Test39CorruptedFieldsSpec extends AnyWordSpec with SparkTestBase with Bina | "_corrupted_fields" : [ ] |}, { | "ID" : 4, + | "F3" : 0, + | "F4" : [ null, null, 0 ], + | "_corrupted_fields" : [ ] + |}, { + | "ID" : 5, | "F1" : "A", | "F2" : 4, | "F3" : 160, From be408d9f7108992f9c37dc2d912e38e204a318fe Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 10 Feb 2026 14:28:51 +0100 Subject: [PATCH 4/7] #723 Make corrupted fields generation slightly more efficient (CPU and memory wise), fix `PIC X USAGE COMP` cases. --- .../cobol/parser/antlr/ParserVisitor.scala | 6 ++-- .../cobol/parser/ast/datatype/Usage.scala | 8 ++--- .../extractors/record/RecordExtractors.scala | 33 +++++++++++++------ 3 files changed, 30 insertions(+), 17 deletions(-) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/antlr/ParserVisitor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/antlr/ParserVisitor.scala index 063c7efe3..3bb1ed853 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/antlr/ParserVisitor.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/antlr/ParserVisitor.scala @@ -16,7 +16,6 @@ package za.co.absa.cobrix.cobol.parser.antlr -import java.nio.charset.Charset import org.antlr.v4.runtime.{ParserRuleContext, RuleContext} import za.co.absa.cobrix.cobol.parser.CopybookParser import za.co.absa.cobrix.cobol.parser.CopybookParser.CopybookAST @@ -25,13 +24,14 @@ import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive} import za.co.absa.cobrix.cobol.parser.common.Constants import za.co.absa.cobrix.cobol.parser.decoders.DecoderSelector import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPointFormat -import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage import za.co.absa.cobrix.cobol.parser.encoding._ +import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage import za.co.absa.cobrix.cobol.parser.exceptions.SyntaxErrorException import za.co.absa.cobrix.cobol.parser.policies.CommentPolicy import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmingPolicy import za.co.absa.cobrix.cobol.parser.position.{Left, Position, Right} +import java.nio.charset.Charset import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.matching.Regex @@ -168,7 +168,7 @@ class ParserVisitor(enc: Encoding, int.copy(compact=usage) case x: AlphaNumeric if usageVal == COMP3U() => Integral(x.pic, x.length*2, None, false, None, Some(COMP3U()), None, x.originalPic) - case x: AlphaNumeric if usageVal == COMP1() || usageVal == COMP4() => + case x: AlphaNumeric if usageVal == COMP4() || usageVal == COMP9() => val enc = if (decodeBinaryAsHex) HEX else RAW x.copy(compact=usage, enc=Some(enc)) case x: AlphaNumeric => diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/datatype/Usage.scala 
b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/datatype/Usage.scala index 4a304e53b..8cd4869ae 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/datatype/Usage.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/datatype/Usage.scala @@ -19,8 +19,8 @@ package za.co.absa.cobrix.cobol.parser.ast.datatype sealed trait Usage -//case class COMP() extends Usage -//case class COMP0() extends Usage +//case class COMP() extends Usage // Use COMP4() +//case class COMP0() extends Usage // Use COMP4() case class COMP1() extends Usage { override def toString = "COMP-1" } @@ -44,8 +44,8 @@ case class COMP5() extends Usage { case class COMP9() extends Usage { // artificial little-endian binary override def toString = "COMP-9" } -//case class DISPLAY() extends Usage { +//case class DISPLAY() extends Usage { // Use None for the USAGE instead // override def toString = "DISPLAY" //} -//case class BINARY() extends Usage +//case class BINARY() extends Usage // Use COMP4() diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala index 4d26892d5..daaa46e4a 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala @@ -17,8 +17,9 @@ package za.co.absa.cobrix.cobol.reader.extractors.record import za.co.absa.cobrix.cobol.parser.CopybookParser.CopybookAST -import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, COMP9} +import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, COMP4} import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive, Statement} +import za.co.absa.cobrix.cobol.parser.encoding.RAW import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy @@ -69,7 +70,7 @@ object RecordExtractors { handler: RecordHandler[T] ): Seq[Any] = { val dependFields = scala.collection.mutable.HashMap.empty[String, Either[Int, String]] - val corruptedFields = new ListBuffer[CorruptedField] + val corruptedFields = new ArrayBuffer[CorruptedField] val isAstFlat = ast.children.exists(_.isInstanceOf[Primitive]) @@ -211,7 +212,7 @@ object RecordExtractors { policy } - applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes, generateCorruptedFields, segmentLevelIds, fileId, recordId, data.length, data, generateInputFileField, inputFileName, corruptedFields.toSeq, handler) + applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes, generateCorruptedFields, segmentLevelIds, fileId, recordId, data.length, data, generateInputFileField, inputFileName, corruptedFields, handler) } /** @@ -431,7 +432,7 @@ object RecordExtractors { policy } - applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes = false, generateCorruptedFields = false, Nil, fileId, recordId, recordLength, Array.empty[Byte], generateInputFileField = generateInputFileField, inputFileName, Seq.empty, handler) + applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes = false, generateCorruptedFields = false, Nil, fileId, recordId, 
recordLength, Array.empty[Byte], generateInputFileField = generateInputFileField, inputFileName, null, handler) } /** @@ -473,7 +474,7 @@ object RecordExtractors { recordBytes: Array[Byte], generateInputFileField: Boolean, inputFileName: String, - corruptedFields: Seq[CorruptedField], + corruptedFields: ArrayBuffer[CorruptedField], handler: RecordHandler[T] ): Seq[Any] = { val outputRecords = new ListBuffer[Any] @@ -507,21 +508,33 @@ object RecordExtractors { records.foreach(record => outputRecords.append(record)) } - if (generateCorruptedFields) { - val corruptedFieldsRaw = corruptedFields.map(d => handler.create(Array[Any](d.fieldName, d.rawValue), corruptedFieldsGroup)) - outputRecords.append(corruptedFieldsRaw) + if (generateCorruptedFields && corruptedFields != null) { + // Ugly but efficient implementation of converting errors as an array field + val len = corruptedFields.length + val ar = new Array[Any](len) + var i = 0 + while (i < len) { + val r = handler.create(Array[Any](corruptedFields(i).fieldName, corruptedFields(i).rawValue), corruptedFieldsGroup) + ar(i) = r + i += 1 + } + outputRecords.append(ar) } // toList() is a constant time operation, and List implements immutable Seq, which is exactly what is needed here. outputRecords.toList } + /** + * Constructs a Group object representing corrupted fields. It is only needed for constructing records that require field names, + * such as JSON. Field sizes and encoding do not really matter + */ private def getCorruptedFieldsGroup: Group = { val corruptedFieldsInGroup = new mutable.ArrayBuffer[Statement] corruptedFieldsInGroup += Primitive(15, "field_name", "field_name", 0, AlphaNumeric("X(50)", 50), decode = null, encode = null)(None) - corruptedFieldsInGroup += Primitive(15, "raw_value", "raw_value", 0, AlphaNumeric("X(50)", 50, compact = Some(COMP9())), decode = null, encode = null)(None) + corruptedFieldsInGroup += Primitive(15, "raw_value", "raw_value", 0, AlphaNumeric("X(50)", 50, enc = Some(RAW), compact = Some(COMP4())), decode = null, encode = null)(None) - Group(10, "_corrupted_fields", "_corrupted_fields", 0, children = corruptedFieldsInGroup )(None) + Group(10, "_corrupted_fields", "_corrupted_fields", 0, children = corruptedFieldsInGroup, occurs = Some(10))(None) } } From 3d1a9a8775f5ab92fe545c03f202a0cabba4cc98 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Wed, 11 Feb 2026 07:29:44 +0100 Subject: [PATCH 5/7] Refactor CobolSchema to use builder pattern for improved readability and flexibility in schema creation. 
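A minimal usage sketch of the new builder (illustrative only; `copyBookContents` is a placeholder copybook string, and the chosen options are examples rather than defaults changed by this patch):

```scala
import za.co.absa.cobrix.cobol.parser.CopybookParser
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy
import za.co.absa.cobrix.spark.cobol.schema.CobolSchema

// copyBookContents is a placeholder for the copybook text being parsed.
val parsedSchema = CopybookParser.parseTree(copyBookContents)

val cobolSchema = CobolSchema.builder(parsedSchema)
  .withSchemaRetentionPolicy(SchemaRetentionPolicy.CollapseRoot)
  .withGenerateRecordId(true)
  .withGenerateCorruptedFields(true)
  .build()

val sparkSchema = cobolSchema.getSparkSchema
```

Options left unset keep the same defaults as the old constructor (CollapseRoot retention, no generated helper fields, basic metadata). Note that `withGenerateCorruptedFields` is renamed to `withGenerateCorruptFields` later in this series.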
--- .../spark/cobol/schema/CobolSchema.scala | 99 ++++++- .../cobol/CobolSchemaHierarchicalSpec.scala | 3 +- .../cobrix/spark/cobol/CobolSchemaSpec.scala | 246 ++++++++++++++++-- .../cobol/source/base/SparkSchemaSpec.scala | 4 +- .../source/base/impl/DummyCobolSchema.scala | 18 +- .../text/Test02TextFilesOldSchool.scala | 8 +- 6 files changed, 336 insertions(+), 42 deletions(-) diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala index c5d441a19..ba3d842a5 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala @@ -52,15 +52,15 @@ import scala.collection.mutable.ArrayBuffer */ class CobolSchema(copybook: Copybook, schemaRetentionPolicy: SchemaRetentionPolicy, - isDisplayAlwaysString: Boolean = false, - strictIntegralPrecision: Boolean = false, - inputFileNameField: String = "", - generateRecordId: Boolean = false, - generateRecordBytes: Boolean = false, - generateCorruptedFields: Boolean = false, - generateSegIdFieldsCnt: Int = 0, - segmentIdProvidedPrefix: String = "", - metadataPolicy: MetadataPolicy = MetadataPolicy.Basic) + isDisplayAlwaysString: Boolean, + strictIntegralPrecision: Boolean, + inputFileNameField: String, + generateRecordId: Boolean, + generateRecordBytes: Boolean, + generateCorruptedFields: Boolean, + generateSegIdFieldsCnt: Int, + segmentIdProvidedPrefix: String, + metadataPolicy: MetadataPolicy) extends CobolReaderSchema(copybook, schemaRetentionPolicy, isDisplayAlwaysString, @@ -337,4 +337,85 @@ object CobolSchema { CobolSchema.fromBaseReader(CobolReaderSchema.fromReaderParameters(copyBookContents, readerParameters)) } + + def builder(copybook: Copybook): CobolSchemaBuilder = new CobolSchemaBuilder(copybook) + + class CobolSchemaBuilder(copybook: Copybook) { + private var schemaRetentionPolicy: SchemaRetentionPolicy = SchemaRetentionPolicy.CollapseRoot + private var isDisplayAlwaysString: Boolean = false + private var strictIntegralPrecision: Boolean = false + private var inputFileNameField: String = "" + private var generateRecordId: Boolean = false + private var generateRecordBytes: Boolean = false + private var generateCorruptedFields: Boolean = false + private var generateSegIdFieldsCnt: Int = 0 + private var segmentIdProvidedPrefix: String = "" + private var metadataPolicy: MetadataPolicy = MetadataPolicy.Basic + + def withSchemaRetentionPolicy(schemaRetentionPolicy: SchemaRetentionPolicy): CobolSchemaBuilder = { + this.schemaRetentionPolicy = schemaRetentionPolicy + this + } + + def withIsDisplayAlwaysString(isDisplayAlwaysString: Boolean): CobolSchemaBuilder = { + this.isDisplayAlwaysString = isDisplayAlwaysString + this + } + + def withStrictIntegralPrecision(strictIntegralPrecision: Boolean): CobolSchemaBuilder = { + this.strictIntegralPrecision = strictIntegralPrecision + this + } + + def withInputFileNameField(inputFileNameField: String): CobolSchemaBuilder = { + this.inputFileNameField = inputFileNameField + this + } + + def withGenerateRecordId(generateRecordId: Boolean): CobolSchemaBuilder = { + this.generateRecordId = generateRecordId + this + } + + def withGenerateRecordBytes(generateRecordBytes: Boolean): CobolSchemaBuilder = { + this.generateRecordBytes = generateRecordBytes + this + } + + def withGenerateCorruptedFields(generateCorruptedFields: Boolean): CobolSchemaBuilder = { + 
this.generateCorruptedFields = generateCorruptedFields + this + } + + def withGenerateSegIdFieldsCnt(generateSegIdFieldsCnt: Int): CobolSchemaBuilder = { + this.generateSegIdFieldsCnt = generateSegIdFieldsCnt + this + } + + def withSegmentIdProvidedPrefix(segmentIdProvidedPrefix: String): CobolSchemaBuilder = { + this.segmentIdProvidedPrefix = segmentIdProvidedPrefix + this + } + + def withMetadataPolicy(metadataPolicy: MetadataPolicy): CobolSchemaBuilder = { + this.metadataPolicy = metadataPolicy + this + } + + def build(): CobolSchema = { + new CobolSchema( + copybook, + schemaRetentionPolicy, + isDisplayAlwaysString, + strictIntegralPrecision, + inputFileNameField, + generateRecordId, + generateRecordBytes, + generateCorruptedFields, + generateSegIdFieldsCnt, + segmentIdProvidedPrefix, + metadataPolicy + ) + } + } } diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaHierarchicalSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaHierarchicalSpec.scala index 27d8c44e8..0d530bfb2 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaHierarchicalSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaHierarchicalSpec.scala @@ -18,7 +18,6 @@ package za.co.absa.cobrix.spark.cobol import org.scalatest.wordspec.AnyWordSpec import za.co.absa.cobrix.cobol.parser.CopybookParser -import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy import za.co.absa.cobrix.spark.cobol.schema.CobolSchema import scala.collection.immutable.HashMap @@ -102,6 +101,6 @@ class CobolSchemaHierarchicalSpec extends AnyWordSpec { private def parseSchema(copybook: String, segmentRedefines: List[String], fieldParentMap: Map[String, String]): CobolSchema = { val parsedSchema = CopybookParser.parseTree(copybook, segmentRedefines = segmentRedefines, fieldParentMap = fieldParentMap) - new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "",false, false) + CobolSchema.builder(parsedSchema).build() } } diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala index 81f73e9c9..5bdf38643 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala @@ -20,6 +20,7 @@ import org.apache.spark.sql.types._ import org.scalatest.wordspec.AnyWordSpec import org.slf4j.{Logger, LoggerFactory} import za.co.absa.cobrix.cobol.parser.CopybookParser +import za.co.absa.cobrix.cobol.parser.policies.MetadataPolicy import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy import za.co.absa.cobrix.spark.cobol.parameters.MetadataFields.MAX_LENGTH import za.co.absa.cobrix.spark.cobol.schema.CobolSchema @@ -55,7 +56,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBookContents) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", false, false) + val cobolSchema = CobolSchema.builder(parsedSchema).build() val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -73,7 +74,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = 
CopybookParser.parseTree(copyBookContents) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, true, "", false, false) + val cobolSchema = CobolSchema.builder(parsedSchema).withStrictIntegralPrecision(true).build() val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -94,7 +95,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBookContents) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", true, false) + val cobolSchema = CobolSchema.builder(parsedSchema).withGenerateRecordId(true).build() val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -113,7 +114,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBookContents) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", false, true) + val cobolSchema = CobolSchema.builder(parsedSchema).withGenerateRecordBytes(true).build() val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -135,7 +136,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBookContents) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", true, true) + val cobolSchema = CobolSchema.builder(parsedSchema).withGenerateRecordId(true).withGenerateRecordBytes(true).build() val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -163,7 +164,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", true, false) + val cobolSchema = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal).withGenerateRecordId(true).build() val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -179,7 +180,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", false, false) + val cobolSchema = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal).build() val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -196,7 +197,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", true, false) + val cobolSchema = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.CollapseRoot).withGenerateRecordId(true).build() val actualSchema = 
cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -211,7 +212,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", false, true) + val cobolSchema = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.CollapseRoot).withGenerateRecordBytes(true).build() val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -229,7 +230,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", true, true) + val cobolSchema = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.CollapseRoot).withGenerateRecordBytes(true).withGenerateRecordId(true).build() val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -243,7 +244,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", false, false) + val cobolSchema = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.CollapseRoot).build() val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -272,7 +273,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { | | |-- STR_FLD: string (nullable = true) |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", true, false, false, 2) + val cobolSchema = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal).withGenerateRecordId(true).withGenerateSegIdFieldsCnt(2).build() val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -290,7 +291,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { | | |-- STR_FLD: string (nullable = true) |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", false, true, false, 2) + val cobolSchema = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal).withGenerateRecordBytes(true).withGenerateSegIdFieldsCnt(2).build() val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -311,7 +312,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { | | | |-- raw_value: binary (nullable = false) |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", false, false, true, 2) + val cobolSchema = 
CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal).withGenerateCorruptedFields(true).withGenerateSegIdFieldsCnt(2).build() val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -332,7 +333,12 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { | | |-- STR_FLD: string (nullable = true) |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", true, true, false, 2) + val cobolSchema = CobolSchema.builder(parsedSchema) + .withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal) + .withGenerateRecordId(true) + .withGenerateRecordBytes(true) + .withGenerateSegIdFieldsCnt(2) + .build() val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -353,7 +359,13 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { | | |-- STR_FLD: string (nullable = true) |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, true, "", true, true, false, 2) + val cobolSchema = CobolSchema.builder(parsedSchema) + .withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal) + .withStrictIntegralPrecision(true) + .withGenerateRecordId(true) + .withGenerateRecordBytes(true) + .withGenerateSegIdFieldsCnt(2) + .build() val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -370,7 +382,11 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { | | |-- STR_FLD: string (nullable = true) |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", false, false, false, 2) + val cobolSchema = CobolSchema.builder(parsedSchema) + .withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal) + .withGenerateSegIdFieldsCnt(2) + .build() + val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -388,7 +404,11 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { | |-- STR_FLD: string (nullable = true) |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", true, false, false, 2) + val cobolSchema = CobolSchema.builder(parsedSchema) + .withSchemaRetentionPolicy(SchemaRetentionPolicy.CollapseRoot) + .withGenerateRecordId(true) + .withGenerateSegIdFieldsCnt(2) + .build() val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -403,7 +423,11 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { | |-- STR_FLD: string (nullable = true) |""".stripMargin.replaceAll("[\\r\\n]", "\n") val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", false, false, false, 2) + val cobolSchema = CobolSchema.builder(parsedSchema) + .withSchemaRetentionPolicy(SchemaRetentionPolicy.CollapseRoot) + .withGenerateSegIdFieldsCnt(2) + .build() + val actualSchema = 
cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -422,7 +446,8 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema1 = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", false, false) + val cobolSchema1 = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal).build() + val actualSparkSchema = cobolSchema1.getSparkSchema val rootField = actualSparkSchema.fields.head.dataType.asInstanceOf[StructType] @@ -451,7 +476,10 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema1 = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", false, false) + val cobolSchema1 = CobolSchema.builder(parsedSchema) + .withSchemaRetentionPolicy(SchemaRetentionPolicy.CollapseRoot) + .build() + val actualSparkSchema = cobolSchema1.getSparkSchema val metadataStr1 = actualSparkSchema.fields.head.metadata @@ -485,7 +513,12 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { | |-- NUM3: decimal(9,8) (nullable = true)""".stripMargin val parsedSchema = CopybookParser.parseTree(copyBook) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, true, false, "", false, false, false, 2) + val cobolSchema = CobolSchema.builder(parsedSchema) + .withSchemaRetentionPolicy(SchemaRetentionPolicy.CollapseRoot) + .withIsDisplayAlwaysString(true) + .withGenerateSegIdFieldsCnt(2) + .build() + val actualSchema = cobolSchema.getSparkSchema.treeString assertEqualsMultiline(actualSchema, expectedSchema) @@ -707,4 +740,173 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { assert(cobolSchema.getMaximumSegmentIdLength("ID_") == 53) } } + + "builder" should { + "create schema with default settings using builder" in { + val copybook: String = + """ 01 RECORD. + | 05 STR1 PIC X(10). + | 05 NUM2 PIC 9(7). + |""".stripMargin + + val parsedCopybook = CopybookParser.parse(copybook) + val cobolSchema = CobolSchema.builder(parsedCopybook) + .build() + + val sparkSchema = cobolSchema.getSparkSchema + + assert(sparkSchema.fields.length == 2) + assert(sparkSchema.fields.head.name == "STR1") + assert(sparkSchema.fields.head.dataType == StringType) + assert(sparkSchema.fields(1).name == "NUM2") + assert(sparkSchema.fields(1).dataType == IntegerType) + } + + "create schema with strict integral precision using builder" in { + val copybook: String = + """ 01 RECORD. + | 05 NUM1 PIC 9(12). + | 05 NUM2 PIC S9(7). + |""".stripMargin + + val parsedCopybook = CopybookParser.parse(copybook) + val cobolSchema = CobolSchema.builder(parsedCopybook) + .withStrictIntegralPrecision(true) + .build() + + val sparkSchema = cobolSchema.getSparkSchema + + assert(sparkSchema.fields.length == 2) + assert(sparkSchema.fields.head.name == "NUM1") + assert(sparkSchema.fields.head.dataType == DecimalType(12, 0)) + assert(sparkSchema.fields(1).name == "NUM2") + assert(sparkSchema.fields(1).dataType == DecimalType(7, 0)) + } + + "create schema with record id generation using builder" in { + val copybook: String = + """ 01 RECORD. + | 05 STR1 PIC X(10). 
+ |""".stripMargin + + val parsedCopybook = CopybookParser.parse(copybook) + val cobolSchema = CobolSchema.builder(parsedCopybook) + .withGenerateRecordId(true) + .build() + + val sparkSchema = cobolSchema.getSparkSchema + + assert(sparkSchema.fields.length == 4) + assert(sparkSchema.fields.head.name == "File_Id") + assert(sparkSchema.fields.head.dataType == IntegerType) + assert(sparkSchema.fields(1).name == "Record_Id") + assert(sparkSchema.fields(1).dataType == LongType) + assert(sparkSchema.fields(2).name == "Record_Byte_Length") + assert(sparkSchema.fields(2).dataType == IntegerType) + assert(sparkSchema.fields(3).name == "STR1") + } + + "create schema with record bytes generation using builder" in { + val copybook: String = + """ 01 RECORD. + | 05 STR1 PIC X(10). + |""".stripMargin + val parsedCopybook = CopybookParser.parse(copybook) + val cobolSchema = CobolSchema.builder(parsedCopybook) + .withGenerateRecordBytes(true) + .build() + + val sparkSchema = cobolSchema.getSparkSchema + + assert(sparkSchema.fields.length == 2) + assert(sparkSchema.fields.head.name == "Record_Bytes") + assert(sparkSchema.fields.head.dataType == BinaryType) + assert(sparkSchema.fields(1).name == "STR1") + } + + "create schema with both record id and bytes using builder" in { + val copybook: String = + """ 01 RECORD. + | 05 STR1 PIC X(10). + |""".stripMargin + + val parsedCopybook = CopybookParser.parse(copybook) + val cobolSchema = CobolSchema.builder(parsedCopybook) + .withGenerateRecordId(true) + .withGenerateRecordBytes(true) + .build() + + val sparkSchema = cobolSchema.getSparkSchema + + assert(sparkSchema.fields.length == 5) + assert(sparkSchema.fields.head.name == "File_Id") + assert(sparkSchema.fields(1).name == "Record_Id") + assert(sparkSchema.fields(2).name == "Record_Byte_Length") + assert(sparkSchema.fields(3).name == "Record_Bytes") + assert(sparkSchema.fields(3).dataType == BinaryType) + assert(sparkSchema.fields(4).name == "STR1") + } + + "create schema with schema retention policy using builder" in { + val copybook: String = + """ 01 STRUCT1. + | 05 IntValue PIC 9(6) COMP. + | 01 STRUCT2. + | 10 STR-FLD PIC X(10). + |""".stripMargin + + val parsedCopybook = CopybookParser.parse(copybook) + val cobolSchema = CobolSchema.builder(parsedCopybook) + .withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal) + .build() + + val sparkSchema = cobolSchema.getSparkSchema + + assert(sparkSchema.fields.length == 2) + assert(sparkSchema.fields.head.name == "STRUCT1") + assert(sparkSchema.fields.head.dataType.isInstanceOf[StructType]) + assert(sparkSchema.fields(1).name == "STRUCT2") + assert(sparkSchema.fields(1).dataType.isInstanceOf[StructType]) + } + + "create schema with corrupted fields using builder" in { + val copybook: String = + """ 01 RECORD. + | 05 STR1 PIC X(10). + |""".stripMargin + val parsedCopybook = CopybookParser.parse(copybook) + val cobolSchema = CobolSchema.builder(parsedCopybook) + .withGenerateCorruptedFields(true) + .build() + + val sparkSchema = cobolSchema.getSparkSchema + + assert(sparkSchema.fields.length == 2) + assert(sparkSchema.fields(1).name == "_corrupted_fields") + assert(sparkSchema.fields(1).dataType.isInstanceOf[ArrayType]) + } + + "create schema with various options" in { + val copybook: String = + """ 01 RECORD. + | 05 NUM1 PIC 9(10). 
+ |""".stripMargin + val parsedCopybook = CopybookParser.parse(copybook) + val cobolSchema = CobolSchema.builder(parsedCopybook) + .withIsDisplayAlwaysString(true) + .withInputFileNameField("file_name") + .withGenerateSegIdFieldsCnt(1) + .withSegmentIdProvidedPrefix("segid") + .withMetadataPolicy(MetadataPolicy.Extended) + .build() + + val sparkSchema = cobolSchema.getSparkSchema + + assert(sparkSchema.fields.length == 3) + assert(sparkSchema.fields(0).name == "file_name") + assert(sparkSchema.fields(1).name == "Seg_Id0") + assert(sparkSchema.fields(2).name == "NUM1") + assert(sparkSchema.fields(2).dataType.isInstanceOf[StringType]) + } + } } diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/SparkSchemaSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/SparkSchemaSpec.scala index 3731efa8a..9ee30bbb3 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/SparkSchemaSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/SparkSchemaSpec.scala @@ -37,7 +37,9 @@ class SparkSchemaSpec extends AnyFunSuite { val parsedSchema = CopybookParser.parseTree(copyBookContents) - val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "",false, false) + val cobolSchema = CobolSchema.builder(parsedSchema) + .withSchemaRetentionPolicy(SchemaRetentionPolicy.CollapseRoot) + .build() val sparkSchema = cobolSchema.getSparkSchema diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/impl/DummyCobolSchema.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/impl/DummyCobolSchema.scala index 77fb982b1..790420cad 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/impl/DummyCobolSchema.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/impl/DummyCobolSchema.scala @@ -17,14 +17,24 @@ package za.co.absa.cobrix.spark.cobol.source.base.impl import org.apache.spark.sql.types.StructType -import za.co.absa.cobrix.spark.cobol.schema.CobolSchema import za.co.absa.cobrix.cobol.parser.Copybook import za.co.absa.cobrix.cobol.parser.ast.Group +import za.co.absa.cobrix.cobol.parser.policies.MetadataPolicy import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy +import za.co.absa.cobrix.spark.cobol.schema.CobolSchema -import scala.collection.Seq - -class DummyCobolSchema(val sparkSchema: StructType) extends CobolSchema(new Copybook(Group.root), SchemaRetentionPolicy.KeepOriginal, false, false, "", false, false) with Serializable { +class DummyCobolSchema(val sparkSchema: StructType) extends CobolSchema( + new Copybook(Group.root), + SchemaRetentionPolicy.KeepOriginal, + false, + false, + "", + false, + false, + false, + 0, + "", + MetadataPolicy.Basic) with Serializable { override def getSparkSchema: StructType = sparkSchema override lazy val getRecordSize: Int = 40 diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/text/Test02TextFilesOldSchool.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/text/Test02TextFilesOldSchool.scala index 746a41c19..71c3cf4bc 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/text/Test02TextFilesOldSchool.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/text/Test02TextFilesOldSchool.scala @@ -16,8 +16,6 @@ package za.co.absa.cobrix.spark.cobol.source.text -import java.nio.charset.StandardCharsets - import 
org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.expressions.GenericRow import org.scalatest.funsuite.AnyFunSuite @@ -32,6 +30,8 @@ import za.co.absa.cobrix.spark.cobol.schema.CobolSchema import za.co.absa.cobrix.spark.cobol.source.base.{SimpleComparisonBase, SparkTestBase} import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture +import java.nio.charset.StandardCharsets + class Test02TextFilesOldSchool extends AnyFunSuite with SparkTestBase with BinaryFileFixture with SimpleComparisonBase { private implicit val logger: Logger = LoggerFactory.getLogger(this.getClass) @@ -52,7 +52,7 @@ class Test02TextFilesOldSchool extends AnyFunSuite with SparkTestBase with Binar withTempTextFile("text_ascii", ".txt", StandardCharsets.UTF_8, textFileContent) { tmpFileName => val parsedCopybook = CopybookParser.parse(copybook, dataEncoding = ASCII, stringTrimmingPolicy = StringTrimmingPolicy.TrimNone) - val cobolSchema = new CobolSchema(parsedCopybook, SchemaRetentionPolicy.CollapseRoot, false, false, "", false) + val cobolSchema = CobolSchema.builder(parsedCopybook).build() val sparkSchema = cobolSchema.getSparkSchema val rddText = spark.sparkContext @@ -61,7 +61,7 @@ class Test02TextFilesOldSchool extends AnyFunSuite with SparkTestBase with Binar val recordHandler = new RowHandler() val rddRow = rddText - .filter(str => str.length > 0) + .filter(str => str.nonEmpty) .map(str => { val record = RecordExtractors.extractRecord[GenericRow](parsedCopybook.ast, str.getBytes(), From 620daebd2f63eb205cd029551cce2538e1c40156 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Wed, 11 Feb 2026 09:28:45 +0100 Subject: [PATCH 6/7] #723 Fix numerous PR suggestions about corrupt field generation. --- README.md | 12 +-- .../cobrix/cobol/parser/ast/Primitive.scala | 44 ++++++++- .../cobol/parser/common/Constants.scala | 2 +- ...orruptedField.scala => CorruptField.scala} | 8 +- .../extractors/record/RecordExtractors.scala | 98 +++++++++---------- .../iterator/FixedLenNestedRowIterator.scala | 2 +- .../reader/parameters/CobolParameters.scala | 2 +- .../parameters/CobolParametersParser.scala | 12 +-- .../cobol/reader/schema/CobolSchema.scala | 4 +- .../builder/SparkCobolOptionsBuilder.scala | 2 +- .../spark/cobol/schema/CobolSchema.scala | 24 ++--- .../cobrix/spark/cobol/CobolSchemaSpec.scala | 12 +-- .../integration/Test17HierarchicalSpec.scala | 9 +- .../Test35GeneratedRecordBytes.scala | 3 +- ...ec.scala => Test41CorruptFieldsSpec.scala} | 26 ++--- 15 files changed, 150 insertions(+), 110 deletions(-) rename cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/{CorruptedField.scala => CorruptField.scala} (81%) rename spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/{Test41CorruptedFieldsSpec.scala => Test41CorruptFieldsSpec.scala} (83%) diff --git a/README.md b/README.md index 30704f10c..a16b56bfd 100644 --- a/README.md +++ b/README.md @@ -597,9 +597,9 @@ root |-- Record_Bytes: binary (nullable = false) ``` -You can generate `_corrupted_fields` that will contain original binary values of fields Cobrix was unable to decode: -``` -.option("generate_corrupted_fields", "true") +You can generate `_corrupt_fields` that will contain original binary values of fields Cobrix was unable to decode: +```scala +.option("generate_corrupt_fields", "true") ``` ### Locality optimization for variable-length records parsing @@ -1562,7 +1562,7 @@ The output looks like this: | .option("non_terminals", "GROUP1,GROUP2") | Specifies groups to also be added to the 
schema as string fields. When this option is specified, the reader will add one extra data field after each matching group containing the string data for the group. | | .option("generate_record_id", false) | Generate autoincremental 'File_Id', 'Record_Id' and 'Record_Byte_Length' fields. This is used for processing record order dependent data. | | .option("generate_record_bytes", false) | Generate 'Record_Bytes', the binary field that contains raw contents of the original unparsed records. | -| .option("generate_corrupted_fields", false) | Generate `_corrupted_fields` field that contains values of fields Cobrix was unable to decode. | +| .option("generate_corrupt_fields", false) | Generate `_corrupt_fields` field that contains values of fields Cobrix was unable to decode. | | .option("with_input_file_name_col", "file_name") | Generates a column containing input file name for each record (Similar to Spark SQL `input_file_name()` function). The column name is specified by the value of the option. This option only works for variable record length files. For fixed record length and ASCII files use `input_file_name()`. | | .option("metadata", "basic") | Specifies wat kind of metadata to include in the Spark schema: `false`, `basic`(default), or `extended` (PIC, usage, etc). | | .option("debug", "hex") | If specified, each primitive field will be accompanied by a debug field containing raw bytes from the source file. Possible values: `none` (default), `hex`, `binary`, `string` (ASCII only). The legacy value `true` is supported and will generate debug fields in HEX. | @@ -1952,10 +1952,10 @@ A: Update hadoop dll to version 3.2.2 or newer. ## Changelog - #### 2.9.8 will be released soon. - - [#723](https://github.com/AbsaOSS/cobrix/pull/723) Added the option to generate `corrupted_fields` field that contains field names and raw values + - [#723](https://github.com/AbsaOSS/cobrix/pull/723) Added the option to generate `corrupt_fields` field that contains field names and raw values of fields that Cobrix couldn't decode. ```scala - .option("generate_corrupted_fields", "true") + .option("generate_corrupt_fields", "true") ``` - #### 2.9.7 released 29 January 2026. 
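For completeness, a hedged end-to-end sketch of how the renamed option is intended to be used from Spark (the copybook and data paths below are placeholders, not files from this repository):

```scala
import org.apache.spark.sql.functions.{col, size}

// Placeholder paths; point these at your own copybook and data files.
val df = spark.read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cpy")
  .option("generate_corrupt_fields", "true")
  .load("/path/to/mainframe_data")

// Records where at least one field could not be decoded, together with
// the field names and raw bytes Cobrix captured for them.
df.filter(size(col("_corrupt_fields")) > 0)
  .select(col("_corrupt_fields"))
  .show(truncate = false)
```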
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala index 34ca9b903..1019dcc7c 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala @@ -16,9 +16,9 @@ package za.co.absa.cobrix.cobol.parser.ast -import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, CobolType, Decimal, Integral} +import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, COMP3, CobolType, Decimal, Integral} import za.co.absa.cobrix.cobol.parser.decoders.{BinaryUtils, DecoderSelector} -import za.co.absa.cobrix.cobol.parser.encoding.EncoderSelector +import za.co.absa.cobrix.cobol.parser.encoding.{ASCII, EBCDIC, EncoderSelector} /** An abstraction of the statements describing fields of primitive data types in the COBOL copybook * @@ -63,6 +63,44 @@ case class Primitive( /** This is cached value specifying if the field is a string */ private val isString = dataType.isInstanceOf[AlphaNumeric] + /** This is cached value to speedup checking for empty values */ + private val spaceChar: Byte = { + dataType match { + case t: AlphaNumeric => + t.enc match { + case Some(EBCDIC) => 0x40 + case Some(ASCII) => 0x20 + case Some(_) => 0 + case None => 0x40 + } + case t: Integral => + t.compact match { + case Some(COMP3()) => 0x40 + case Some(_) => 0 + case None => + t.enc match { + case Some(EBCDIC) => 0x40 + case Some(ASCII) => 0x20 + case Some(_) => 0 + case None => 0x40 + } + } + case t: Decimal => + t.compact match { + case Some(COMP3()) => 0x40 + case Some(_) => 0 + case None => + t.enc match { + case Some(EBCDIC) => 0x40 + case Some(ASCII) => 0x20 + case Some(_) => 0 + case None => 0x40 + } + } + case _ => 0 + } + } + /** Returns a string representation of the field */ override def toString: String = { s"${" " * 2 * level}$camelCased ${camelCase(redefines.getOrElse(""))} $dataType" @@ -147,7 +185,7 @@ case class Primitive( } var i = idx while (i < endIndex) { - if (record(i) != 0 && record(i) != 0x40) { + if (record(i) != 0 && record(i) != spaceChar) { return false } i += 1 diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala index 7f18fd3af..979ee7790 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala @@ -64,7 +64,7 @@ object Constants { val recordIdField = "Record_Id" val recordByteLength = "Record_Byte_Length" val recordBytes = "Record_Bytes" - val corruptedFieldsField = "_corrupted_fields" + val corruptFieldsField = "_corrupt_fields" val fieldNameColumn = "field_name" val rawValueColumn = "raw_value" diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/CorruptedField.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/CorruptField.scala similarity index 81% rename from cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/CorruptedField.scala rename to cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/CorruptField.scala index aa2038fd1..a6bf54347 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/CorruptedField.scala +++ 
b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/CorruptField.scala @@ -16,7 +16,7 @@ package za.co.absa.cobrix.cobol.reader.extractors.record -case class CorruptedField( - fieldName: String, - rawValue: Array[Byte] - ) +case class CorruptField( + fieldName: String, + rawValue: Array[Byte] + ) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala index daaa46e4a..fdabc52ee 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala @@ -29,7 +29,7 @@ import scala.reflect.ClassTag object RecordExtractors { - private val corruptedFieldsGroup = getCorruptedFieldsGroup + private val corruptFieldsGroup = getCorruptFieldsGroup /** * This method extracts a record from the specified array of bytes. The copybook for the record needs to be already parsed. @@ -41,7 +41,7 @@ object RecordExtractors { * @param variableLengthOccurs If true, OCCURS DEPENDING ON data size will depend on the number of elements. * @param generateRecordId If true, a record id field will be added as the first field of the record. * @param generateRecordBytes If true, a record bytes field will be added at the beginning of each record. - * @param generateCorruptedFields If true, a corrupted record field will be appended to the end of the schema. + * @param generateCorruptFields If true, a corrupt fields field will be appended to the end of the schema. * @param segmentLevelIds Segment ids to put to the extracted record if id generation it turned on. * @param fileId A file id to be put to the extractor record if generateRecordId == true. * @param recordId The record id to be saved to the record id field. 
@@ -53,24 +53,24 @@ object RecordExtractors { */ @throws(classOf[IllegalStateException]) def extractRecord[T: ClassTag]( - ast: CopybookAST, - data: Array[Byte], - offsetBytes: Int = 0, - policy: SchemaRetentionPolicy = SchemaRetentionPolicy.KeepOriginal, - variableLengthOccurs: Boolean = false, - generateRecordId: Boolean = false, - generateRecordBytes: Boolean = false, - generateCorruptedFields: Boolean = false, - segmentLevelIds: List[String] = Nil, - fileId: Int = 0, - recordId: Long = 0, - activeSegmentRedefine: String = "", - generateInputFileField: Boolean = false, - inputFileName: String = "", - handler: RecordHandler[T] + ast: CopybookAST, + data: Array[Byte], + offsetBytes: Int = 0, + policy: SchemaRetentionPolicy = SchemaRetentionPolicy.KeepOriginal, + variableLengthOccurs: Boolean = false, + generateRecordId: Boolean = false, + generateRecordBytes: Boolean = false, + generateCorruptFields: Boolean = false, + segmentLevelIds: List[String] = Nil, + fileId: Int = 0, + recordId: Long = 0, + activeSegmentRedefine: String = "", + generateInputFileField: Boolean = false, + inputFileName: String = "", + handler: RecordHandler[T] ): Seq[Any] = { val dependFields = scala.collection.mutable.HashMap.empty[String, Either[Int, String]] - val corruptedFields = new ArrayBuffer[CorruptedField] + val corruptFields = new ArrayBuffer[CorruptField] val isAstFlat = ast.children.exists(_.isInstanceOf[Primitive]) @@ -110,8 +110,8 @@ object RecordExtractors { var j = 0 while (i < actualSize) { val value = s.decodeTypeValue(offset, data) - if (value == null && generateCorruptedFields && !s.isEmpty(offset, data)) { - corruptedFields += CorruptedField(s"${field.name}[$i]", s.getRawValue(offset,data)) + if (value == null && generateCorruptFields && !s.isEmpty(offset, data)) { + corruptFields += CorruptField(s"${field.name}[$i]", s.getRawValue(offset,data)) } offset += s.binaryProperties.dataSize values(j) = value @@ -137,8 +137,8 @@ object RecordExtractors { } case st: Primitive => val value = st.decodeTypeValue(useOffset, data) - if (value == null && generateCorruptedFields && !st.isEmpty(useOffset, data)) { - corruptedFields += CorruptedField(field.name, st.getRawValue(useOffset,data)) + if (value == null && generateCorruptFields && !st.isEmpty(useOffset, data)) { + corruptFields += CorruptField(field.name, st.getRawValue(useOffset,data)) } if (value != null && st.isDependee) { val intStringVal: Either[Int, String] = value match { @@ -212,7 +212,7 @@ object RecordExtractors { policy } - applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes, generateCorruptedFields, segmentLevelIds, fileId, recordId, data.length, data, generateInputFileField, inputFileName, corruptedFields, handler) + applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes, generateCorruptFields, segmentLevelIds, fileId, recordId, data.length, data, generateInputFileField, inputFileName, corruptFields, handler) } /** @@ -432,7 +432,7 @@ object RecordExtractors { policy } - applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes = false, generateCorruptedFields = false, Nil, fileId, recordId, recordLength, Array.empty[Byte], generateInputFileField = generateInputFileField, inputFileName, null, handler) + applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes = false, generateCorruptFields = false, 
Nil, fileId, recordId, recordLength, Array.empty[Byte], generateInputFileField = generateInputFileField, inputFileName, null, handler) } /** @@ -452,7 +452,7 @@ object RecordExtractors { * @param records The array of [[T]] object for each Group of the copybook * @param generateRecordId If true a record id field will be added as the first field of the record. * @param generateRecordBytes If true a record bytes field will be added at the beginning of the record. - * @param generateCorruptedFields If true, a corrupted record field will be appended to the end of the schema. + * @param generateCorruptFields If true,a corrupt fields field will be appended to the end of the schema. * @param fileId The file id to be saved to the file id field * @param recordId The record id to be saved to the record id field * @param recordByteLength The length of the record @@ -461,21 +461,21 @@ object RecordExtractors { * @return A [[T]] object corresponding to the record schema */ private def applyRecordPostProcessing[T]( - ast: CopybookAST, - records: List[T], - policy: SchemaRetentionPolicy, - generateRecordId: Boolean, - generateRecordBytes: Boolean, - generateCorruptedFields: Boolean, - segmentLevelIds: List[String], - fileId: Int, - recordId: Long, - recordByteLength: Int, - recordBytes: Array[Byte], - generateInputFileField: Boolean, - inputFileName: String, - corruptedFields: ArrayBuffer[CorruptedField], - handler: RecordHandler[T] + ast: CopybookAST, + records: List[T], + policy: SchemaRetentionPolicy, + generateRecordId: Boolean, + generateRecordBytes: Boolean, + generateCorruptFields: Boolean, + segmentLevelIds: List[String], + fileId: Int, + recordId: Long, + recordByteLength: Int, + recordBytes: Array[Byte], + generateInputFileField: Boolean, + inputFileName: String, + corruptFields: ArrayBuffer[CorruptField], + handler: RecordHandler[T] ): Seq[Any] = { val outputRecords = new ListBuffer[Any] @@ -508,13 +508,13 @@ object RecordExtractors { records.foreach(record => outputRecords.append(record)) } - if (generateCorruptedFields && corruptedFields != null) { + if (generateCorruptFields && corruptFields != null) { // Ugly but efficient implementation of converting errors as an array field - val len = corruptedFields.length + val len = corruptFields.length val ar = new Array[Any](len) var i = 0 while (i < len) { - val r = handler.create(Array[Any](corruptedFields(i).fieldName, corruptedFields(i).rawValue), corruptedFieldsGroup) + val r = handler.create(Array[Any](corruptFields(i).fieldName, corruptFields(i).rawValue), corruptFieldsGroup) ar(i) = r i += 1 } @@ -526,15 +526,15 @@ object RecordExtractors { } /** - * Constructs a Group object representing corrupted fields. It is only needed for constructing records that require field names, + * Constructs a Group object representing corrupt fields. It is only needed for constructing records that require field names, * such as JSON. 
Field sizes and encoding do not really matter */ - private def getCorruptedFieldsGroup: Group = { - val corruptedFieldsInGroup = new mutable.ArrayBuffer[Statement] + private def getCorruptFieldsGroup: Group = { + val corruptFieldsInGroup = new mutable.ArrayBuffer[Statement] - corruptedFieldsInGroup += Primitive(15, "field_name", "field_name", 0, AlphaNumeric("X(50)", 50), decode = null, encode = null)(None) - corruptedFieldsInGroup += Primitive(15, "raw_value", "raw_value", 0, AlphaNumeric("X(50)", 50, enc = Some(RAW), compact = Some(COMP4())), decode = null, encode = null)(None) + corruptFieldsInGroup += Primitive(15, "field_name", "field_name", 0, AlphaNumeric("X(50)", 50), decode = null, encode = null)(None) + corruptFieldsInGroup += Primitive(15, "raw_value", "raw_value", 0, AlphaNumeric("X(50)", 50, enc = Some(RAW), compact = Some(COMP4())), decode = null, encode = null)(None) - Group(10, "_corrupted_fields", "_corrupted_fields", 0, children = corruptedFieldsInGroup, occurs = Some(10))(None) + Group(10, "_corrupt_fields", "_corrupt_fields", 0, children = corruptFieldsInGroup, occurs = Some(10))(None) } } diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala index 8da001b6a..a6a8fb8d6 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala @@ -90,7 +90,7 @@ class FixedLenNestedRowIterator[T: ClassTag]( readerProperties.schemaPolicy, readerProperties.variableSizeOccurs, generateRecordBytes = readerProperties.generateRecordBytes, - generateCorruptedFields = readerProperties.generateCorruptFields, + generateCorruptFields = readerProperties.generateCorruptFields, activeSegmentRedefine = activeSegmentRedefine, handler = handler ) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala index e9bab4ff8..7a6b63432 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala @@ -50,7 +50,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten * @param generateCorruptFields Generate '_corrupt_fields' field for fields that haven't converted successfully * @param schemaRetentionPolicy A copybook usually has a root group struct element that acts like a rowtag in XML. This can be retained in Spark schema or can be collapsed * @param stringTrimmingPolicy Specify if and how strings should be trimmed when parsed - * @param isDisplayAlwaysString If true, all fields having DISPLAY format will remain strings and won't be converted to numbers + * @param isDisplayAlwaysString If true, all fields having DISPLAY format will remain strings and won't be converted to numbers * @param allowPartialRecords If true, partial ASCII records can be parsed (in cases when LF character is missing for example) * @param multisegmentParams Parameters for reading multisegment mainframe files * @param improvedNullDetection If true, string values that contain only zero bytes (0x0) will be considered null. 
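For reference, the Spark-side shape of the renamed `_corrupt_fields` column, as it appears in the spark-cobol schema and test diffs further below, is roughly the following (a simplified sketch, not a verbatim copy of the schema-building code):

```scala
import org.apache.spark.sql.types._

// One entry per field that failed to decode; the array is empty when the
// whole record decoded cleanly.
val corruptFieldsColumn = StructField(
  "_corrupt_fields",
  ArrayType(
    StructType(Seq(
      StructField("field_name", StringType, nullable = false),
      StructField("raw_value", BinaryType, nullable = false)
    )),
    containsNull = false
  ),
  nullable = false
)
```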
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala
index f87314ab5..6f9878eca 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala
@@ -66,7 +66,7 @@ object CobolParametersParser extends Logging {
   // Schema transformation parameters
   val PARAM_GENERATE_RECORD_ID = "generate_record_id"
   val PARAM_GENERATE_RECORD_BYTES = "generate_record_bytes"
-  val PARAM_CORRUPTED_FIELDS = "generate_corrupted_fields"
+  val PARAM_CORRUPT_FIELDS = "generate_corrupt_fields"
   val PARAM_SCHEMA_RETENTION_POLICY = "schema_retention_policy"
   val PARAM_GROUP_FILLERS = "drop_group_fillers"
   val PARAM_VALUE_FILLERS = "drop_value_fillers"
@@ -287,7 +287,7 @@ object CobolParametersParser extends Logging {
       variableLengthParams,
       params.getOrElse(PARAM_VARIABLE_SIZE_OCCURS, "false").toBoolean,
       params.getOrElse(PARAM_GENERATE_RECORD_BYTES, "false").toBoolean,
-      params.getOrElse(PARAM_CORRUPTED_FIELDS, "false").toBoolean,
+      params.getOrElse(PARAM_CORRUPT_FIELDS, "false").toBoolean,
       schemaRetentionPolicy,
       stringTrimmingPolicy,
       params.getOrElse(PARAM_DISPLAY_PIC_ALWAYS_STRING, "false").toBoolean,
@@ -891,12 +891,12 @@ object CobolParametersParser extends Logging {
         throw new IllegalArgumentException(s"Options 'segment-children:*' cannot be used with 'segment_id_level*' or 'segment_id_root' " +
           "since ID fields generation is not supported for hierarchical records reader.")
       }
-      if (params.contains(PARAM_GENERATE_RECORD_BYTES)) {
-        throw new IllegalArgumentException(s"Option '$PARAM_GENERATE_RECORD_BYTES' cannot be used with 'segment-children:*' " +
+      if (params.contains(PARAM_GENERATE_RECORD_BYTES) && params(PARAM_GENERATE_RECORD_BYTES).toBoolean) {
+        throw new IllegalArgumentException(s"Option '$PARAM_GENERATE_RECORD_BYTES=true' cannot be used with 'segment-children:*' " +
           "since hierarchical records are composites of more than one raw record.")
       }
-      if (params.contains(PARAM_CORRUPTED_FIELDS)) {
-        throw new IllegalArgumentException(s"Option '$PARAM_CORRUPTED_FIELDS' cannot be used with 'segment-children:*' " +
+      if (params.contains(PARAM_CORRUPT_FIELDS) && params(PARAM_CORRUPT_FIELDS).toBoolean) {
+        throw new IllegalArgumentException(s"Option '$PARAM_CORRUPT_FIELDS=true' cannot be used with 'segment-children:*' " +
           "at the moment.")
       }
     }
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala
index 5180e7778..69a025c8d 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala
@@ -39,7 +39,7 @@ import scala.collection.immutable.HashMap
  * @param strictIntegralPrecision If true, Cobrix will not generate short/integer/long Spark data types, and always use decimal(n) with the exact precision that matches the copybook.
  * @param generateRecordId If true, a record id field will be prepended to the beginning of the schema.
  * @param generateRecordBytes If true, a record bytes field will be appended to the beginning of the schema.
- * @param generateCorruptedFields If true, a corrupted record field will be appended to the end of the schema.
+ * @param generateCorruptFields If true, a corrupt fields field will be appended to the end of the schema.
 * @param inputFileNameField If non-empty, a source file name will be prepended to the beginning of the schema.
 * @param generateSegIdFieldsCnt A number of segment ID levels to generate
 * @param segmentIdProvidedPrefix A prefix for each segment id levels to make segment ids globally unique (by default the current timestamp will be used)
@@ -52,7 +52,7 @@ class CobolSchema(val copybook: Copybook,
                   val inputFileNameField: String,
                   val generateRecordId: Boolean,
                   val generateRecordBytes: Boolean,
-                  val generateCorruptedFields: Boolean,
+                  val generateCorruptFields: Boolean,
                   val generateSegIdFieldsCnt: Int = 0,
                   val segmentIdProvidedPrefix: String = "",
                   val metadataPolicy: MetadataPolicy = MetadataPolicy.Basic) extends Serializable {
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala
index 14c81e3c3..d827e20b1 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala
@@ -73,7 +73,7 @@ class SparkCobolOptionsBuilder(copybookContent: String)(implicit spark: SparkSes
         variableLengthOccurs = readerParams.variableSizeOccurs,
         generateRecordId = readerParams.generateRecordId,
         generateRecordBytes = readerParams.generateRecordBytes,
-        generateCorruptedFields = readerParams.generateCorruptFields,
+        generateCorruptFields = readerParams.generateCorruptFields,
         handler = recordHandler)
       Row.fromSeq(record)
     })
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala
index ba3d842a5..3beb8d8fc 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala
@@ -40,11 +40,11 @@ import scala.collection.mutable.ArrayBuffer
 * provides the corresponding Spark schema and also other properties for the Spark data source.
 *
 * @param copybook A parsed copybook.
- * @param schemaRetentionPolicy pecifies a policy to transform the input schema. The default policy is to keep the schema exactly as it is in the copybook.
+ * @param schemaRetentionPolicy Specifies a policy to transform the input schema. The default policy is to keep the schema exactly as it is in the copybook.
 * @param strictIntegralPrecision If true, Cobrix will not generate short/integer/long Spark data types, and always use decimal(n) with the exact precision that matches the copybook.
 * @param generateRecordId If true, a record id field will be prepended to the beginning of the schema.
 * @param generateRecordBytes If true, a record bytes field will be appended to the beginning of the schema.
- * @param generateCorruptedFields If true, a corrupted record field will be appended to the beginning of the schema.
+ * @param generateCorruptFields If true, a corrupt fields field will be appended to the end of the schema.
 * @param inputFileNameField If non-empty, a source file name will be prepended to the beginning of the schema.
 * @param generateSegIdFieldsCnt A number of segment ID levels to generate
 * @param segmentIdProvidedPrefix A prefix for each segment id levels to make segment ids globally unique (by default the current timestamp will be used)
@@ -57,7 +57,7 @@ class CobolSchema(copybook: Copybook,
                   inputFileNameField: String,
                   generateRecordId: Boolean,
                   generateRecordBytes: Boolean,
-                  generateCorruptedFields: Boolean,
+                  generateCorruptFields: Boolean,
                   generateSegIdFieldsCnt: Int,
                   segmentIdProvidedPrefix: String,
                   metadataPolicy: MetadataPolicy)
@@ -68,7 +68,7 @@ class CobolSchema(copybook: Copybook,
    inputFileNameField,
    generateRecordId,
    generateRecordBytes,
-   generateCorruptedFields,
+   generateCorruptFields,
    generateSegIdFieldsCnt,
    segmentIdProvidedPrefix
  ) with Logging with Serializable {
@@ -129,8 +129,8 @@ class CobolSchema(copybook: Copybook,
       recordsWithRecordBytes
     }

-    val recordsWithCorruptedFields = if (generateCorruptedFields) {
-      recordsWithRecordId :+ StructField(Constants.corruptedFieldsField, ArrayType(StructType(
+    val recordsWithCorruptFields = if (generateCorruptFields) {
+      recordsWithRecordId :+ StructField(Constants.corruptFieldsField, ArrayType(StructType(
         Seq(
           StructField(Constants.fieldNameColumn, StringType, nullable = false),
           StructField(Constants.rawValueColumn, BinaryType, nullable = false)
@@ -140,7 +140,7 @@ class CobolSchema(copybook: Copybook,
       recordsWithRecordId
     }

-    StructType(recordsWithCorruptedFields)
+    StructType(recordsWithCorruptFields)
   }

   private [cobrix] def getMaximumSegmentIdLength(segmentIdProvidedPrefix: String): Int = {
@@ -323,7 +323,7 @@ object CobolSchema {
       schema.inputFileNameField,
       schema.generateRecordId,
       schema.generateRecordBytes,
-      schema.generateCorruptedFields,
+      schema.generateCorruptFields,
       schema.generateSegIdFieldsCnt,
       schema.segmentIdPrefix,
       schema.metadataPolicy
@@ -347,7 +347,7 @@ object CobolSchema {
     private var inputFileNameField: String = ""
     private var generateRecordId: Boolean = false
     private var generateRecordBytes: Boolean = false
-    private var generateCorruptedFields: Boolean = false
+    private var generateCorruptFields: Boolean = false
     private var generateSegIdFieldsCnt: Int = 0
     private var segmentIdProvidedPrefix: String = ""
     private var metadataPolicy: MetadataPolicy = MetadataPolicy.Basic
@@ -382,8 +382,8 @@ object CobolSchema {
       this
     }

-    def withGenerateCorruptedFields(generateCorruptedFields: Boolean): CobolSchemaBuilder = {
-      this.generateCorruptedFields = generateCorruptedFields
+    def withGenerateCorruptFields(generateCorruptFields: Boolean): CobolSchemaBuilder = {
+      this.generateCorruptFields = generateCorruptFields
       this
     }

@@ -411,7 +411,7 @@ object CobolSchema {
       inputFileNameField,
       generateRecordId,
       generateRecordBytes,
-      generateCorruptedFields,
+      generateCorruptFields,
       generateSegIdFieldsCnt,
       segmentIdProvidedPrefix,
       metadataPolicy
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala
index 5bdf38643..349ba4630 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala
@@ -297,7 +297,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
       assertEqualsMultiline(actualSchema, expectedSchema)
     }

-    "multi-segment keep-original with corrupted record generation" in {
+    "multi-segment keep-original with corrupt record generation" in {
       val expectedSchema =
         """root
           | |-- Seg_Id0: string (nullable = true)
@@ -306,13 +306,13 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
           | | |-- IntValue: integer (nullable = true)
           | |-- STRUCT2: struct (nullable = true)
           | | |-- STR_FLD: string (nullable = true)
-          | |-- _corrupted_fields: array (nullable = false)
+          | |-- _corrupt_fields: array (nullable = false)
           | | |-- element: struct (containsNull = false)
           | | | |-- field_name: string (nullable = false)
           | | | |-- raw_value: binary (nullable = false)
           |""".stripMargin.replaceAll("[\\r\\n]", "\n")
       val parsedSchema = CopybookParser.parseTree(copyBook)
-      val cobolSchema = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal).withGenerateCorruptedFields(true).withGenerateSegIdFieldsCnt(2).build()
+      val cobolSchema = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal).withGenerateCorruptFields(true).withGenerateSegIdFieldsCnt(2).build()
       val actualSchema = cobolSchema.getSparkSchema.treeString

       assertEqualsMultiline(actualSchema, expectedSchema)
@@ -869,20 +869,20 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
       assert(sparkSchema.fields(1).dataType.isInstanceOf[StructType])
     }

-    "create schema with corrupted fields using builder" in {
+    "create schema with corrupt fields using builder" in {
       val copybook: String =
         """      01 RECORD.
           |        05 STR1 PIC X(10).
           |""".stripMargin

       val parsedCopybook = CopybookParser.parse(copybook)
       val cobolSchema = CobolSchema.builder(parsedCopybook)
-        .withGenerateCorruptedFields(true)
+        .withGenerateCorruptFields(true)
         .build()

       val sparkSchema = cobolSchema.getSparkSchema

       assert(sparkSchema.fields.length == 2)
-      assert(sparkSchema.fields(1).name == "_corrupted_fields")
+      assert(sparkSchema.fields(1).name == "_corrupt_fields")
       assert(sparkSchema.fields(1).dataType.isInstanceOf[ArrayType])
     }
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test17HierarchicalSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test17HierarchicalSpec.scala
index 837d227c7..b316d9ccb 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test17HierarchicalSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test17HierarchicalSpec.scala
@@ -16,12 +16,13 @@ package za.co.absa.cobrix.spark.cobol.source.integration

-import java.nio.charset.StandardCharsets
-import java.nio.file.{Files, Paths}
-
 import org.scalatest.wordspec.AnyWordSpec
+import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.PARAM_GENERATE_RECORD_BYTES
 import za.co.absa.cobrix.spark.cobol.source.base.{CobolTestBase, SparkTestBase}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
+
 //noinspection NameBooleanParameters
 class Test17HierarchicalSpec extends AnyWordSpec with SparkTestBase with CobolTestBase {
@@ -441,7 +442,7 @@ class Test17HierarchicalSpec extends AnyWordSpec with SparkTestBase with CobolTe
           .load(inpudDataPath)
       }

-      assert(ex.getMessage.contains("Option 'generate_record_bytes' cannot be used with 'segment-children:*'"))
+      assert(ex.getMessage.contains(s"Option '$PARAM_GENERATE_RECORD_BYTES=true' cannot be used with 'segment-children:*'"))
     }
   }
 }
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test35GeneratedRecordBytes.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test35GeneratedRecordBytes.scala
index d17f43653..7912c480e 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test35GeneratedRecordBytes.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test35GeneratedRecordBytes.scala
@@ -17,6 +17,7 @@ package za.co.absa.cobrix.spark.cobol.source.integration

 import org.scalatest.wordspec.AnyWordSpec
+import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.PARAM_GENERATE_RECORD_BYTES
 import za.co.absa.cobrix.spark.cobol.Cobrix
 import za.co.absa.cobrix.spark.cobol.source.base.{SimpleComparisonBase, SparkTestBase}
 import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture
@@ -115,7 +116,7 @@ class Test35GeneratedRecordBytes extends AnyWordSpec with SparkTestBase with Bin
           .load(fileName)
       }

-      assert(ex.getMessage.contains("Option 'generate_record_bytes' cannot be used with 'segment-children:*'"))
+      assert(ex.getMessage.contains(s"Option '$PARAM_GENERATE_RECORD_BYTES=true' cannot be used with 'segment-children:*'"))
     }
   }
 }
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptedFieldsSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptFieldsSpec.scala
similarity index 83%
rename from spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptedFieldsSpec.scala
rename to spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptFieldsSpec.scala
index 4cab7323a..5a5732419 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptedFieldsSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptFieldsSpec.scala
@@ -18,12 +18,12 @@ package za.co.absa.cobrix.spark.cobol.source.integration

 import org.apache.spark.sql.DataFrame
 import org.scalatest.wordspec.AnyWordSpec
-import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.PARAM_CORRUPTED_FIELDS
+import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.PARAM_CORRUPT_FIELDS
 import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
 import za.co.absa.cobrix.spark.cobol.source.fixtures.{BinaryFileFixture, TextComparisonFixture}
 import za.co.absa.cobrix.spark.cobol.utils.SparkUtils

-class Test41CorruptedFieldsSpec extends AnyWordSpec with SparkTestBase with BinaryFileFixture with TextComparisonFixture {
+class Test41CorruptFieldsSpec extends AnyWordSpec with SparkTestBase with BinaryFileFixture with TextComparisonFixture {
   private val copybook =
     """      01 R.
              03 ID PIC 9(1).
@@ -41,7 +41,7 @@ class Test41CorruptedFieldsSpec extends AnyWordSpec with SparkTestBase with Bina
     0xF5, 0xC1, 0x4C, 0x00, 0xA0, 0xC1, 0xF5, 0xA3 // Errors in array
   ).map(_.toByte)

-  "Corrupted fields record generation" should {
+  "Corrupt fields record generation" should {
     "work when the option is turned on" in {
       val expectedSchema =
         """root
@@ -51,7 +51,7 @@ class Test41CorruptedFieldsSpec extends AnyWordSpec with SparkTestBase with Bina
           | |-- F3: integer (nullable = true)
           | |-- F4: array (nullable = true)
           | | |-- element: integer (containsNull = true)
-          | |-- _corrupted_fields: array (nullable = false)
+          | |-- _corrupt_fields: array (nullable = false)
           | | |-- element: struct (containsNull = false)
           | | | |-- field_name: string (nullable = false)
           | | | |-- raw_value: binary (nullable = false)
@@ -64,13 +64,13 @@
          | "F2" : 5,
          | "F3" : 6,
          | "F4" : [ 1, 2, 3 ],
-         | "_corrupted_fields" : [ ]
+         | "_corrupt_fields" : [ ]
          |}, {
          | "ID" : 2,
          | "F1" : "1",
          | "F3" : 5,
          | "F4" : [ 4, 5, 6 ],
-         | "_corrupted_fields" : [ {
+         | "_corrupt_fields" : [ {
          | "field_name" : "F2",
          | "raw_value" : "0w=="
          | } ]
@@ -79,19 +79,19 @@
          | "F2" : 3,
          | "F3" : 61702,
          | "F4" : [ 7, 8, 9 ],
-         | "_corrupted_fields" : [ ]
+         | "_corrupt_fields" : [ ]
          |}, {
          | "ID" : 4,
          | "F3" : 0,
          | "F4" : [ null, null, 0 ],
-         | "_corrupted_fields" : [ ]
+         | "_corrupt_fields" : [ ]
          |}, {
          | "ID" : 5,
          | "F1" : "A",
          | "F2" : 4,
          | "F3" : 160,
          | "F4" : [ null, 5, null ],
-         | "_corrupted_fields" : [ {
+         | "_corrupt_fields" : [ {
          | "field_name" : "F4[0]",
          | "raw_value" : "wQ=="
          | }, {
@@ -101,8 +101,8 @@
          |} ]
          |""".stripMargin

-      withTempBinFile("corrupted_fields1", ".dat", data) { tmpFileName =>
-        val df = getDataFrame(tmpFileName, Map("generate_corrupted_fields" -> "true"))
+      withTempBinFile("corrupt_fields1", ".dat", data) { tmpFileName =>
+        val df = getDataFrame(tmpFileName, Map("generate_corrupt_fields" -> "true"))

         val actualSchema = df.schema.treeString
         compareTextVertical(actualSchema, expectedSchema)
@@ -115,10 +115,10 @@
     "throw an exception when working with a hierarchical data" in {
       val ex = intercept[IllegalArgumentException] {
-        getDataFrame("/tmp/dummy", Map("generate_corrupted_fields" -> "true", "segment-children:0" -> "COMPANY => DEPT,CUSTOMER"))
+        getDataFrame("/tmp/dummy", Map("generate_corrupt_fields" -> "true", "segment-children:0" -> "COMPANY => DEPT,CUSTOMER"))
       }

-      assert(ex.getMessage.contains(s"Option '$PARAM_CORRUPTED_FIELDS' cannot be used with 'segment-children:*'"))
+      assert(ex.getMessage.contains(s"Option '$PARAM_CORRUPT_FIELDS=true' cannot be used with 'segment-children:*'"))
     }
   }

From 8a835e555a06be13928f1191dd4fa613175ebbfb Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Wed, 11 Feb 2026 09:41:38 +0100
Subject: [PATCH 7/7] #723 Fix PR nitpicks.

---
 README.md                                                  | 2 +-
 .../za/co/absa/cobrix/cobol/parser/ast/Primitive.scala     | 7 ++++---
 .../cobol/reader/extractors/record/RecordExtractors.scala  | 7 ++++---
 .../za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala    | 2 +-
 .../cobol/source/integration/Test41CorruptFieldsSpec.scala | 4 ++--
 5 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/README.md b/README.md
index a16b56bfd..f4cc9bcf0 100644
--- a/README.md
+++ b/README.md
@@ -1952,7 +1952,7 @@ A: Update hadoop dll to version 3.2.2 or newer.
 ## Changelog
 - #### 2.9.8 will be released soon.
-  - [#723](https://github.com/AbsaOSS/cobrix/pull/723) Added the option to generate `corrupt_fields` field that contains field names and raw values
+  - [#723](https://github.com/AbsaOSS/cobrix/pull/723) Added the option to generate `_corrupt_fields` field that contains field names and raw values
     of fields that Cobrix couldn't decode.
     ```scala
     .option("generate_corrupt_fields", "true")
     ```
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala
index 1019dcc7c..794172a2b 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala
@@ -154,7 +154,8 @@ case class Primitive(
   /**
     * Checks if a value extracted from a given binary record at a specified offset is considered empty.
-    * A value is considered empty if it contains only null bytes or bytes equal to 0x40.
+    * A value is considered empty if it contains only null bytes or bytes equal to a space character
+    * of the underlying encoding (e.g., 0x40 for EBCDIC, 0x20 for ASCII, 0x00 for binary).
     *
     * @param itOffset The offset within the binary record where the value starts.
     * @param record   The binary record represented as an array of bytes.
@@ -170,7 +171,7 @@ case class Primitive(
         return true
       }
     } else {
-      // Non-string field size should exactly fix the required bytes
+      // Non-string field size should exactly fit the required bytes
       if (idx + bytesCount > record.length) {
         return true
       }
@@ -212,7 +213,7 @@ case class Primitive(
         return null
       }
     } else {
-      // Non-string field size should exactly fix the required bytes
+      // Non-string field size should exactly fit the required bytes
       if (idx + bytesCount > record.length) {
         return null
       }
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala
index fdabc52ee..838c0e365 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala
@@ -19,6 +19,7 @@ package za.co.absa.cobrix.cobol.reader.extractors.record
 import za.co.absa.cobrix.cobol.parser.CopybookParser.CopybookAST
 import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, COMP4}
 import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive, Statement}
+import za.co.absa.cobrix.cobol.parser.common.Constants
 import za.co.absa.cobrix.cobol.parser.encoding.RAW
 import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy
 import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy
@@ -532,9 +533,9 @@ object RecordExtractors {
   private def getCorruptFieldsGroup: Group = {
     val corruptFieldsInGroup = new mutable.ArrayBuffer[Statement]

-    corruptFieldsInGroup += Primitive(15, "field_name", "field_name", 0, AlphaNumeric("X(50)", 50), decode = null, encode = null)(None)
-    corruptFieldsInGroup += Primitive(15, "raw_value", "raw_value", 0, AlphaNumeric("X(50)", 50, enc = Some(RAW), compact = Some(COMP4())), decode = null, encode = null)(None)
+    corruptFieldsInGroup += Primitive(15, Constants.fieldNameColumn, Constants.fieldNameColumn, 0, AlphaNumeric("X(50)", 50), decode = null, encode = null)(None)
+    corruptFieldsInGroup += Primitive(15, Constants.rawValueColumn, Constants.rawValueColumn, 0, AlphaNumeric("X(50)", 50, enc = Some(RAW), compact = Some(COMP4())), decode = null, encode = null)(None)

-    Group(10, "_corrupt_fields", "_corrupt_fields", 0, children = corruptFieldsInGroup, occurs = Some(10))(None)
+    Group(10, Constants.corruptFieldsField, Constants.corruptFieldsField, 0, children = corruptFieldsInGroup, occurs = Some(10))(None)
   }
 }
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala
index 349ba4630..34d1244d8 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala
@@ -906,7 +906,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
       assert(sparkSchema.fields(0).name == "file_name")
       assert(sparkSchema.fields(1).name == "Seg_Id0")
       assert(sparkSchema.fields(2).name == "NUM1")
-      assert(sparkSchema.fields(2).dataType.isInstanceOf[StringType])
+      assert(sparkSchema.fields(2).dataType == StringType)
     }
   }
 }
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptFieldsSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptFieldsSpec.scala
index 5a5732419..77dabe73f 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptFieldsSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptFieldsSpec.scala
@@ -102,7 +102,7 @@ class Test41CorruptFieldsSpec extends AnyWordSpec with SparkTestBase with Binary
          |""".stripMargin

       withTempBinFile("corrupt_fields1", ".dat", data) { tmpFileName =>
-        val df = getDataFrame(tmpFileName, Map("generate_corrupt_fields" -> "true"))
+        val df = getDataFrame(tmpFileName, Map(PARAM_CORRUPT_FIELDS -> "true"))

         val actualSchema = df.schema.treeString
         compareTextVertical(actualSchema, expectedSchema)
@@ -115,7 +115,7 @@ class Test41CorruptFieldsSpec extends AnyWordSpec with SparkTestBase with Binary
     "throw an exception when working with a hierarchical data" in {
       val ex = intercept[IllegalArgumentException] {
-        getDataFrame("/tmp/dummy", Map("generate_corrupt_fields" -> "true", "segment-children:0" -> "COMPANY => DEPT,CUSTOMER"))
+        getDataFrame("/tmp/dummy", Map(PARAM_CORRUPT_FIELDS -> "true", "segment-children:0" -> "COMPANY => DEPT,CUSTOMER"))
       }

       assert(ex.getMessage.contains(s"Option '$PARAM_CORRUPT_FIELDS=true' cannot be used with 'segment-children:*'"))