From e501f3515408b5352ff13f4e72b37c8dbdc57fa1 Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Mon, 9 Feb 2026 09:23:39 +0100
Subject: [PATCH 1/7] #723 Add 'generate_corrupted_fields' as a parameter and
part of Spark Schema.
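
When the option is enabled, a `_corrupted_fields` column is appended to the end of the Spark
schema. A minimal sketch of the appended branch, mirroring the expectations added to
`CobolSchemaSpec` and the `StructField` construction in the Spark `CobolSchema`:

```scala
import org.apache.spark.sql.types._

// _corrupted_fields: array of struct { field_name: string, raw_value: binary }
val corruptedFields = StructField(
  "_corrupted_fields",
  ArrayType(
    StructType(Seq(
      StructField("field_name", StringType, nullable = false),
      StructField("raw_value", BinaryType, nullable = false)
    )),
    containsNull = false
  ),
  nullable = false
)
```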
---
.../cobol/parser/common/Constants.scala | 3 ++
.../reader/parameters/CobolParameters.scala | 4 +-
.../parameters/CobolParametersParser.scala | 3 ++
.../reader/parameters/ReaderParameters.scala | 2 +
.../cobol/reader/schema/CobolSchema.scala | 10 +++--
.../spark/cobol/schema/CobolSchema.scala | 19 ++++++++-
.../cobrix/spark/cobol/CobolSchemaSpec.scala | 39 ++++++++++++++-----
7 files changed, 64 insertions(+), 16 deletions(-)
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala
index 950dec281..7f18fd3af 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala
@@ -64,6 +64,9 @@ object Constants {
val recordIdField = "Record_Id"
val recordByteLength = "Record_Byte_Length"
val recordBytes = "Record_Bytes"
+ val corruptedFieldsField = "_corrupted_fields"
+ val fieldNameColumn = "field_name"
+ val rawValueColumn = "raw_value"
// Non-terminals
val nonTerminalsPostfix = "_NT"
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala
index f8331e716..e9bab4ff8 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala
@@ -17,9 +17,9 @@
package za.co.absa.cobrix.cobol.reader.parameters
import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPointFormat
-import za.co.absa.cobrix.cobol.parser.policies.{CommentPolicy, FillerNamingPolicy, MetadataPolicy}
import za.co.absa.cobrix.cobol.parser.policies.DebugFieldsPolicy.DebugFieldsPolicy
import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmingPolicy
+import za.co.absa.cobrix.cobol.parser.policies.{CommentPolicy, FillerNamingPolicy, MetadataPolicy}
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy
@@ -47,6 +47,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
* @param variableLengthParams VariableLengthParameters containing the specifications for the consumption of variable-length Cobol records.
* @param variableSizeOccurs If true, OCCURS DEPENDING ON data size will depend on the number of elements
* @param generateRecordBytes Generate 'record_bytes' field containing raw bytes of the original record
+ * @param generateCorruptFields Generate '_corrupted_fields' field for fields that could not be decoded successfully
* @param schemaRetentionPolicy A copybook usually has a root group struct element that acts like a rowtag in XML. This can be retained in Spark schema or can be collapsed
* @param stringTrimmingPolicy Specify if and how strings should be trimmed when parsed
* @param isDisplayAlwaysString If true, all fields having DISPLAY format will remain strings and won't be converted to numbers
@@ -87,6 +88,7 @@ case class CobolParameters(
variableLengthParams: Option[VariableLengthParameters],
variableSizeOccurs: Boolean,
generateRecordBytes: Boolean,
+ generateCorruptFields: Boolean,
schemaRetentionPolicy: SchemaRetentionPolicy,
stringTrimmingPolicy: StringTrimmingPolicy,
isDisplayAlwaysString: Boolean,
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala
index e563d6650..c504662ff 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala
@@ -66,6 +66,7 @@ object CobolParametersParser extends Logging {
// Schema transformation parameters
val PARAM_GENERATE_RECORD_ID = "generate_record_id"
val PARAM_GENERATE_RECORD_BYTES = "generate_record_bytes"
+ val PARAM_CORRUPTED_FIELDS = "generate_corrupted_fields"
val PARAM_SCHEMA_RETENTION_POLICY = "schema_retention_policy"
val PARAM_GROUP_FILLERS = "drop_group_fillers"
val PARAM_VALUE_FILLERS = "drop_value_fillers"
@@ -286,6 +287,7 @@ object CobolParametersParser extends Logging {
variableLengthParams,
params.getOrElse(PARAM_VARIABLE_SIZE_OCCURS, "false").toBoolean,
params.getOrElse(PARAM_GENERATE_RECORD_BYTES, "false").toBoolean,
+ params.getOrElse(PARAM_CORRUPTED_FIELDS, "false").toBoolean,
schemaRetentionPolicy,
stringTrimmingPolicy,
params.getOrElse(PARAM_DISPLAY_PIC_ALWAYS_STRING, "false").toBoolean,
@@ -431,6 +433,7 @@ object CobolParametersParser extends Logging {
fileEndOffset = varLenParams.fileEndOffset,
generateRecordId = varLenParams.generateRecordId,
generateRecordBytes = parameters.generateRecordBytes,
+ generateCorruptFields = parameters.generateCorruptFields,
schemaPolicy = parameters.schemaRetentionPolicy,
stringTrimmingPolicy = parameters.stringTrimmingPolicy,
isDisplayAlwaysString = parameters.isDisplayAlwaysString,
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala
index 01148faec..4a527e8a4 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala
@@ -58,6 +58,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
* @param fileEndOffset A number of bytes to skip at the end of each file
* @param generateRecordId If true, a record id field will be prepended to each record.
* @param generateRecordBytes Generate 'record_bytes' field containing raw bytes of the original record
+ * @param generateCorruptFields Generate '_corrupted_fields' field for fields that could not be decoded successfully
* @param schemaPolicy Specifies a policy to transform the input schema. The default policy is to keep the schema exactly as it is in the copybook.
* @param stringTrimmingPolicy Specifies if and how strings should be trimmed when parsed.
* @param isDisplayAlwaysString If true, all fields having DISPLAY format will remain strings and won't be converted to numbers.
@@ -111,6 +112,7 @@ case class ReaderParameters(
fileEndOffset: Int = 0,
generateRecordId: Boolean = false,
generateRecordBytes: Boolean = false,
+ generateCorruptFields: Boolean = false,
schemaPolicy: SchemaRetentionPolicy = SchemaRetentionPolicy.CollapseRoot,
stringTrimmingPolicy: StringTrimmingPolicy = StringTrimmingPolicy.TrimBoth,
isDisplayAlwaysString: Boolean = false,
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala
index c3f0f374c..5180e7778 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala
@@ -17,16 +17,15 @@
package za.co.absa.cobrix.cobol.reader.schema
import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage
-
-import java.time.ZonedDateTime
-import java.time.format.DateTimeFormatter
-import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser}
import za.co.absa.cobrix.cobol.parser.encoding.{ASCII, EBCDIC}
import za.co.absa.cobrix.cobol.parser.policies.MetadataPolicy
+import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser}
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy
import java.nio.charset.{Charset, StandardCharsets}
+import java.time.ZonedDateTime
+import java.time.format.DateTimeFormatter
import scala.collection.immutable.HashMap
@@ -40,6 +39,7 @@ import scala.collection.immutable.HashMap
* @param strictIntegralPrecision If true, Cobrix will not generate short/integer/long Spark data types, and always use decimal(n) with the exact precision that matches the copybook.
* @param generateRecordId If true, a record id field will be prepended to the beginning of the schema.
* @param generateRecordBytes If true, a record bytes field will be appended to the beginning of the schema.
+ * @param generateCorruptedFields If true, a '_corrupted_fields' field will be appended to the end of the schema.
* @param inputFileNameField If non-empty, a source file name will be prepended to the beginning of the schema.
* @param generateSegIdFieldsCnt A number of segment ID levels to generate
* @param segmentIdProvidedPrefix A prefix for each segment id levels to make segment ids globally unique (by default the current timestamp will be used)
@@ -52,6 +52,7 @@ class CobolSchema(val copybook: Copybook,
val inputFileNameField: String,
val generateRecordId: Boolean,
val generateRecordBytes: Boolean,
+ val generateCorruptedFields: Boolean,
val generateSegIdFieldsCnt: Int = 0,
val segmentIdProvidedPrefix: String = "",
val metadataPolicy: MetadataPolicy = MetadataPolicy.Basic) extends Serializable {
@@ -143,6 +144,7 @@ object CobolSchema {
readerParameters.inputFileNameColumn,
readerParameters.generateRecordId,
readerParameters.generateRecordBytes,
+ readerParameters.generateCorruptFields,
segIdFieldCount,
segmentIdPrefix,
readerParameters.metadataPolicy
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala
index c781b6419..c5d441a19 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala
@@ -24,11 +24,11 @@ import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, COMP1, COMP2,
import za.co.absa.cobrix.cobol.parser.common.Constants
import za.co.absa.cobrix.cobol.parser.encoding.RAW
import za.co.absa.cobrix.cobol.parser.policies.MetadataPolicy
+import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.getReaderProperties
import za.co.absa.cobrix.cobol.reader.parameters.{CobolParametersParser, Parameters}
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy
import za.co.absa.cobrix.cobol.reader.schema.{CobolSchema => CobolReaderSchema}
-import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.getReaderProperties
import za.co.absa.cobrix.spark.cobol.parameters.MetadataFields.{MAX_ELEMENTS, MAX_LENGTH, MIN_ELEMENTS}
import scala.collection.mutable
@@ -44,6 +44,7 @@ import scala.collection.mutable.ArrayBuffer
* @param strictIntegralPrecision If true, Cobrix will not generate short/integer/long Spark data types, and always use decimal(n) with the exact precision that matches the copybook.
* @param generateRecordId If true, a record id field will be prepended to the beginning of the schema.
* @param generateRecordBytes If true, a record bytes field will be appended to the beginning of the schema.
+ * @param generateCorruptedFields If true, a '_corrupted_fields' field will be appended to the end of the schema.
* @param inputFileNameField If non-empty, a source file name will be prepended to the beginning of the schema.
* @param generateSegIdFieldsCnt A number of segment ID levels to generate
* @param segmentIdProvidedPrefix A prefix for each segment id levels to make segment ids globally unique (by default the current timestamp will be used)
@@ -56,6 +57,7 @@ class CobolSchema(copybook: Copybook,
inputFileNameField: String = "",
generateRecordId: Boolean = false,
generateRecordBytes: Boolean = false,
+ generateCorruptedFields: Boolean = false,
generateSegIdFieldsCnt: Int = 0,
segmentIdProvidedPrefix: String = "",
metadataPolicy: MetadataPolicy = MetadataPolicy.Basic)
@@ -66,6 +68,7 @@ class CobolSchema(copybook: Copybook,
inputFileNameField,
generateRecordId,
generateRecordBytes,
+ generateCorruptedFields,
generateSegIdFieldsCnt,
segmentIdProvidedPrefix
) with Logging with Serializable {
@@ -126,7 +129,18 @@ class CobolSchema(copybook: Copybook,
recordsWithRecordBytes
}
- StructType(recordsWithRecordId)
+ val recordsWithCorruptedFields = if (generateCorruptedFields) {
+ recordsWithRecordId :+ StructField(Constants.corruptedFieldsField, ArrayType(StructType(
+ Seq(
+ StructField(Constants.fieldNameColumn, StringType, nullable = false),
+ StructField(Constants.rawValueColumn, BinaryType, nullable = false)
+ )
+ ), containsNull = false), nullable = false)
+ } else {
+ recordsWithRecordId
+ }
+
+ StructType(recordsWithCorruptedFields)
}
private [cobrix] def getMaximumSegmentIdLength(segmentIdProvidedPrefix: String): Int = {
@@ -309,6 +323,7 @@ object CobolSchema {
schema.inputFileNameField,
schema.generateRecordId,
schema.generateRecordBytes,
+ schema.generateCorruptedFields,
schema.generateSegIdFieldsCnt,
schema.segmentIdPrefix,
schema.metadataPolicy
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala
index b25117b4b..81f73e9c9 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala
@@ -16,7 +16,7 @@
package za.co.absa.cobrix.spark.cobol
-import org.apache.spark.sql.types.{ArrayType, DecimalType, IntegerType, LongType, StringType, StructType}
+import org.apache.spark.sql.types._
import org.scalatest.wordspec.AnyWordSpec
import org.slf4j.{Logger, LoggerFactory}
import za.co.absa.cobrix.cobol.parser.CopybookParser
@@ -272,7 +272,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
| | |-- STR_FLD: string (nullable = true)
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", true, false, 2)
+ val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", true, false, false, 2)
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -290,7 +290,28 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
| | |-- STR_FLD: string (nullable = true)
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", false, true, 2)
+ val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", false, true, false, 2)
+ val actualSchema = cobolSchema.getSparkSchema.treeString
+
+ assertEqualsMultiline(actualSchema, expectedSchema)
+ }
+
+ "multi-segment keep-original with corrupted record generation" in {
+ val expectedSchema =
+ """root
+ | |-- Seg_Id0: string (nullable = true)
+ | |-- Seg_Id1: string (nullable = true)
+ | |-- STRUCT1: struct (nullable = true)
+ | | |-- IntValue: integer (nullable = true)
+ | |-- STRUCT2: struct (nullable = true)
+ | | |-- STR_FLD: string (nullable = true)
+ | |-- _corrupted_fields: array (nullable = false)
+ | | |-- element: struct (containsNull = false)
+ | | | |-- field_name: string (nullable = false)
+ | | | |-- raw_value: binary (nullable = false)
+ |""".stripMargin.replaceAll("[\\r\\n]", "\n")
+ val parsedSchema = CopybookParser.parseTree(copyBook)
+ val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", false, false, true, 2)
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -311,7 +332,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
| | |-- STR_FLD: string (nullable = true)
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", true, true, 2)
+ val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", true, true, false, 2)
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -332,7 +353,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
| | |-- STR_FLD: string (nullable = true)
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, true, "", true, true, 2)
+ val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, true, "", true, true, false, 2)
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -349,7 +370,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
| | |-- STR_FLD: string (nullable = true)
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", false, false, 2)
+ val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", false, false, false, 2)
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -367,7 +388,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
| |-- STR_FLD: string (nullable = true)
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", true, false, 2)
+ val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", true, false, false, 2)
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -382,7 +403,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
| |-- STR_FLD: string (nullable = true)
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", false, false, 2)
+ val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", false, false, false, 2)
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -464,7 +485,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
| |-- NUM3: decimal(9,8) (nullable = true)""".stripMargin
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, true, false, "", false, false, 2)
+ val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, true, false, "", false, false, false, 2)
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
From 0f546e0640c9f1cfc5f9ee2eca722157631e630a Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Tue, 10 Feb 2026 08:47:10 +0100
Subject: [PATCH 2/7] #723 Add support for generating corrupted fields in
record extraction
- Introduced `generate_corrupted_fields` option to append a `_corrupted_fields` array to the schema, containing field names and raw values of fields that could not be decoded.
- Updated relevant methods in `RecordExtractors` to handle corrupted fields.
- Enhanced documentation and README to reflect the new feature.
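
A minimal usage sketch, assuming an active SparkSession `spark`; the copybook and data path are
placeholders, and the option defaults to `false`:

```scala
import org.apache.spark.sql.functions.size

// Placeholder copybook; any copybook works, the option only adds the extra column
val copybookContents =
  """      01 R.
           03 ID    PIC 9(4).
           03 NAME  PIC X(10).
  """

val df = spark.read
  .format("cobol")
  .option("copybook_contents", copybookContents)
  .option("generate_corrupted_fields", "true")
  .load("/path/to/ebcdic/data")

// Keep only the records in which at least one field could not be decoded
df.filter(size(df("_corrupted_fields")) > 0).show()
```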
---
README.md | 29 ++--
.../cobrix/cobol/parser/ast/Primitive.scala | 28 +++-
.../extractors/record/CorruptedField.scala | 22 +++
.../extractors/record/RecordExtractors.scala | 52 +++++--
.../iterator/FixedLenNestedRowIterator.scala | 2 +-
.../iterator/VarLenNestedIterator.scala | 5 +-
.../parameters/CobolParametersParser.scala | 4 +
.../builder/SparkCobolOptionsBuilder.scala | 13 +-
.../Test39CorruptedFieldsSpec.scala | 127 ++++++++++++++++++
9 files changed, 251 insertions(+), 31 deletions(-)
create mode 100644 cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/CorruptedField.scala
create mode 100644 spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test39CorruptedFieldsSpec.scala
diff --git a/README.md b/README.md
index 66bd71e24..30704f10c 100644
--- a/README.md
+++ b/README.md
@@ -597,6 +597,11 @@ root
|-- Record_Bytes: binary (nullable = false)
```
+You can generate a `_corrupted_fields` column that will contain field names and original binary values of fields Cobrix was unable to decode:
+```
+.option("generate_corrupted_fields", "true")
+```
+
### Locality optimization for variable-length records parsing
Variable-length records depend on headers to have their length calculated, which makes it hard to achieve parallelism while parsing.
@@ -1557,6 +1562,7 @@ The output looks like this:
| .option("non_terminals", "GROUP1,GROUP2") | Specifies groups to also be added to the schema as string fields. When this option is specified, the reader will add one extra data field after each matching group containing the string data for the group. |
| .option("generate_record_id", false) | Generate autoincremental 'File_Id', 'Record_Id' and 'Record_Byte_Length' fields. This is used for processing record order dependent data. |
| .option("generate_record_bytes", false) | Generate 'Record_Bytes', the binary field that contains raw contents of the original unparsed records. |
+| .option("generate_corrupted_fields", false) | Generate `_corrupted_fields` field that contains values of fields Cobrix was unable to decode. |
| .option("with_input_file_name_col", "file_name") | Generates a column containing input file name for each record (Similar to Spark SQL `input_file_name()` function). The column name is specified by the value of the option. This option only works for variable record length files. For fixed record length and ASCII files use `input_file_name()`. |
| .option("metadata", "basic") | Specifies wat kind of metadata to include in the Spark schema: `false`, `basic`(default), or `extended` (PIC, usage, etc). |
| .option("debug", "hex") | If specified, each primitive field will be accompanied by a debug field containing raw bytes from the source file. Possible values: `none` (default), `hex`, `binary`, `string` (ASCII only). The legacy value `true` is supported and will generate debug fields in HEX. |
@@ -1945,19 +1951,26 @@ at org.apache.hadoop.io.nativeio.NativeIO$POSIX.getStat(NativeIO.java:608)
A: Update hadoop dll to version 3.2.2 or newer.
## Changelog
+- #### 2.9.8 will be released soon.
+ - [#723](https://github.com/AbsaOSS/cobrix/pull/723) Added the option to generate the `_corrupted_fields` field that contains field names and raw values
+ of fields that Cobrix couldn't decode.
+ ```scala
+ .option("generate_corrupted_fields", "true")
+ ```
+
- #### 2.9.7 released 29 January 2026.
- - [#816](https://github.com/AbsaOSS/cobrix/pull/816) Fixed the reliance on log4j libraries in the classpath. Cobrix can now be run on clusters that do not use Log4j for logging.
+ - [#816](https://github.com/AbsaOSS/cobrix/pull/816) Fixed the reliance on log4j libraries in the classpath. Cobrix can now be run on clusters that do not use Log4j for logging.
- #### 2.9.6 released 7 January 2026.
- - [#813](https://github.com/AbsaOSS/cobrix/pull/813) Fixed compatibility of the relaxed sign overpunching. Allow numbers
- with overpunched sign in unsigned numbers and allow multiple digits when overpunched sign when `strict_sign_overpunching = true`.
+ - [#813](https://github.com/AbsaOSS/cobrix/pull/813) Fixed compatibility of the relaxed sign overpunching. Allow numbers
+ with overpunched sign in unsigned numbers and allow multiple digits when overpunched sign when `strict_sign_overpunching = true`.
- #### 2.9.5 released 22 December 2025.
- - [#809](https://github.com/AbsaOSS/cobrix/pull/809) Add support for reading compressed EBCDIC files. All compression
- supported by Hadoop (.gz, .bz2, etc) are also supported by Cobrix because Cobrix uses Hadoop compressed streams for
- reading such files.
- - [#811](https://github.com/AbsaOSS/cobrix/pull/811) Add read properties hash code as index key to avoid false cache.
- This makes index caching safe to use by default, so index caching is now turned on by default.
+ - [#809](https://github.com/AbsaOSS/cobrix/pull/809) Add support for reading compressed EBCDIC files. All compression
+ supported by Hadoop (.gz, .bz2, etc) are also supported by Cobrix because Cobrix uses Hadoop compressed streams for
+ reading such files.
+ - [#811](https://github.com/AbsaOSS/cobrix/pull/811) Add read properties hash code as index key to avoid false cache.
+ This makes index caching safe to use by default, so index caching is now turned on by default.
- #### 2.9.4 released 26 November 2025.
- [#805](https://github.com/AbsaOSS/cobrix/pull/805) Added the option to cache VRL indexes for better performance when same files are processed multiple times.
```scala
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala
index 08e61ca13..d857632bb 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala
@@ -90,7 +90,6 @@ case class Primitive(
copy(dependingOnHandlers = newDependingOnHandlers)(parent)
}
-
/** Returns the binary size in bits for the field */
def getBinarySizeBytes: Int = {
dataType match {
@@ -110,6 +109,29 @@ case class Primitive(
* @param record A record in a binary format represented as a vector of bits
*/
def decodeTypeValue(itOffset: Int, record: Array[Byte]): Any = {
+ val bytes = getRawValue(itOffset, record)
+
+ if (bytes == null) null else decode(bytes)
+ }
+
+ def isNull(itOffset: Int, record: Array[Byte]): Boolean = {
+ val bytes = getRawValue(itOffset, record)
+
+ if (bytes == null) {
+ true
+ } else {
+ var i = 0
+ while (i < bytes.length) {
+ if (bytes(i) != 0) {
+ return false
+ }
+ i += 1
+ }
+ true
+ }
+ }
+
+ def getRawValue(itOffset: Int, record: Array[Byte]): Array[Byte] = {
val bytesCount = binaryProperties.dataSize
val idx = itOffset
@@ -132,9 +154,7 @@ case class Primitive(
} else {
bytesCount
}
- val bytes = java.util.Arrays.copyOfRange(record, idx, idx + bytesToCopy)
-
- decode(bytes)
+ java.util.Arrays.copyOfRange(record, idx, idx + bytesToCopy)
}
}
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/CorruptedField.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/CorruptedField.scala
new file mode 100644
index 000000000..aa2038fd1
--- /dev/null
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/CorruptedField.scala
@@ -0,0 +1,22 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.cobol.reader.extractors.record
+
+case class CorruptedField(
+ fieldName: String,
+ rawValue: Array[Byte]
+ )
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala
index efda1f4d3..d7708ce3d 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala
@@ -17,15 +17,18 @@
package za.co.absa.cobrix.cobol.reader.extractors.record
import za.co.absa.cobrix.cobol.parser.CopybookParser.CopybookAST
+import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, COMP9}
import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive, Statement}
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy
+import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.reflect.ClassTag
object RecordExtractors {
+ private val corruptedFieldsGroup = getCorruptedFieldsGroup
/**
* This method extracts a record from the specified array of bytes. The copybook for the record needs to be already parsed.
@@ -37,6 +40,7 @@ object RecordExtractors {
* @param variableLengthOccurs If true, OCCURS DEPENDING ON data size will depend on the number of elements.
* @param generateRecordId If true, a record id field will be added as the first field of the record.
* @param generateRecordBytes If true, a record bytes field will be added at the beginning of each record.
+ * @param generateCorruptedFields If true, a '_corrupted_fields' field will be appended to the end of each record.
* @param segmentLevelIds Segment ids to put to the extracted record if id generation it turned on.
* @param fileId A file id to be put to the extractor record if generateRecordId == true.
* @param recordId The record id to be saved to the record id field.
@@ -55,6 +59,7 @@ object RecordExtractors {
variableLengthOccurs: Boolean = false,
generateRecordId: Boolean = false,
generateRecordBytes: Boolean = false,
+ generateCorruptedFields: Boolean = false,
segmentLevelIds: List[String] = Nil,
fileId: Int = 0,
recordId: Long = 0,
@@ -64,6 +69,7 @@ object RecordExtractors {
handler: RecordHandler[T]
): Seq[Any] = {
val dependFields = scala.collection.mutable.HashMap.empty[String, Either[Int, String]]
+ val corruptedFields = new ListBuffer[CorruptedField]
val isAstFlat = ast.children.exists(_.isInstanceOf[Primitive])
@@ -103,6 +109,9 @@ object RecordExtractors {
var j = 0
while (i < actualSize) {
val value = s.decodeTypeValue(offset, data)
+ if (value == null && generateCorruptedFields && !s.isNull(offset, data)) {
+ corruptedFields += CorruptedField(s"${field.name}[$i]", s.getRawValue(offset,data))
+ }
offset += s.binaryProperties.dataSize
values(j) = value
i += 1
@@ -127,6 +136,9 @@ object RecordExtractors {
}
case st: Primitive =>
val value = st.decodeTypeValue(useOffset, data)
+ if (value == null && generateCorruptedFields && !st.isNull(useOffset, data)) {
+ corruptedFields += CorruptedField(field.name, st.getRawValue(useOffset,data))
+ }
if (value != null && st.isDependee) {
val intStringVal: Either[Int, String] = value match {
case v: Int => Left(v)
@@ -199,7 +211,7 @@ object RecordExtractors {
policy
}
- applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes, segmentLevelIds, fileId, recordId, data.length, data, generateInputFileField, inputFileName, handler)
+ applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes, generateCorruptedFields, segmentLevelIds, fileId, recordId, data.length, data, generateInputFileField, inputFileName, corruptedFields.toSeq, handler)
}
/**
@@ -419,7 +431,7 @@ object RecordExtractors {
policy
}
- applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes = false, Nil, fileId, recordId, recordLength, Array.empty[Byte], generateInputFileField = generateInputFileField, inputFileName, handler)
+ applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes = false, generateCorruptedFields = false, Nil, fileId, recordId, recordLength, Array.empty[Byte], generateInputFileField = generateInputFileField, inputFileName, Seq.empty, handler)
}
/**
@@ -435,15 +447,16 @@ object RecordExtractors {
* Combinations of the listed transformations are supported.
*
*
- * @param ast The parsed copybook
- * @param records The array of [[T]] object for each Group of the copybook
- * @param generateRecordId If true a record id field will be added as the first field of the record.
- * @param generateRecordBytes If true a record bytes field will be added at the beginning of the record.
- * @param fileId The file id to be saved to the file id field
- * @param recordId The record id to be saved to the record id field
- * @param recordByteLength The length of the record
- * @param generateInputFileField if true, a field containing input file name will be generated
- * @param inputFileName An input file name to put if its generation is needed
+ * @param ast The parsed copybook
+ * @param records The array of [[T]] object for each Group of the copybook
+ * @param generateRecordId If true a record id field will be added as the first field of the record.
+ * @param generateRecordBytes If true a record bytes field will be added at the beginning of the record.
+ * @param generateCorruptedFields If true, a '_corrupted_fields' field will be appended to the end of each record.
+ * @param fileId The file id to be saved to the file id field
+ * @param recordId The record id to be saved to the record id field
+ * @param recordByteLength The length of the record
+ * @param generateInputFileField if true, a field containing input file name will be generated
+ * @param inputFileName An input file name to put if its generation is needed
* @return A [[T]] object corresponding to the record schema
*/
private def applyRecordPostProcessing[T](
@@ -452,6 +465,7 @@ object RecordExtractors {
policy: SchemaRetentionPolicy,
generateRecordId: Boolean,
generateRecordBytes: Boolean,
+ generateCorruptedFields: Boolean,
segmentLevelIds: List[String],
fileId: Int,
recordId: Long,
@@ -459,6 +473,7 @@ object RecordExtractors {
recordBytes: Array[Byte],
generateInputFileField: Boolean,
inputFileName: String,
+ corruptedFields: Seq[CorruptedField],
handler: RecordHandler[T]
): Seq[Any] = {
val outputRecords = new ListBuffer[Any]
@@ -491,7 +506,22 @@ object RecordExtractors {
// Return rows as the original sequence of groups
records.foreach(record => outputRecords.append(record))
}
+
+ if (generateCorruptedFields) {
+ val corruptedFieldsRaw = corruptedFields.map(d => handler.create(Array[Any](d.fieldName, d.rawValue), corruptedFieldsGroup))
+ outputRecords.append(corruptedFieldsRaw)
+ }
+
// toList() is a constant time operation, and List implements immutable Seq, which is exactly what is needed here.
outputRecords.toList
}
+
+ private def getCorruptedFieldsGroup: Group = {
+ val corruptedFieldsInGroup = new mutable.ArrayBuffer[Statement]
+
+ corruptedFieldsInGroup += Primitive(15, "field_name", "field_name", 0, AlphaNumeric("X(50)", 50), decode = null, encode = null)(None)
+ corruptedFieldsInGroup += Primitive(15, "raw_value", "raw_value", 0, AlphaNumeric("X(50)", 50, compact = Some(COMP9())), decode = null, encode = null)(None)
+
+ Group(10, "_corrupted_fields", "_corrupted_fields", 0, children = corruptedFieldsInGroup )(None)
+ }
}
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala
index 5e9afe222..8da001b6a 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala
@@ -19,7 +19,6 @@ package za.co.absa.cobrix.cobol.reader.iterator
import za.co.absa.cobrix.cobol.internal.Logging
import za.co.absa.cobrix.cobol.reader.extractors.record.{RecordExtractors, RecordHandler}
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
-import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy
import za.co.absa.cobrix.cobol.reader.schema.CobolSchema
import za.co.absa.cobrix.cobol.reader.validator.ReaderParametersValidator
@@ -91,6 +90,7 @@ class FixedLenNestedRowIterator[T: ClassTag](
readerProperties.schemaPolicy,
readerProperties.variableSizeOccurs,
generateRecordBytes = readerProperties.generateRecordBytes,
+ generateCorruptedFields = readerProperties.generateCorruptFields,
activeSegmentRedefine = activeSegmentRedefine,
handler = handler
)
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenNestedIterator.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenNestedIterator.scala
index 49e72a80c..0f9a231e1 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenNestedIterator.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenNestedIterator.scala
@@ -18,10 +18,10 @@ package za.co.absa.cobrix.cobol.reader.iterator
import za.co.absa.cobrix.cobol.parser.Copybook
import za.co.absa.cobrix.cobol.parser.headerparsers.RecordHeaderParser
-import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor
+import za.co.absa.cobrix.cobol.reader.extractors.record.{RecordExtractors, RecordHandler}
+import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
-import za.co.absa.cobrix.cobol.reader.extractors.record.{RecordHandler, RecordExtractors}
import scala.collection.immutable.HashMap
import scala.collection.mutable.ListBuffer
@@ -99,6 +99,7 @@ final class VarLenNestedIterator[T: ClassTag](cobolSchema: Copybook,
readerProperties.variableSizeOccurs,
readerProperties.generateRecordId,
readerProperties.generateRecordBytes,
+ readerProperties.generateCorruptFields,
segmentLevelIds,
fileId,
rawRecordIterator.getRecordIndex,
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala
index c504662ff..f87314ab5 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala
@@ -895,6 +895,10 @@ object CobolParametersParser extends Logging {
throw new IllegalArgumentException(s"Option '$PARAM_GENERATE_RECORD_BYTES' cannot be used with 'segment-children:*' " +
"since hierarchical records are composites of more than one raw record.")
}
+ if (params.contains(PARAM_CORRUPTED_FIELDS)) {
+ throw new IllegalArgumentException(s"Option '$PARAM_CORRUPTED_FIELDS' cannot be used with 'segment-children:*' " +
+ "at the moment.")
+ }
}
if (!isRecordSequence && recordFormat != AsciiText && recordFormat != CobrixAsciiText && params.contains(PARAM_INPUT_FILE_COLUMN)) {
val recordSequenceCondition = s"one of this holds: '$PARAM_RECORD_FORMAT' = V or '$PARAM_RECORD_FORMAT' = VB or '$PARAM_IS_RECORD_SEQUENCE' = true" +
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala
index 0692a5878..14c81e3c3 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala
@@ -67,11 +67,14 @@ class SparkCobolOptionsBuilder(copybookContent: String)(implicit spark: SparkSes
.filter(array => array.nonEmpty && array.length >= minimumRecordLength && array.length <= maximumRecordLength)
.map(array => {
val record = RecordExtractors.extractRecord[GenericRow](cobolSchema.getCobolSchema.ast,
- array,
- 0,
- schemaRetentionPolicy,
- generateRecordBytes = readerParams.generateRecordBytes,
- handler = recordHandler)
+ array,
+ 0,
+ schemaRetentionPolicy,
+ variableLengthOccurs = readerParams.variableSizeOccurs,
+ generateRecordId = readerParams.generateRecordId,
+ generateRecordBytes = readerParams.generateRecordBytes,
+ generateCorruptedFields = readerParams.generateCorruptFields,
+ handler = recordHandler)
Row.fromSeq(record)
})
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test39CorruptedFieldsSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test39CorruptedFieldsSpec.scala
new file mode 100644
index 000000000..27c99c4f1
--- /dev/null
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test39CorruptedFieldsSpec.scala
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.spark.cobol.source.integration
+
+import org.apache.spark.sql.DataFrame
+import org.scalatest.wordspec.AnyWordSpec
+import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.PARAM_CORRUPTED_FIELDS
+import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
+import za.co.absa.cobrix.spark.cobol.source.fixtures.{BinaryFileFixture, TextComparisonFixture}
+import za.co.absa.cobrix.spark.cobol.utils.SparkUtils
+
+class Test39CorruptedFieldsSpec extends AnyWordSpec with SparkTestBase with BinaryFileFixture with TextComparisonFixture {
+ private val copybook =
+ """ 01 R.
+ 03 ID PIC 9(1).
+ 03 F1 PIC X(1).
+ 03 F2 PIC S9(1) COMP-3.
+ 03 F3 PIC 9(3) COMP.
+ 03 F4 PIC 9(1) OCCURS 3.
+ """
+
+ private val data = Array(
+ 0xF1, 0x40, 0x5C, 0x00, 0x06, 0xF1, 0xF2, 0xF3, // Record OK, no errors
+ 0xF2, 0xF1, 0xD3, 0x00, 0x05, 0xF4, 0xF5, 0xF6, // COMP-3 error
+ 0xF3, 0x00, 0x3C, 0xF1, 0x06, 0xF7, 0xF8, 0xF9, // COMP invalid value (negative)
+ 0xF4, 0xC1, 0x4C, 0x00, 0xA0, 0xC1, 0xF5, 0xA3 // Errors in array
+ ).map(_.toByte)
+
+ "Corrupted fields record generation" should {
+ "work when the option is turned on" in {
+ val expectedSchema =
+ """root
+ | |-- ID: integer (nullable = true)
+ | |-- F1: string (nullable = true)
+ | |-- F2: integer (nullable = true)
+ | |-- F3: integer (nullable = true)
+ | |-- F4: array (nullable = true)
+ | | |-- element: integer (containsNull = true)
+ | |-- _corrupted_fields: array (nullable = false)
+ | | |-- element: struct (containsNull = false)
+ | | | |-- field_name: string (nullable = false)
+ | | | |-- raw_value: binary (nullable = false)
+ |""".stripMargin
+
+ val expectedData =
+ """[ {
+ | "ID" : 1,
+ | "F1" : "",
+ | "F2" : 5,
+ | "F3" : 6,
+ | "F4" : [ 1, 2, 3 ],
+ | "_corrupted_fields" : [ ]
+ |}, {
+ | "ID" : 2,
+ | "F1" : "1",
+ | "F3" : 5,
+ | "F4" : [ 4, 5, 6 ],
+ | "_corrupted_fields" : [ {
+ | "field_name" : "F2",
+ | "raw_value" : "0w=="
+ | } ]
+ |}, {
+ | "ID" : 3,
+ | "F2" : 3,
+ | "F3" : 61702,
+ | "F4" : [ 7, 8, 9 ],
+ | "_corrupted_fields" : [ ]
+ |}, {
+ | "ID" : 4,
+ | "F1" : "A",
+ | "F2" : 4,
+ | "F3" : 160,
+ | "F4" : [ null, 5, null ],
+ | "_corrupted_fields" : [ {
+ | "field_name" : "F4[0]",
+ | "raw_value" : "wQ=="
+ | }, {
+ | "field_name" : "F4[2]",
+ | "raw_value" : "ow=="
+ | } ]
+ |} ]
+ |""".stripMargin
+
+ withTempBinFile("corrupted_fields1", ".dat", data) { tmpFileName =>
+ val df = getDataFrame(tmpFileName, Map("generate_corrupted_fields" -> "true"))
+
+ val actualSchema = df.schema.treeString
+ compareTextVertical(actualSchema, expectedSchema)
+
+ val actualData = SparkUtils.convertDataFrameToPrettyJSON(df.orderBy("ID"), 10)
+
+ compareTextVertical(actualData, expectedData)
+ }
+ }
+
+ "throw an exception when working with a hierarchical data" in {
+ val ex = intercept[IllegalArgumentException] {
+ getDataFrame("/tmp/dummy", Map("generate_corrupted_fields" -> "true", "segment-children:0" -> "COMPANY => DEPT,CUSTOMER"))
+ }
+
+ assert(ex.getMessage.contains(s"Option '$PARAM_CORRUPTED_FIELDS' cannot be used with 'segment-children:*'"))
+ }
+ }
+
+ private def getDataFrame(inputPath: String, extraOptions: Map[String, String] = Map.empty[String, String]): DataFrame = {
+ spark
+ .read
+ .format("cobol")
+ .option("copybook_contents", copybook)
+ .options(extraOptions)
+ .load(inputPath)
+ }
+}
From bb08a9ce5967a8c0228939b2c76aec48f6626649 Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Tue, 10 Feb 2026 10:30:23 +0100
Subject: [PATCH 3/7] #723 Treat 0x40 as empty rather than corrupted when
checking whether a field is empty, and slightly improve the performance of
corrupted fields generation.
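
A simplified sketch of the new emptiness rule (the real check in `Primitive.isEmpty` also handles
varchar fields trimmed by the record boundary):

```scala
// A field that fails to decode is reported in `_corrupted_fields` only when it is not "empty".
// "Empty" means every byte of the field is either 0x00 or 0x40 (EBCDIC space).
def isEmptyBytes(bytes: Array[Byte]): Boolean =
  bytes.forall(b => b == 0x00 || b == 0x40)

isEmptyBytes(Array(0x00, 0x40, 0x00).map(_.toByte)) // true  -> not reported as corrupted
isEmptyBytes(Array(0xC1, 0xA3).map(_.toByte))       // false -> reported if decoding returns null
```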
---
.../cobrix/cobol/parser/ast/Primitive.scala | 55 +++++++++++++++----
.../extractors/record/RecordExtractors.scala | 4 +-
....scala => Test41CorruptedFieldsSpec.scala} | 10 +++-
3 files changed, 54 insertions(+), 15 deletions(-)
rename spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/{Test39CorruptedFieldsSpec.scala => Test41CorruptedFieldsSpec.scala} (92%)
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala
index d857632bb..34ca9b903 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala
@@ -114,23 +114,56 @@ case class Primitive(
if (bytes == null) null else decode(bytes)
}
- def isNull(itOffset: Int, record: Array[Byte]): Boolean = {
- val bytes = getRawValue(itOffset, record)
+ /**
+ * Checks if a value extracted from a given binary record at a specified offset is considered empty.
+ * A value is considered empty if it contains only null bytes or bytes equal to 0x40.
+ *
+ * @param itOffset The offset within the binary record where the value starts.
+ * @param record The binary record represented as an array of bytes.
+ * @return `true` if the value is empty, otherwise `false`.
+ */
+ def isEmpty(itOffset: Int, record: Array[Byte]): Boolean = {
+ val bytesCount = binaryProperties.dataSize
+ val idx = itOffset
+
+ if (isString) {
+ // The length of a string can be smaller for varchar fields at the end of a record
+ if (idx > record.length) {
+ return true
+ }
+ } else {
+ // Non-string field size should exactly fit the required bytes
+ if (idx + bytesCount > record.length) {
+ return true
+ }
+ }
- if (bytes == null) {
- true
+ // Determine the actual number of bytes to check based on the record size.
+ // Varchar fields can be trimmed by the record size.
+ val endIndex = if (idx + bytesCount > record.length) {
+ record.length
} else {
- var i = 0
- while (i < bytes.length) {
- if (bytes(i) != 0) {
- return false
- }
- i += 1
+ idx + bytesCount
+ }
+ var i = idx
+ while (i < endIndex) {
+ if (record(i) != 0 && record(i) != 0x40) {
+ return false
}
- true
+ i += 1
}
+ true
}
+ /**
+ * Extracts a raw byte array representation of a value from a binary record
+ * based on the specified offset and the field's properties.
+ *
+ * @param itOffset The offset within the binary record where the value starts.
+ * @param record The binary record represented as an array of bytes.
+ * @return An array of bytes representing the value, or `null` if the offset
+ * or size is invalid for the given binary record.
+ */
def getRawValue(itOffset: Int, record: Array[Byte]): Array[Byte] = {
val bytesCount = binaryProperties.dataSize
val idx = itOffset
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala
index d7708ce3d..4d26892d5 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala
@@ -109,7 +109,7 @@ object RecordExtractors {
var j = 0
while (i < actualSize) {
val value = s.decodeTypeValue(offset, data)
- if (value == null && generateCorruptedFields && !s.isNull(offset, data)) {
+ if (value == null && generateCorruptedFields && !s.isEmpty(offset, data)) {
corruptedFields += CorruptedField(s"${field.name}[$i]", s.getRawValue(offset,data))
}
offset += s.binaryProperties.dataSize
@@ -136,7 +136,7 @@ object RecordExtractors {
}
case st: Primitive =>
val value = st.decodeTypeValue(useOffset, data)
- if (value == null && generateCorruptedFields && !st.isNull(useOffset, data)) {
+ if (value == null && generateCorruptedFields && !st.isEmpty(useOffset, data)) {
corruptedFields += CorruptedField(field.name, st.getRawValue(useOffset,data))
}
if (value != null && st.isDependee) {
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test39CorruptedFieldsSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptedFieldsSpec.scala
similarity index 92%
rename from spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test39CorruptedFieldsSpec.scala
rename to spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptedFieldsSpec.scala
index 27c99c4f1..4cab7323a 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test39CorruptedFieldsSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptedFieldsSpec.scala
@@ -23,7 +23,7 @@ import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
import za.co.absa.cobrix.spark.cobol.source.fixtures.{BinaryFileFixture, TextComparisonFixture}
import za.co.absa.cobrix.spark.cobol.utils.SparkUtils
-class Test39CorruptedFieldsSpec extends AnyWordSpec with SparkTestBase with BinaryFileFixture with TextComparisonFixture {
+class Test41CorruptedFieldsSpec extends AnyWordSpec with SparkTestBase with BinaryFileFixture with TextComparisonFixture {
private val copybook =
""" 01 R.
03 ID PIC 9(1).
@@ -37,7 +37,8 @@ class Test39CorruptedFieldsSpec extends AnyWordSpec with SparkTestBase with Bina
0xF1, 0x40, 0x5C, 0x00, 0x06, 0xF1, 0xF2, 0xF3, // Record OK, no errors
0xF2, 0xF1, 0xD3, 0x00, 0x05, 0xF4, 0xF5, 0xF6, // COMP-3 error
0xF3, 0x00, 0x3C, 0xF1, 0x06, 0xF7, 0xF8, 0xF9, // COMP invalid value (negative)
- 0xF4, 0xC1, 0x4C, 0x00, 0xA0, 0xC1, 0xF5, 0xA3 // Errors in array
+ 0xF4, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40, 0xF0, // Null values okay
+ 0xF5, 0xC1, 0x4C, 0x00, 0xA0, 0xC1, 0xF5, 0xA3 // Errors in array
).map(_.toByte)
"Corrupted fields record generation" should {
@@ -81,6 +82,11 @@ class Test39CorruptedFieldsSpec extends AnyWordSpec with SparkTestBase with Bina
| "_corrupted_fields" : [ ]
|}, {
| "ID" : 4,
+ | "F3" : 0,
+ | "F4" : [ null, null, 0 ],
+ | "_corrupted_fields" : [ ]
+ |}, {
+ | "ID" : 5,
| "F1" : "A",
| "F2" : 4,
| "F3" : 160,
From be408d9f7108992f9c37dc2d912e38e204a318fe Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Tue, 10 Feb 2026 14:28:51 +0100
Subject: [PATCH 4/7] #723 Make corrupted fields generation slightly more
efficient (CPU- and memory-wise), and fix `PIC X USAGE COMP` cases.
---
.../cobol/parser/antlr/ParserVisitor.scala | 6 ++--
.../cobol/parser/ast/datatype/Usage.scala | 8 ++---
.../extractors/record/RecordExtractors.scala | 33 +++++++++++++------
3 files changed, 30 insertions(+), 17 deletions(-)
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/antlr/ParserVisitor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/antlr/ParserVisitor.scala
index 063c7efe3..3bb1ed853 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/antlr/ParserVisitor.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/antlr/ParserVisitor.scala
@@ -16,7 +16,6 @@
package za.co.absa.cobrix.cobol.parser.antlr
-import java.nio.charset.Charset
import org.antlr.v4.runtime.{ParserRuleContext, RuleContext}
import za.co.absa.cobrix.cobol.parser.CopybookParser
import za.co.absa.cobrix.cobol.parser.CopybookParser.CopybookAST
@@ -25,13 +24,14 @@ import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive}
import za.co.absa.cobrix.cobol.parser.common.Constants
import za.co.absa.cobrix.cobol.parser.decoders.DecoderSelector
import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPointFormat
-import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage
import za.co.absa.cobrix.cobol.parser.encoding._
+import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage
import za.co.absa.cobrix.cobol.parser.exceptions.SyntaxErrorException
import za.co.absa.cobrix.cobol.parser.policies.CommentPolicy
import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmingPolicy
import za.co.absa.cobrix.cobol.parser.position.{Left, Position, Right}
+import java.nio.charset.Charset
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.matching.Regex
@@ -168,7 +168,7 @@ class ParserVisitor(enc: Encoding,
int.copy(compact=usage)
case x: AlphaNumeric if usageVal == COMP3U() =>
Integral(x.pic, x.length*2, None, false, None, Some(COMP3U()), None, x.originalPic)
- case x: AlphaNumeric if usageVal == COMP1() || usageVal == COMP4() =>
+ case x: AlphaNumeric if usageVal == COMP4() || usageVal == COMP9() =>
val enc = if (decodeBinaryAsHex) HEX else RAW
x.copy(compact=usage, enc=Some(enc))
case x: AlphaNumeric =>
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/datatype/Usage.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/datatype/Usage.scala
index 4a304e53b..8cd4869ae 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/datatype/Usage.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/datatype/Usage.scala
@@ -19,8 +19,8 @@ package za.co.absa.cobrix.cobol.parser.ast.datatype
sealed trait Usage
-//case class COMP() extends Usage
-//case class COMP0() extends Usage
+//case class COMP() extends Usage // Use COMP4()
+//case class COMP0() extends Usage // Use COMP4()
case class COMP1() extends Usage {
override def toString = "COMP-1"
}
@@ -44,8 +44,8 @@ case class COMP5() extends Usage {
case class COMP9() extends Usage { // artificial little-endian binary
override def toString = "COMP-9"
}
-//case class DISPLAY() extends Usage {
+//case class DISPLAY() extends Usage { // Use None for the USAGE instead
// override def toString = "DISPLAY"
//}
-//case class BINARY() extends Usage
+//case class BINARY() extends Usage // Use COMP4()
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala
index 4d26892d5..daaa46e4a 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala
@@ -17,8 +17,9 @@
package za.co.absa.cobrix.cobol.reader.extractors.record
import za.co.absa.cobrix.cobol.parser.CopybookParser.CopybookAST
-import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, COMP9}
+import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, COMP4}
import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive, Statement}
+import za.co.absa.cobrix.cobol.parser.encoding.RAW
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy
@@ -69,7 +70,7 @@ object RecordExtractors {
handler: RecordHandler[T]
): Seq[Any] = {
val dependFields = scala.collection.mutable.HashMap.empty[String, Either[Int, String]]
- val corruptedFields = new ListBuffer[CorruptedField]
+ val corruptedFields = new ArrayBuffer[CorruptedField]
val isAstFlat = ast.children.exists(_.isInstanceOf[Primitive])
@@ -211,7 +212,7 @@ object RecordExtractors {
policy
}
- applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes, generateCorruptedFields, segmentLevelIds, fileId, recordId, data.length, data, generateInputFileField, inputFileName, corruptedFields.toSeq, handler)
+ applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes, generateCorruptedFields, segmentLevelIds, fileId, recordId, data.length, data, generateInputFileField, inputFileName, corruptedFields, handler)
}
/**
@@ -431,7 +432,7 @@ object RecordExtractors {
policy
}
- applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes = false, generateCorruptedFields = false, Nil, fileId, recordId, recordLength, Array.empty[Byte], generateInputFileField = generateInputFileField, inputFileName, Seq.empty, handler)
+ applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes = false, generateCorruptedFields = false, Nil, fileId, recordId, recordLength, Array.empty[Byte], generateInputFileField = generateInputFileField, inputFileName, null, handler)
}
/**
@@ -473,7 +474,7 @@ object RecordExtractors {
recordBytes: Array[Byte],
generateInputFileField: Boolean,
inputFileName: String,
- corruptedFields: Seq[CorruptedField],
+ corruptedFields: ArrayBuffer[CorruptedField],
handler: RecordHandler[T]
): Seq[Any] = {
val outputRecords = new ListBuffer[Any]
@@ -507,21 +508,33 @@ object RecordExtractors {
records.foreach(record => outputRecords.append(record))
}
- if (generateCorruptedFields) {
- val corruptedFieldsRaw = corruptedFields.map(d => handler.create(Array[Any](d.fieldName, d.rawValue), corruptedFieldsGroup))
- outputRecords.append(corruptedFieldsRaw)
+ if (generateCorruptedFields && corruptedFields != null) {
+      // Ugly but efficient implementation of converting errors into an array field
+ val len = corruptedFields.length
+ val ar = new Array[Any](len)
+ var i = 0
+ while (i < len) {
+ val r = handler.create(Array[Any](corruptedFields(i).fieldName, corruptedFields(i).rawValue), corruptedFieldsGroup)
+ ar(i) = r
+ i += 1
+ }
+ outputRecords.append(ar)
}
// toList() is a constant time operation, and List implements immutable Seq, which is exactly what is needed here.
outputRecords.toList
}
+ /**
+ * Constructs a Group object representing corrupted fields. It is only needed for constructing records that require field names,
+ * such as JSON. Field sizes and encoding do not really matter
+ */
private def getCorruptedFieldsGroup: Group = {
val corruptedFieldsInGroup = new mutable.ArrayBuffer[Statement]
corruptedFieldsInGroup += Primitive(15, "field_name", "field_name", 0, AlphaNumeric("X(50)", 50), decode = null, encode = null)(None)
- corruptedFieldsInGroup += Primitive(15, "raw_value", "raw_value", 0, AlphaNumeric("X(50)", 50, compact = Some(COMP9())), decode = null, encode = null)(None)
+ corruptedFieldsInGroup += Primitive(15, "raw_value", "raw_value", 0, AlphaNumeric("X(50)", 50, enc = Some(RAW), compact = Some(COMP4())), decode = null, encode = null)(None)
- Group(10, "_corrupted_fields", "_corrupted_fields", 0, children = corruptedFieldsInGroup )(None)
+ Group(10, "_corrupted_fields", "_corrupted_fields", 0, children = corruptedFieldsInGroup, occurs = Some(10))(None)
}
}
From 3d1a9a8775f5ab92fe545c03f202a0cabba4cc98 Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Wed, 11 Feb 2026 07:29:44 +0100
Subject: [PATCH 5/7] Refactor CobolSchema to use builder pattern for improved
readability and flexibility in schema creation.
---
.../spark/cobol/schema/CobolSchema.scala | 99 ++++++-
.../cobol/CobolSchemaHierarchicalSpec.scala | 3 +-
.../cobrix/spark/cobol/CobolSchemaSpec.scala | 246 ++++++++++++++++--
.../cobol/source/base/SparkSchemaSpec.scala | 4 +-
.../source/base/impl/DummyCobolSchema.scala | 18 +-
.../text/Test02TextFilesOldSchool.scala | 8 +-
6 files changed, 336 insertions(+), 42 deletions(-)
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala
index c5d441a19..ba3d842a5 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala
@@ -52,15 +52,15 @@ import scala.collection.mutable.ArrayBuffer
*/
class CobolSchema(copybook: Copybook,
schemaRetentionPolicy: SchemaRetentionPolicy,
- isDisplayAlwaysString: Boolean = false,
- strictIntegralPrecision: Boolean = false,
- inputFileNameField: String = "",
- generateRecordId: Boolean = false,
- generateRecordBytes: Boolean = false,
- generateCorruptedFields: Boolean = false,
- generateSegIdFieldsCnt: Int = 0,
- segmentIdProvidedPrefix: String = "",
- metadataPolicy: MetadataPolicy = MetadataPolicy.Basic)
+ isDisplayAlwaysString: Boolean,
+ strictIntegralPrecision: Boolean,
+ inputFileNameField: String,
+ generateRecordId: Boolean,
+ generateRecordBytes: Boolean,
+ generateCorruptedFields: Boolean,
+ generateSegIdFieldsCnt: Int,
+ segmentIdProvidedPrefix: String,
+ metadataPolicy: MetadataPolicy)
extends CobolReaderSchema(copybook,
schemaRetentionPolicy,
isDisplayAlwaysString,
@@ -337,4 +337,85 @@ object CobolSchema {
CobolSchema.fromBaseReader(CobolReaderSchema.fromReaderParameters(copyBookContents, readerParameters))
}
+
+ def builder(copybook: Copybook): CobolSchemaBuilder = new CobolSchemaBuilder(copybook)
+
+ class CobolSchemaBuilder(copybook: Copybook) {
+ private var schemaRetentionPolicy: SchemaRetentionPolicy = SchemaRetentionPolicy.CollapseRoot
+ private var isDisplayAlwaysString: Boolean = false
+ private var strictIntegralPrecision: Boolean = false
+ private var inputFileNameField: String = ""
+ private var generateRecordId: Boolean = false
+ private var generateRecordBytes: Boolean = false
+ private var generateCorruptedFields: Boolean = false
+ private var generateSegIdFieldsCnt: Int = 0
+ private var segmentIdProvidedPrefix: String = ""
+ private var metadataPolicy: MetadataPolicy = MetadataPolicy.Basic
+
+ def withSchemaRetentionPolicy(schemaRetentionPolicy: SchemaRetentionPolicy): CobolSchemaBuilder = {
+ this.schemaRetentionPolicy = schemaRetentionPolicy
+ this
+ }
+
+ def withIsDisplayAlwaysString(isDisplayAlwaysString: Boolean): CobolSchemaBuilder = {
+ this.isDisplayAlwaysString = isDisplayAlwaysString
+ this
+ }
+
+ def withStrictIntegralPrecision(strictIntegralPrecision: Boolean): CobolSchemaBuilder = {
+ this.strictIntegralPrecision = strictIntegralPrecision
+ this
+ }
+
+ def withInputFileNameField(inputFileNameField: String): CobolSchemaBuilder = {
+ this.inputFileNameField = inputFileNameField
+ this
+ }
+
+ def withGenerateRecordId(generateRecordId: Boolean): CobolSchemaBuilder = {
+ this.generateRecordId = generateRecordId
+ this
+ }
+
+ def withGenerateRecordBytes(generateRecordBytes: Boolean): CobolSchemaBuilder = {
+ this.generateRecordBytes = generateRecordBytes
+ this
+ }
+
+ def withGenerateCorruptedFields(generateCorruptedFields: Boolean): CobolSchemaBuilder = {
+ this.generateCorruptedFields = generateCorruptedFields
+ this
+ }
+
+ def withGenerateSegIdFieldsCnt(generateSegIdFieldsCnt: Int): CobolSchemaBuilder = {
+ this.generateSegIdFieldsCnt = generateSegIdFieldsCnt
+ this
+ }
+
+ def withSegmentIdProvidedPrefix(segmentIdProvidedPrefix: String): CobolSchemaBuilder = {
+ this.segmentIdProvidedPrefix = segmentIdProvidedPrefix
+ this
+ }
+
+ def withMetadataPolicy(metadataPolicy: MetadataPolicy): CobolSchemaBuilder = {
+ this.metadataPolicy = metadataPolicy
+ this
+ }
+
+ def build(): CobolSchema = {
+ new CobolSchema(
+ copybook,
+ schemaRetentionPolicy,
+ isDisplayAlwaysString,
+ strictIntegralPrecision,
+ inputFileNameField,
+ generateRecordId,
+ generateRecordBytes,
+ generateCorruptedFields,
+ generateSegIdFieldsCnt,
+ segmentIdProvidedPrefix,
+ metadataPolicy
+ )
+ }
+ }
}
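For reference, a minimal sketch of how the new builder is intended to be used (the copybook below is a made-up example; the method names and defaults are taken from the diff above):

```scala
import za.co.absa.cobrix.cobol.parser.CopybookParser
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy
import za.co.absa.cobrix.spark.cobol.schema.CobolSchema

// A tiny example copybook, used only for illustration.
val copybook =
  """      01  RECORD.
    |        05  STR1       PIC X(10).
    |        05  NUM1       PIC 9(7).
    |""".stripMargin

val parsedCopybook = CopybookParser.parse(copybook)

// Only the options that differ from the builder defaults need to be set.
val cobolSchema = CobolSchema.builder(parsedCopybook)
  .withSchemaRetentionPolicy(SchemaRetentionPolicy.CollapseRoot)
  .withGenerateRecordId(true)
  .build()

// The resulting Spark schema can be inspected as usual.
println(cobolSchema.getSparkSchema.treeString)
```

Compared to the positional constructor, the builder keeps call sites readable and lets new options be added without breaking existing callers.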
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaHierarchicalSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaHierarchicalSpec.scala
index 27d8c44e8..0d530bfb2 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaHierarchicalSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaHierarchicalSpec.scala
@@ -18,7 +18,6 @@ package za.co.absa.cobrix.spark.cobol
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.cobrix.cobol.parser.CopybookParser
-import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy
import za.co.absa.cobrix.spark.cobol.schema.CobolSchema
import scala.collection.immutable.HashMap
@@ -102,6 +101,6 @@ class CobolSchemaHierarchicalSpec extends AnyWordSpec {
private def parseSchema(copybook: String, segmentRedefines: List[String], fieldParentMap: Map[String, String]): CobolSchema = {
val parsedSchema = CopybookParser.parseTree(copybook, segmentRedefines = segmentRedefines, fieldParentMap = fieldParentMap)
- new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "",false, false)
+ CobolSchema.builder(parsedSchema).build()
}
}
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala
index 81f73e9c9..5bdf38643 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala
@@ -20,6 +20,7 @@ import org.apache.spark.sql.types._
import org.scalatest.wordspec.AnyWordSpec
import org.slf4j.{Logger, LoggerFactory}
import za.co.absa.cobrix.cobol.parser.CopybookParser
+import za.co.absa.cobrix.cobol.parser.policies.MetadataPolicy
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy
import za.co.absa.cobrix.spark.cobol.parameters.MetadataFields.MAX_LENGTH
import za.co.absa.cobrix.spark.cobol.schema.CobolSchema
@@ -55,7 +56,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBookContents)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", false, false)
+ val cobolSchema = CobolSchema.builder(parsedSchema).build()
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -73,7 +74,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBookContents)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, true, "", false, false)
+ val cobolSchema = CobolSchema.builder(parsedSchema).withStrictIntegralPrecision(true).build()
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -94,7 +95,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBookContents)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", true, false)
+ val cobolSchema = CobolSchema.builder(parsedSchema).withGenerateRecordId(true).build()
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -113,7 +114,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBookContents)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", false, true)
+ val cobolSchema = CobolSchema.builder(parsedSchema).withGenerateRecordBytes(true).build()
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -135,7 +136,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBookContents)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", true, true)
+ val cobolSchema = CobolSchema.builder(parsedSchema).withGenerateRecordId(true).withGenerateRecordBytes(true).build()
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -163,7 +164,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", true, false)
+ val cobolSchema = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal).withGenerateRecordId(true).build()
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -179,7 +180,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", false, false)
+ val cobolSchema = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal).build()
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -196,7 +197,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", true, false)
+ val cobolSchema = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.CollapseRoot).withGenerateRecordId(true).build()
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -211,7 +212,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", false, true)
+ val cobolSchema = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.CollapseRoot).withGenerateRecordBytes(true).build()
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -229,7 +230,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", true, true)
+ val cobolSchema = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.CollapseRoot).withGenerateRecordBytes(true).withGenerateRecordId(true).build()
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -243,7 +244,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", false, false)
+ val cobolSchema = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.CollapseRoot).build()
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -272,7 +273,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
| | |-- STR_FLD: string (nullable = true)
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", true, false, false, 2)
+ val cobolSchema = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal).withGenerateRecordId(true).withGenerateSegIdFieldsCnt(2).build()
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -290,7 +291,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
| | |-- STR_FLD: string (nullable = true)
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", false, true, false, 2)
+ val cobolSchema = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal).withGenerateRecordBytes(true).withGenerateSegIdFieldsCnt(2).build()
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -311,7 +312,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
| | | |-- raw_value: binary (nullable = false)
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", false, false, true, 2)
+ val cobolSchema = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal).withGenerateCorruptedFields(true).withGenerateSegIdFieldsCnt(2).build()
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -332,7 +333,12 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
| | |-- STR_FLD: string (nullable = true)
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", true, true, false, 2)
+ val cobolSchema = CobolSchema.builder(parsedSchema)
+ .withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal)
+ .withGenerateRecordId(true)
+ .withGenerateRecordBytes(true)
+ .withGenerateSegIdFieldsCnt(2)
+ .build()
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -353,7 +359,13 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
| | |-- STR_FLD: string (nullable = true)
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, true, "", true, true, false, 2)
+ val cobolSchema = CobolSchema.builder(parsedSchema)
+ .withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal)
+ .withStrictIntegralPrecision(true)
+ .withGenerateRecordId(true)
+ .withGenerateRecordBytes(true)
+ .withGenerateSegIdFieldsCnt(2)
+ .build()
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -370,7 +382,11 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
| | |-- STR_FLD: string (nullable = true)
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", false, false, false, 2)
+ val cobolSchema = CobolSchema.builder(parsedSchema)
+ .withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal)
+ .withGenerateSegIdFieldsCnt(2)
+ .build()
+
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -388,7 +404,11 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
| |-- STR_FLD: string (nullable = true)
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", true, false, false, 2)
+ val cobolSchema = CobolSchema.builder(parsedSchema)
+ .withSchemaRetentionPolicy(SchemaRetentionPolicy.CollapseRoot)
+ .withGenerateRecordId(true)
+ .withGenerateSegIdFieldsCnt(2)
+ .build()
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -403,7 +423,11 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
| |-- STR_FLD: string (nullable = true)
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", false, false, false, 2)
+ val cobolSchema = CobolSchema.builder(parsedSchema)
+ .withSchemaRetentionPolicy(SchemaRetentionPolicy.CollapseRoot)
+ .withGenerateSegIdFieldsCnt(2)
+ .build()
+
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -422,7 +446,8 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema1 = new CobolSchema(parsedSchema, SchemaRetentionPolicy.KeepOriginal, false, false, "", false, false)
+ val cobolSchema1 = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal).build()
+
val actualSparkSchema = cobolSchema1.getSparkSchema
val rootField = actualSparkSchema.fields.head.dataType.asInstanceOf[StructType]
@@ -451,7 +476,10 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema1 = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "", false, false)
+ val cobolSchema1 = CobolSchema.builder(parsedSchema)
+ .withSchemaRetentionPolicy(SchemaRetentionPolicy.CollapseRoot)
+ .build()
+
val actualSparkSchema = cobolSchema1.getSparkSchema
val metadataStr1 = actualSparkSchema.fields.head.metadata
@@ -485,7 +513,12 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
| |-- NUM3: decimal(9,8) (nullable = true)""".stripMargin
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, true, false, "", false, false, false, 2)
+ val cobolSchema = CobolSchema.builder(parsedSchema)
+ .withSchemaRetentionPolicy(SchemaRetentionPolicy.CollapseRoot)
+ .withIsDisplayAlwaysString(true)
+ .withGenerateSegIdFieldsCnt(2)
+ .build()
+
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -707,4 +740,173 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
assert(cobolSchema.getMaximumSegmentIdLength("ID_") == 53)
}
}
+
+ "builder" should {
+ "create schema with default settings using builder" in {
+ val copybook: String =
+ """ 01 RECORD.
+ | 05 STR1 PIC X(10).
+ | 05 NUM2 PIC 9(7).
+ |""".stripMargin
+
+ val parsedCopybook = CopybookParser.parse(copybook)
+ val cobolSchema = CobolSchema.builder(parsedCopybook)
+ .build()
+
+ val sparkSchema = cobolSchema.getSparkSchema
+
+ assert(sparkSchema.fields.length == 2)
+ assert(sparkSchema.fields.head.name == "STR1")
+ assert(sparkSchema.fields.head.dataType == StringType)
+ assert(sparkSchema.fields(1).name == "NUM2")
+ assert(sparkSchema.fields(1).dataType == IntegerType)
+ }
+
+ "create schema with strict integral precision using builder" in {
+ val copybook: String =
+ """ 01 RECORD.
+ | 05 NUM1 PIC 9(12).
+ | 05 NUM2 PIC S9(7).
+ |""".stripMargin
+
+ val parsedCopybook = CopybookParser.parse(copybook)
+ val cobolSchema = CobolSchema.builder(parsedCopybook)
+ .withStrictIntegralPrecision(true)
+ .build()
+
+ val sparkSchema = cobolSchema.getSparkSchema
+
+ assert(sparkSchema.fields.length == 2)
+ assert(sparkSchema.fields.head.name == "NUM1")
+ assert(sparkSchema.fields.head.dataType == DecimalType(12, 0))
+ assert(sparkSchema.fields(1).name == "NUM2")
+ assert(sparkSchema.fields(1).dataType == DecimalType(7, 0))
+ }
+
+ "create schema with record id generation using builder" in {
+ val copybook: String =
+ """ 01 RECORD.
+ | 05 STR1 PIC X(10).
+ |""".stripMargin
+
+ val parsedCopybook = CopybookParser.parse(copybook)
+ val cobolSchema = CobolSchema.builder(parsedCopybook)
+ .withGenerateRecordId(true)
+ .build()
+
+ val sparkSchema = cobolSchema.getSparkSchema
+
+ assert(sparkSchema.fields.length == 4)
+ assert(sparkSchema.fields.head.name == "File_Id")
+ assert(sparkSchema.fields.head.dataType == IntegerType)
+ assert(sparkSchema.fields(1).name == "Record_Id")
+ assert(sparkSchema.fields(1).dataType == LongType)
+ assert(sparkSchema.fields(2).name == "Record_Byte_Length")
+ assert(sparkSchema.fields(2).dataType == IntegerType)
+ assert(sparkSchema.fields(3).name == "STR1")
+ }
+
+ "create schema with record bytes generation using builder" in {
+ val copybook: String =
+ """ 01 RECORD.
+ | 05 STR1 PIC X(10).
+ |""".stripMargin
+ val parsedCopybook = CopybookParser.parse(copybook)
+ val cobolSchema = CobolSchema.builder(parsedCopybook)
+ .withGenerateRecordBytes(true)
+ .build()
+
+ val sparkSchema = cobolSchema.getSparkSchema
+
+ assert(sparkSchema.fields.length == 2)
+ assert(sparkSchema.fields.head.name == "Record_Bytes")
+ assert(sparkSchema.fields.head.dataType == BinaryType)
+ assert(sparkSchema.fields(1).name == "STR1")
+ }
+
+ "create schema with both record id and bytes using builder" in {
+ val copybook: String =
+ """ 01 RECORD.
+ | 05 STR1 PIC X(10).
+ |""".stripMargin
+
+ val parsedCopybook = CopybookParser.parse(copybook)
+ val cobolSchema = CobolSchema.builder(parsedCopybook)
+ .withGenerateRecordId(true)
+ .withGenerateRecordBytes(true)
+ .build()
+
+ val sparkSchema = cobolSchema.getSparkSchema
+
+ assert(sparkSchema.fields.length == 5)
+ assert(sparkSchema.fields.head.name == "File_Id")
+ assert(sparkSchema.fields(1).name == "Record_Id")
+ assert(sparkSchema.fields(2).name == "Record_Byte_Length")
+ assert(sparkSchema.fields(3).name == "Record_Bytes")
+ assert(sparkSchema.fields(3).dataType == BinaryType)
+ assert(sparkSchema.fields(4).name == "STR1")
+ }
+
+ "create schema with schema retention policy using builder" in {
+ val copybook: String =
+ """ 01 STRUCT1.
+ | 05 IntValue PIC 9(6) COMP.
+ | 01 STRUCT2.
+ | 10 STR-FLD PIC X(10).
+ |""".stripMargin
+
+ val parsedCopybook = CopybookParser.parse(copybook)
+ val cobolSchema = CobolSchema.builder(parsedCopybook)
+ .withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal)
+ .build()
+
+ val sparkSchema = cobolSchema.getSparkSchema
+
+ assert(sparkSchema.fields.length == 2)
+ assert(sparkSchema.fields.head.name == "STRUCT1")
+ assert(sparkSchema.fields.head.dataType.isInstanceOf[StructType])
+ assert(sparkSchema.fields(1).name == "STRUCT2")
+ assert(sparkSchema.fields(1).dataType.isInstanceOf[StructType])
+ }
+
+ "create schema with corrupted fields using builder" in {
+ val copybook: String =
+ """ 01 RECORD.
+ | 05 STR1 PIC X(10).
+ |""".stripMargin
+ val parsedCopybook = CopybookParser.parse(copybook)
+ val cobolSchema = CobolSchema.builder(parsedCopybook)
+ .withGenerateCorruptedFields(true)
+ .build()
+
+ val sparkSchema = cobolSchema.getSparkSchema
+
+ assert(sparkSchema.fields.length == 2)
+ assert(sparkSchema.fields(1).name == "_corrupted_fields")
+ assert(sparkSchema.fields(1).dataType.isInstanceOf[ArrayType])
+ }
+
+ "create schema with various options" in {
+ val copybook: String =
+ """ 01 RECORD.
+ | 05 NUM1 PIC 9(10).
+ |""".stripMargin
+ val parsedCopybook = CopybookParser.parse(copybook)
+ val cobolSchema = CobolSchema.builder(parsedCopybook)
+ .withIsDisplayAlwaysString(true)
+ .withInputFileNameField("file_name")
+ .withGenerateSegIdFieldsCnt(1)
+ .withSegmentIdProvidedPrefix("segid")
+ .withMetadataPolicy(MetadataPolicy.Extended)
+ .build()
+
+ val sparkSchema = cobolSchema.getSparkSchema
+
+ assert(sparkSchema.fields.length == 3)
+ assert(sparkSchema.fields(0).name == "file_name")
+ assert(sparkSchema.fields(1).name == "Seg_Id0")
+ assert(sparkSchema.fields(2).name == "NUM1")
+ assert(sparkSchema.fields(2).dataType.isInstanceOf[StringType])
+ }
+ }
}
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/SparkSchemaSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/SparkSchemaSpec.scala
index 3731efa8a..9ee30bbb3 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/SparkSchemaSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/SparkSchemaSpec.scala
@@ -37,7 +37,9 @@ class SparkSchemaSpec extends AnyFunSuite {
val parsedSchema = CopybookParser.parseTree(copyBookContents)
- val cobolSchema = new CobolSchema(parsedSchema, SchemaRetentionPolicy.CollapseRoot, false, false, "",false, false)
+ val cobolSchema = CobolSchema.builder(parsedSchema)
+ .withSchemaRetentionPolicy(SchemaRetentionPolicy.CollapseRoot)
+ .build()
val sparkSchema = cobolSchema.getSparkSchema
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/impl/DummyCobolSchema.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/impl/DummyCobolSchema.scala
index 77fb982b1..790420cad 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/impl/DummyCobolSchema.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/impl/DummyCobolSchema.scala
@@ -17,14 +17,24 @@
package za.co.absa.cobrix.spark.cobol.source.base.impl
import org.apache.spark.sql.types.StructType
-import za.co.absa.cobrix.spark.cobol.schema.CobolSchema
import za.co.absa.cobrix.cobol.parser.Copybook
import za.co.absa.cobrix.cobol.parser.ast.Group
+import za.co.absa.cobrix.cobol.parser.policies.MetadataPolicy
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy
+import za.co.absa.cobrix.spark.cobol.schema.CobolSchema
-import scala.collection.Seq
-
-class DummyCobolSchema(val sparkSchema: StructType) extends CobolSchema(new Copybook(Group.root), SchemaRetentionPolicy.KeepOriginal, false, false, "", false, false) with Serializable {
+class DummyCobolSchema(val sparkSchema: StructType) extends CobolSchema(
+ new Copybook(Group.root),
+ SchemaRetentionPolicy.KeepOriginal,
+ false,
+ false,
+ "",
+ false,
+ false,
+ false,
+ 0,
+ "",
+ MetadataPolicy.Basic) with Serializable {
override def getSparkSchema: StructType = sparkSchema
override lazy val getRecordSize: Int = 40
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/text/Test02TextFilesOldSchool.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/text/Test02TextFilesOldSchool.scala
index 746a41c19..71c3cf4bc 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/text/Test02TextFilesOldSchool.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/text/Test02TextFilesOldSchool.scala
@@ -16,8 +16,6 @@
package za.co.absa.cobrix.spark.cobol.source.text
-import java.nio.charset.StandardCharsets
-
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.scalatest.funsuite.AnyFunSuite
@@ -32,6 +30,8 @@ import za.co.absa.cobrix.spark.cobol.schema.CobolSchema
import za.co.absa.cobrix.spark.cobol.source.base.{SimpleComparisonBase, SparkTestBase}
import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture
+import java.nio.charset.StandardCharsets
+
class Test02TextFilesOldSchool extends AnyFunSuite with SparkTestBase with BinaryFileFixture with SimpleComparisonBase {
private implicit val logger: Logger = LoggerFactory.getLogger(this.getClass)
@@ -52,7 +52,7 @@ class Test02TextFilesOldSchool extends AnyFunSuite with SparkTestBase with Binar
withTempTextFile("text_ascii", ".txt", StandardCharsets.UTF_8, textFileContent) { tmpFileName =>
val parsedCopybook = CopybookParser.parse(copybook, dataEncoding = ASCII, stringTrimmingPolicy = StringTrimmingPolicy.TrimNone)
- val cobolSchema = new CobolSchema(parsedCopybook, SchemaRetentionPolicy.CollapseRoot, false, false, "", false)
+ val cobolSchema = CobolSchema.builder(parsedCopybook).build()
val sparkSchema = cobolSchema.getSparkSchema
val rddText = spark.sparkContext
@@ -61,7 +61,7 @@ class Test02TextFilesOldSchool extends AnyFunSuite with SparkTestBase with Binar
val recordHandler = new RowHandler()
val rddRow = rddText
- .filter(str => str.length > 0)
+ .filter(str => str.nonEmpty)
.map(str => {
val record = RecordExtractors.extractRecord[GenericRow](parsedCopybook.ast,
str.getBytes(),
From 620daebd2f63eb205cd029551cce2538e1c40156 Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Wed, 11 Feb 2026 09:28:45 +0100
Subject: [PATCH 6/7] #723 Fix numerous PR suggestions about corrupt field
generation.
---
README.md | 12 +--
.../cobrix/cobol/parser/ast/Primitive.scala | 44 ++++++++-
.../cobol/parser/common/Constants.scala | 2 +-
...orruptedField.scala => CorruptField.scala} | 8 +-
.../extractors/record/RecordExtractors.scala | 98 +++++++++----------
.../iterator/FixedLenNestedRowIterator.scala | 2 +-
.../reader/parameters/CobolParameters.scala | 2 +-
.../parameters/CobolParametersParser.scala | 12 +--
.../cobol/reader/schema/CobolSchema.scala | 4 +-
.../builder/SparkCobolOptionsBuilder.scala | 2 +-
.../spark/cobol/schema/CobolSchema.scala | 24 ++---
.../cobrix/spark/cobol/CobolSchemaSpec.scala | 12 +--
.../integration/Test17HierarchicalSpec.scala | 9 +-
.../Test35GeneratedRecordBytes.scala | 3 +-
...ec.scala => Test41CorruptFieldsSpec.scala} | 26 ++---
15 files changed, 150 insertions(+), 110 deletions(-)
rename cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/{CorruptedField.scala => CorruptField.scala} (81%)
rename spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/{Test41CorruptedFieldsSpec.scala => Test41CorruptFieldsSpec.scala} (83%)
diff --git a/README.md b/README.md
index 30704f10c..a16b56bfd 100644
--- a/README.md
+++ b/README.md
@@ -597,9 +597,9 @@ root
|-- Record_Bytes: binary (nullable = false)
```
-You can generate `_corrupted_fields` that will contain original binary values of fields Cobrix was unable to decode:
-```
-.option("generate_corrupted_fields", "true")
+You can generate the `_corrupt_fields` field that will contain the original binary values of fields Cobrix was unable to decode:
+```scala
+.option("generate_corrupt_fields", "true")
```
### Locality optimization for variable-length records parsing
@@ -1562,7 +1562,7 @@ The output looks like this:
| .option("non_terminals", "GROUP1,GROUP2") | Specifies groups to also be added to the schema as string fields. When this option is specified, the reader will add one extra data field after each matching group containing the string data for the group. |
| .option("generate_record_id", false) | Generate autoincremental 'File_Id', 'Record_Id' and 'Record_Byte_Length' fields. This is used for processing record order dependent data. |
| .option("generate_record_bytes", false) | Generate 'Record_Bytes', the binary field that contains raw contents of the original unparsed records. |
-| .option("generate_corrupted_fields", false) | Generate `_corrupted_fields` field that contains values of fields Cobrix was unable to decode. |
+| .option("generate_corrupt_fields", false) | Generate `_corrupt_fields` field that contains values of fields Cobrix was unable to decode. |
| .option("with_input_file_name_col", "file_name") | Generates a column containing input file name for each record (Similar to Spark SQL `input_file_name()` function). The column name is specified by the value of the option. This option only works for variable record length files. For fixed record length and ASCII files use `input_file_name()`. |
| .option("metadata", "basic") | Specifies what kind of metadata to include in the Spark schema: `false`, `basic`(default), or `extended` (PIC, usage, etc). |
| .option("debug", "hex") | If specified, each primitive field will be accompanied by a debug field containing raw bytes from the source file. Possible values: `none` (default), `hex`, `binary`, `string` (ASCII only). The legacy value `true` is supported and will generate debug fields in HEX. |
@@ -1952,10 +1952,10 @@ A: Update hadoop dll to version 3.2.2 or newer.
## Changelog
- #### 2.9.8 will be released soon.
- - [#723](https://github.com/AbsaOSS/cobrix/pull/723) Added the option to generate `corrupted_fields` field that contains field names and raw values
+ - [#723](https://github.com/AbsaOSS/cobrix/pull/723) Added the option to generate the `_corrupt_fields` field that contains field names and raw values
of fields that Cobrix couldn't decode.
```scala
- .option("generate_corrupted_fields", "true")
+ .option("generate_corrupt_fields", "true")
```
- #### 2.9.7 released 29 January 2026.
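For context, a hedged sketch of end-to-end usage of the renamed option. Only the option name comes from the README change above; the paths are placeholders and the surrounding `spark.read` plumbing is assumed from standard spark-cobol usage:

```scala
// Read a mainframe file with corrupt-field reporting enabled.
// The copybook and data paths are placeholders.
val df = spark.read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cpy")
  .option("generate_corrupt_fields", "true")
  .load("/path/to/data")

// Fields Cobrix could not decode are reported, together with their raw bytes,
// in the '_corrupt_fields' array column.
df.select("_corrupt_fields").show(truncate = false)
```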
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala
index 34ca9b903..1019dcc7c 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala
@@ -16,9 +16,9 @@
package za.co.absa.cobrix.cobol.parser.ast
-import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, CobolType, Decimal, Integral}
+import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, COMP3, CobolType, Decimal, Integral}
import za.co.absa.cobrix.cobol.parser.decoders.{BinaryUtils, DecoderSelector}
-import za.co.absa.cobrix.cobol.parser.encoding.EncoderSelector
+import za.co.absa.cobrix.cobol.parser.encoding.{ASCII, EBCDIC, EncoderSelector}
/** An abstraction of the statements describing fields of primitive data types in the COBOL copybook
*
@@ -63,6 +63,44 @@ case class Primitive(
/** This is cached value specifying if the field is a string */
private val isString = dataType.isInstanceOf[AlphaNumeric]
+  /** This is a cached value to speed up checking for empty values */
+ private val spaceChar: Byte = {
+ dataType match {
+ case t: AlphaNumeric =>
+ t.enc match {
+ case Some(EBCDIC) => 0x40
+ case Some(ASCII) => 0x20
+ case Some(_) => 0
+ case None => 0x40
+ }
+ case t: Integral =>
+ t.compact match {
+ case Some(COMP3()) => 0x40
+ case Some(_) => 0
+ case None =>
+ t.enc match {
+ case Some(EBCDIC) => 0x40
+ case Some(ASCII) => 0x20
+ case Some(_) => 0
+ case None => 0x40
+ }
+ }
+ case t: Decimal =>
+ t.compact match {
+ case Some(COMP3()) => 0x40
+ case Some(_) => 0
+ case None =>
+ t.enc match {
+ case Some(EBCDIC) => 0x40
+ case Some(ASCII) => 0x20
+ case Some(_) => 0
+ case None => 0x40
+ }
+ }
+ case _ => 0
+ }
+ }
+
/** Returns a string representation of the field */
override def toString: String = {
s"${" " * 2 * level}$camelCased ${camelCase(redefines.getOrElse(""))} $dataType"
@@ -147,7 +185,7 @@ case class Primitive(
}
var i = idx
while (i < endIndex) {
- if (record(i) != 0 && record(i) != 0x40) {
+ if (record(i) != 0 && record(i) != spaceChar) {
return false
}
i += 1
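To illustrate the intent of the cached `spaceChar` above: the emptiness check now treats a field as empty when it contains only zero bytes or the space byte of the field's own encoding (0x40 for EBCDIC, 0x20 for ASCII), instead of a hard-coded EBCDIC space. A simplified standalone sketch of the idea, not the library code:

```scala
// Simplified illustration: a field counts as empty if it holds only zero bytes
// or the space byte of its encoding (0x40 for EBCDIC, 0x20 for ASCII).
def isFieldEmpty(record: Array[Byte], from: Int, until: Int, spaceChar: Byte): Boolean = {
  var i = from
  while (i < until) {
    if (record(i) != 0 && record(i) != spaceChar) return false
    i += 1
  }
  true
}

// An EBCDIC field filled with EBCDIC spaces is treated as empty, so it is not
// reported as a corrupt field when it fails to decode.
val ebcdicSpaces = Array.fill(3)(0x40.toByte)
isFieldEmpty(ebcdicSpaces, 0, ebcdicSpaces.length, spaceChar = 0x40.toByte) // true
```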
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala
index 7f18fd3af..979ee7790 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/common/Constants.scala
@@ -64,7 +64,7 @@ object Constants {
val recordIdField = "Record_Id"
val recordByteLength = "Record_Byte_Length"
val recordBytes = "Record_Bytes"
- val corruptedFieldsField = "_corrupted_fields"
+ val corruptFieldsField = "_corrupt_fields"
val fieldNameColumn = "field_name"
val rawValueColumn = "raw_value"
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/CorruptedField.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/CorruptField.scala
similarity index 81%
rename from cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/CorruptedField.scala
rename to cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/CorruptField.scala
index aa2038fd1..a6bf54347 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/CorruptedField.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/CorruptField.scala
@@ -16,7 +16,7 @@
package za.co.absa.cobrix.cobol.reader.extractors.record
-case class CorruptedField(
- fieldName: String,
- rawValue: Array[Byte]
- )
+case class CorruptField(
+ fieldName: String,
+ rawValue: Array[Byte]
+ )
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala
index daaa46e4a..fdabc52ee 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala
@@ -29,7 +29,7 @@ import scala.reflect.ClassTag
object RecordExtractors {
- private val corruptedFieldsGroup = getCorruptedFieldsGroup
+ private val corruptFieldsGroup = getCorruptFieldsGroup
/**
* This method extracts a record from the specified array of bytes. The copybook for the record needs to be already parsed.
@@ -41,7 +41,7 @@ object RecordExtractors {
* @param variableLengthOccurs If true, OCCURS DEPENDING ON data size will depend on the number of elements.
* @param generateRecordId If true, a record id field will be added as the first field of the record.
* @param generateRecordBytes If true, a record bytes field will be added at the beginning of each record.
- * @param generateCorruptedFields If true, a corrupted record field will be appended to the end of the schema.
+ * @param generateCorruptFields If true, a corrupt fields field will be appended to the end of the schema.
* @param segmentLevelIds Segment ids to put to the extracted record if id generation is turned on.
* @param fileId A file id to be put to the extractor record if generateRecordId == true.
* @param recordId The record id to be saved to the record id field.
@@ -53,24 +53,24 @@ object RecordExtractors {
*/
@throws(classOf[IllegalStateException])
def extractRecord[T: ClassTag](
- ast: CopybookAST,
- data: Array[Byte],
- offsetBytes: Int = 0,
- policy: SchemaRetentionPolicy = SchemaRetentionPolicy.KeepOriginal,
- variableLengthOccurs: Boolean = false,
- generateRecordId: Boolean = false,
- generateRecordBytes: Boolean = false,
- generateCorruptedFields: Boolean = false,
- segmentLevelIds: List[String] = Nil,
- fileId: Int = 0,
- recordId: Long = 0,
- activeSegmentRedefine: String = "",
- generateInputFileField: Boolean = false,
- inputFileName: String = "",
- handler: RecordHandler[T]
+ ast: CopybookAST,
+ data: Array[Byte],
+ offsetBytes: Int = 0,
+ policy: SchemaRetentionPolicy = SchemaRetentionPolicy.KeepOriginal,
+ variableLengthOccurs: Boolean = false,
+ generateRecordId: Boolean = false,
+ generateRecordBytes: Boolean = false,
+ generateCorruptFields: Boolean = false,
+ segmentLevelIds: List[String] = Nil,
+ fileId: Int = 0,
+ recordId: Long = 0,
+ activeSegmentRedefine: String = "",
+ generateInputFileField: Boolean = false,
+ inputFileName: String = "",
+ handler: RecordHandler[T]
): Seq[Any] = {
val dependFields = scala.collection.mutable.HashMap.empty[String, Either[Int, String]]
- val corruptedFields = new ArrayBuffer[CorruptedField]
+ val corruptFields = new ArrayBuffer[CorruptField]
val isAstFlat = ast.children.exists(_.isInstanceOf[Primitive])
@@ -110,8 +110,8 @@ object RecordExtractors {
var j = 0
while (i < actualSize) {
val value = s.decodeTypeValue(offset, data)
- if (value == null && generateCorruptedFields && !s.isEmpty(offset, data)) {
- corruptedFields += CorruptedField(s"${field.name}[$i]", s.getRawValue(offset,data))
+ if (value == null && generateCorruptFields && !s.isEmpty(offset, data)) {
+ corruptFields += CorruptField(s"${field.name}[$i]", s.getRawValue(offset,data))
}
offset += s.binaryProperties.dataSize
values(j) = value
@@ -137,8 +137,8 @@ object RecordExtractors {
}
case st: Primitive =>
val value = st.decodeTypeValue(useOffset, data)
- if (value == null && generateCorruptedFields && !st.isEmpty(useOffset, data)) {
- corruptedFields += CorruptedField(field.name, st.getRawValue(useOffset,data))
+ if (value == null && generateCorruptFields && !st.isEmpty(useOffset, data)) {
+ corruptFields += CorruptField(field.name, st.getRawValue(useOffset,data))
}
if (value != null && st.isDependee) {
val intStringVal: Either[Int, String] = value match {
@@ -212,7 +212,7 @@ object RecordExtractors {
policy
}
- applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes, generateCorruptedFields, segmentLevelIds, fileId, recordId, data.length, data, generateInputFileField, inputFileName, corruptedFields, handler)
+ applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes, generateCorruptFields, segmentLevelIds, fileId, recordId, data.length, data, generateInputFileField, inputFileName, corruptFields, handler)
}
/**
@@ -432,7 +432,7 @@ object RecordExtractors {
policy
}
- applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes = false, generateCorruptedFields = false, Nil, fileId, recordId, recordLength, Array.empty[Byte], generateInputFileField = generateInputFileField, inputFileName, null, handler)
+ applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes = false, generateCorruptFields = false, Nil, fileId, recordId, recordLength, Array.empty[Byte], generateInputFileField = generateInputFileField, inputFileName, null, handler)
}
/**
@@ -452,7 +452,7 @@ object RecordExtractors {
* @param records The array of [[T]] object for each Group of the copybook
* @param generateRecordId If true a record id field will be added as the first field of the record.
* @param generateRecordBytes If true a record bytes field will be added at the beginning of the record.
- * @param generateCorruptedFields If true, a corrupted record field will be appended to the end of the schema.
+ * @param generateCorruptFields If true, a corrupt fields field will be appended to the end of the schema.
* @param fileId The file id to be saved to the file id field
* @param recordId The record id to be saved to the record id field
* @param recordByteLength The length of the record
@@ -461,21 +461,21 @@ object RecordExtractors {
* @return A [[T]] object corresponding to the record schema
*/
private def applyRecordPostProcessing[T](
- ast: CopybookAST,
- records: List[T],
- policy: SchemaRetentionPolicy,
- generateRecordId: Boolean,
- generateRecordBytes: Boolean,
- generateCorruptedFields: Boolean,
- segmentLevelIds: List[String],
- fileId: Int,
- recordId: Long,
- recordByteLength: Int,
- recordBytes: Array[Byte],
- generateInputFileField: Boolean,
- inputFileName: String,
- corruptedFields: ArrayBuffer[CorruptedField],
- handler: RecordHandler[T]
+ ast: CopybookAST,
+ records: List[T],
+ policy: SchemaRetentionPolicy,
+ generateRecordId: Boolean,
+ generateRecordBytes: Boolean,
+ generateCorruptFields: Boolean,
+ segmentLevelIds: List[String],
+ fileId: Int,
+ recordId: Long,
+ recordByteLength: Int,
+ recordBytes: Array[Byte],
+ generateInputFileField: Boolean,
+ inputFileName: String,
+ corruptFields: ArrayBuffer[CorruptField],
+ handler: RecordHandler[T]
): Seq[Any] = {
val outputRecords = new ListBuffer[Any]
@@ -508,13 +508,13 @@ object RecordExtractors {
records.foreach(record => outputRecords.append(record))
}
- if (generateCorruptedFields && corruptedFields != null) {
+ if (generateCorruptFields && corruptFields != null) {
      // Ugly but efficient implementation of converting errors into an array field
- val len = corruptedFields.length
+ val len = corruptFields.length
val ar = new Array[Any](len)
var i = 0
while (i < len) {
- val r = handler.create(Array[Any](corruptedFields(i).fieldName, corruptedFields(i).rawValue), corruptedFieldsGroup)
+ val r = handler.create(Array[Any](corruptFields(i).fieldName, corruptFields(i).rawValue), corruptFieldsGroup)
ar(i) = r
i += 1
}
@@ -526,15 +526,15 @@ object RecordExtractors {
}
/**
- * Constructs a Group object representing corrupted fields. It is only needed for constructing records that require field names,
+ * Constructs a Group object representing corrupt fields. It is only needed for constructing records that require field names,
* such as JSON. Field sizes and encoding do not really matter
*/
- private def getCorruptedFieldsGroup: Group = {
- val corruptedFieldsInGroup = new mutable.ArrayBuffer[Statement]
+ private def getCorruptFieldsGroup: Group = {
+ val corruptFieldsInGroup = new mutable.ArrayBuffer[Statement]
- corruptedFieldsInGroup += Primitive(15, "field_name", "field_name", 0, AlphaNumeric("X(50)", 50), decode = null, encode = null)(None)
- corruptedFieldsInGroup += Primitive(15, "raw_value", "raw_value", 0, AlphaNumeric("X(50)", 50, enc = Some(RAW), compact = Some(COMP4())), decode = null, encode = null)(None)
+ corruptFieldsInGroup += Primitive(15, "field_name", "field_name", 0, AlphaNumeric("X(50)", 50), decode = null, encode = null)(None)
+ corruptFieldsInGroup += Primitive(15, "raw_value", "raw_value", 0, AlphaNumeric("X(50)", 50, enc = Some(RAW), compact = Some(COMP4())), decode = null, encode = null)(None)
- Group(10, "_corrupted_fields", "_corrupted_fields", 0, children = corruptedFieldsInGroup, occurs = Some(10))(None)
+ Group(10, "_corrupt_fields", "_corrupt_fields", 0, children = corruptFieldsInGroup, occurs = Some(10))(None)
}
}
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala
index 8da001b6a..a6a8fb8d6 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala
@@ -90,7 +90,7 @@ class FixedLenNestedRowIterator[T: ClassTag](
readerProperties.schemaPolicy,
readerProperties.variableSizeOccurs,
generateRecordBytes = readerProperties.generateRecordBytes,
- generateCorruptedFields = readerProperties.generateCorruptFields,
+ generateCorruptFields = readerProperties.generateCorruptFields,
activeSegmentRedefine = activeSegmentRedefine,
handler = handler
)
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala
index e9bab4ff8..7a6b63432 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala
@@ -50,7 +50,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
* @param generateCorruptFields Generate '_corrupt_fields' field for fields that haven't converted successfully
* @param schemaRetentionPolicy A copybook usually has a root group struct element that acts like a rowtag in XML. This can be retained in Spark schema or can be collapsed
* @param stringTrimmingPolicy Specify if and how strings should be trimmed when parsed
- * @param isDisplayAlwaysString If true, all fields having DISPLAY format will remain strings and won't be converted to numbers
+ * @param isDisplayAlwaysString If true, all fields having DISPLAY format will remain strings and won't be converted to numbers
* @param allowPartialRecords If true, partial ASCII records can be parsed (in cases when LF character is missing for example)
* @param multisegmentParams Parameters for reading multisegment mainframe files
* @param improvedNullDetection If true, string values that contain only zero bytes (0x0) will be considered null.
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala
index f87314ab5..6f9878eca 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala
@@ -66,7 +66,7 @@ object CobolParametersParser extends Logging {
// Schema transformation parameters
val PARAM_GENERATE_RECORD_ID = "generate_record_id"
val PARAM_GENERATE_RECORD_BYTES = "generate_record_bytes"
- val PARAM_CORRUPTED_FIELDS = "generate_corrupted_fields"
+ val PARAM_CORRUPT_FIELDS = "generate_corrupt_fields"
val PARAM_SCHEMA_RETENTION_POLICY = "schema_retention_policy"
val PARAM_GROUP_FILLERS = "drop_group_fillers"
val PARAM_VALUE_FILLERS = "drop_value_fillers"
@@ -287,7 +287,7 @@ object CobolParametersParser extends Logging {
variableLengthParams,
params.getOrElse(PARAM_VARIABLE_SIZE_OCCURS, "false").toBoolean,
params.getOrElse(PARAM_GENERATE_RECORD_BYTES, "false").toBoolean,
- params.getOrElse(PARAM_CORRUPTED_FIELDS, "false").toBoolean,
+ params.getOrElse(PARAM_CORRUPT_FIELDS, "false").toBoolean,
schemaRetentionPolicy,
stringTrimmingPolicy,
params.getOrElse(PARAM_DISPLAY_PIC_ALWAYS_STRING, "false").toBoolean,
@@ -891,12 +891,12 @@ object CobolParametersParser extends Logging {
throw new IllegalArgumentException(s"Options 'segment-children:*' cannot be used with 'segment_id_level*' or 'segment_id_root' " +
"since ID fields generation is not supported for hierarchical records reader.")
}
- if (params.contains(PARAM_GENERATE_RECORD_BYTES)) {
- throw new IllegalArgumentException(s"Option '$PARAM_GENERATE_RECORD_BYTES' cannot be used with 'segment-children:*' " +
+ if (params.contains(PARAM_GENERATE_RECORD_BYTES) && params(PARAM_GENERATE_RECORD_BYTES).toBoolean) {
+ throw new IllegalArgumentException(s"Option '$PARAM_GENERATE_RECORD_BYTES=true' cannot be used with 'segment-children:*' " +
"since hierarchical records are composites of more than one raw record.")
}
- if (params.contains(PARAM_CORRUPTED_FIELDS)) {
- throw new IllegalArgumentException(s"Option '$PARAM_CORRUPTED_FIELDS' cannot be used with 'segment-children:*' " +
+ if (params.contains(PARAM_CORRUPT_FIELDS) && params(PARAM_CORRUPT_FIELDS).toBoolean) {
+ throw new IllegalArgumentException(s"Option '$PARAM_CORRUPT_FIELDS=true' cannot be used with 'segment-children:*' " +
"at the moment.")
}
}
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala
index 5180e7778..69a025c8d 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala
@@ -39,7 +39,7 @@ import scala.collection.immutable.HashMap
* @param strictIntegralPrecision If true, Cobrix will not generate short/integer/long Spark data types, and always use decimal(n) with the exact precision that matches the copybook.
* @param generateRecordId If true, a record id field will be prepended to the beginning of the schema.
* @param generateRecordBytes If true, a record bytes field will be appended to the beginning of the schema.
- * @param generateCorruptedFields If true, a corrupted record field will be appended to the end of the schema.
+ * @param generateCorruptFields If true, a corrupt fields field will be appended to the end of the schema.
* @param inputFileNameField If non-empty, a source file name will be prepended to the beginning of the schema.
* @param generateSegIdFieldsCnt A number of segment ID levels to generate
* @param segmentIdProvidedPrefix A prefix for each segment id levels to make segment ids globally unique (by default the current timestamp will be used)
@@ -52,7 +52,7 @@ class CobolSchema(val copybook: Copybook,
val inputFileNameField: String,
val generateRecordId: Boolean,
val generateRecordBytes: Boolean,
- val generateCorruptedFields: Boolean,
+ val generateCorruptFields: Boolean,
val generateSegIdFieldsCnt: Int = 0,
val segmentIdProvidedPrefix: String = "",
val metadataPolicy: MetadataPolicy = MetadataPolicy.Basic) extends Serializable {
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala
index 14c81e3c3..d827e20b1 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala
@@ -73,7 +73,7 @@ class SparkCobolOptionsBuilder(copybookContent: String)(implicit spark: SparkSes
variableLengthOccurs = readerParams.variableSizeOccurs,
generateRecordId = readerParams.generateRecordId,
generateRecordBytes = readerParams.generateRecordBytes,
- generateCorruptedFields = readerParams.generateCorruptFields,
+ generateCorruptFields = readerParams.generateCorruptFields,
handler = recordHandler)
Row.fromSeq(record)
})
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala
index ba3d842a5..3beb8d8fc 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala
@@ -40,11 +40,11 @@ import scala.collection.mutable.ArrayBuffer
* provides the corresponding Spark schema and also other properties for the Spark data source.
*
* @param copybook A parsed copybook.
- * @param schemaRetentionPolicy pecifies a policy to transform the input schema. The default policy is to keep the schema exactly as it is in the copybook.
+ * @param schemaRetentionPolicy Specifies a policy to transform the input schema. The default policy is to keep the schema exactly as it is in the copybook.
* @param strictIntegralPrecision If true, Cobrix will not generate short/integer/long Spark data types, and always use decimal(n) with the exact precision that matches the copybook.
* @param generateRecordId If true, a record id field will be prepended to the beginning of the schema.
* @param generateRecordBytes If true, a record bytes field will be appended to the beginning of the schema.
- * @param generateCorruptedFields If true, a corrupted record field will be appended to the beginning of the schema.
+ * @param generateCorruptFields If true, a '_corrupt_fields' field will be appended to the end of the schema.
* @param inputFileNameField If non-empty, a source file name will be prepended to the beginning of the schema.
* @param generateSegIdFieldsCnt A number of segment ID levels to generate
* @param segmentIdProvidedPrefix A prefix for each segment id levels to make segment ids globally unique (by default the current timestamp will be used)
@@ -57,7 +57,7 @@ class CobolSchema(copybook: Copybook,
inputFileNameField: String,
generateRecordId: Boolean,
generateRecordBytes: Boolean,
- generateCorruptedFields: Boolean,
+ generateCorruptFields: Boolean,
generateSegIdFieldsCnt: Int,
segmentIdProvidedPrefix: String,
metadataPolicy: MetadataPolicy)
@@ -68,7 +68,7 @@ class CobolSchema(copybook: Copybook,
inputFileNameField,
generateRecordId,
generateRecordBytes,
- generateCorruptedFields,
+ generateCorruptFields,
generateSegIdFieldsCnt,
segmentIdProvidedPrefix
) with Logging with Serializable {
@@ -129,8 +129,8 @@ class CobolSchema(copybook: Copybook,
recordsWithRecordBytes
}
- val recordsWithCorruptedFields = if (generateCorruptedFields) {
- recordsWithRecordId :+ StructField(Constants.corruptedFieldsField, ArrayType(StructType(
+ val recordsWithCorruptFields = if (generateCorruptFields) {
+ recordsWithRecordId :+ StructField(Constants.corruptFieldsField, ArrayType(StructType(
Seq(
StructField(Constants.fieldNameColumn, StringType, nullable = false),
StructField(Constants.rawValueColumn, BinaryType, nullable = false)
@@ -140,7 +140,7 @@ class CobolSchema(copybook: Copybook,
recordsWithRecordId
}
- StructType(recordsWithCorruptedFields)
+ StructType(recordsWithCorruptFields)
}
private [cobrix] def getMaximumSegmentIdLength(segmentIdProvidedPrefix: String): Int = {
@@ -323,7 +323,7 @@ object CobolSchema {
schema.inputFileNameField,
schema.generateRecordId,
schema.generateRecordBytes,
- schema.generateCorruptedFields,
+ schema.generateCorruptFields,
schema.generateSegIdFieldsCnt,
schema.segmentIdPrefix,
schema.metadataPolicy
@@ -347,7 +347,7 @@ object CobolSchema {
private var inputFileNameField: String = ""
private var generateRecordId: Boolean = false
private var generateRecordBytes: Boolean = false
- private var generateCorruptedFields: Boolean = false
+ private var generateCorruptFields: Boolean = false
private var generateSegIdFieldsCnt: Int = 0
private var segmentIdProvidedPrefix: String = ""
private var metadataPolicy: MetadataPolicy = MetadataPolicy.Basic
@@ -382,8 +382,8 @@ object CobolSchema {
this
}
- def withGenerateCorruptedFields(generateCorruptedFields: Boolean): CobolSchemaBuilder = {
- this.generateCorruptedFields = generateCorruptedFields
+ def withGenerateCorruptFields(generateCorruptFields: Boolean): CobolSchemaBuilder = {
+ this.generateCorruptFields = generateCorruptFields
this
}
@@ -411,7 +411,7 @@ object CobolSchema {
inputFileNameField,
generateRecordId,
generateRecordBytes,
- generateCorruptedFields,
+ generateCorruptFields,
generateSegIdFieldsCnt,
segmentIdProvidedPrefix,
metadataPolicy
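For reference, a minimal sketch of the Spark fragment that `getSparkSchema` appends when the option is enabled (field and column names come from the constants introduced by this PR; the nullability flags match the expected schema trees in the tests below):

```scala
import org.apache.spark.sql.types._

// Hedged sketch of the appended `_corrupt_fields` field.
val corruptFieldsField = StructField(
  "_corrupt_fields",
  ArrayType(
    StructType(Seq(
      StructField("field_name", StringType, nullable = false),
      StructField("raw_value", BinaryType, nullable = false)
    )),
    containsNull = false),
  nullable = false)
```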
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala
index 5bdf38643..349ba4630 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala
@@ -297,7 +297,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
assertEqualsMultiline(actualSchema, expectedSchema)
}
- "multi-segment keep-original with corrupted record generation" in {
+ "multi-segment keep-original with corrupt record generation" in {
val expectedSchema =
"""root
| |-- Seg_Id0: string (nullable = true)
@@ -306,13 +306,13 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
| | |-- IntValue: integer (nullable = true)
| |-- STRUCT2: struct (nullable = true)
| | |-- STR_FLD: string (nullable = true)
- | |-- _corrupted_fields: array (nullable = false)
+ | |-- _corrupt_fields: array (nullable = false)
| | |-- element: struct (containsNull = false)
| | | |-- field_name: string (nullable = false)
| | | |-- raw_value: binary (nullable = false)
|""".stripMargin.replaceAll("[\\r\\n]", "\n")
val parsedSchema = CopybookParser.parseTree(copyBook)
- val cobolSchema = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal).withGenerateCorruptedFields(true).withGenerateSegIdFieldsCnt(2).build()
+ val cobolSchema = CobolSchema.builder(parsedSchema).withSchemaRetentionPolicy(SchemaRetentionPolicy.KeepOriginal).withGenerateCorruptFields(true).withGenerateSegIdFieldsCnt(2).build()
val actualSchema = cobolSchema.getSparkSchema.treeString
assertEqualsMultiline(actualSchema, expectedSchema)
@@ -869,20 +869,20 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
assert(sparkSchema.fields(1).dataType.isInstanceOf[StructType])
}
- "create schema with corrupted fields using builder" in {
+ "create schema with corrupt fields using builder" in {
val copybook: String =
""" 01 RECORD.
| 05 STR1 PIC X(10).
|""".stripMargin
val parsedCopybook = CopybookParser.parse(copybook)
val cobolSchema = CobolSchema.builder(parsedCopybook)
- .withGenerateCorruptedFields(true)
+ .withGenerateCorruptFields(true)
.build()
val sparkSchema = cobolSchema.getSparkSchema
assert(sparkSchema.fields.length == 2)
- assert(sparkSchema.fields(1).name == "_corrupted_fields")
+ assert(sparkSchema.fields(1).name == "_corrupt_fields")
assert(sparkSchema.fields(1).dataType.isInstanceOf[ArrayType])
}
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test17HierarchicalSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test17HierarchicalSpec.scala
index 837d227c7..b316d9ccb 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test17HierarchicalSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test17HierarchicalSpec.scala
@@ -16,12 +16,13 @@
package za.co.absa.cobrix.spark.cobol.source.integration
-import java.nio.charset.StandardCharsets
-import java.nio.file.{Files, Paths}
-
import org.scalatest.wordspec.AnyWordSpec
+import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.PARAM_GENERATE_RECORD_BYTES
import za.co.absa.cobrix.spark.cobol.source.base.{CobolTestBase, SparkTestBase}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
+
//noinspection NameBooleanParameters
class Test17HierarchicalSpec extends AnyWordSpec with SparkTestBase with CobolTestBase {
@@ -441,7 +442,7 @@ class Test17HierarchicalSpec extends AnyWordSpec with SparkTestBase with CobolTe
.load(inpudDataPath)
}
- assert(ex.getMessage.contains("Option 'generate_record_bytes' cannot be used with 'segment-children:*'"))
+ assert(ex.getMessage.contains(s"Option '$PARAM_GENERATE_RECORD_BYTES=true' cannot be used with 'segment-children:*'"))
}
}
}
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test35GeneratedRecordBytes.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test35GeneratedRecordBytes.scala
index d17f43653..7912c480e 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test35GeneratedRecordBytes.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test35GeneratedRecordBytes.scala
@@ -17,6 +17,7 @@
package za.co.absa.cobrix.spark.cobol.source.integration
import org.scalatest.wordspec.AnyWordSpec
+import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.PARAM_GENERATE_RECORD_BYTES
import za.co.absa.cobrix.spark.cobol.Cobrix
import za.co.absa.cobrix.spark.cobol.source.base.{SimpleComparisonBase, SparkTestBase}
import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture
@@ -115,7 +116,7 @@ class Test35GeneratedRecordBytes extends AnyWordSpec with SparkTestBase with Bin
.load(fileName)
}
- assert(ex.getMessage.contains("Option 'generate_record_bytes' cannot be used with 'segment-children:*'"))
+ assert(ex.getMessage.contains(s"Option '$PARAM_GENERATE_RECORD_BYTES=true' cannot be used with 'segment-children:*'"))
}
}
}
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptedFieldsSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptFieldsSpec.scala
similarity index 83%
rename from spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptedFieldsSpec.scala
rename to spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptFieldsSpec.scala
index 4cab7323a..5a5732419 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptedFieldsSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptFieldsSpec.scala
@@ -18,12 +18,12 @@ package za.co.absa.cobrix.spark.cobol.source.integration
import org.apache.spark.sql.DataFrame
import org.scalatest.wordspec.AnyWordSpec
-import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.PARAM_CORRUPTED_FIELDS
+import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.PARAM_CORRUPT_FIELDS
import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
import za.co.absa.cobrix.spark.cobol.source.fixtures.{BinaryFileFixture, TextComparisonFixture}
import za.co.absa.cobrix.spark.cobol.utils.SparkUtils
-class Test41CorruptedFieldsSpec extends AnyWordSpec with SparkTestBase with BinaryFileFixture with TextComparisonFixture {
+class Test41CorruptFieldsSpec extends AnyWordSpec with SparkTestBase with BinaryFileFixture with TextComparisonFixture {
private val copybook =
""" 01 R.
03 ID PIC 9(1).
@@ -41,7 +41,7 @@ class Test41CorruptedFieldsSpec extends AnyWordSpec with SparkTestBase with Bina
0xF5, 0xC1, 0x4C, 0x00, 0xA0, 0xC1, 0xF5, 0xA3 // Errors in array
).map(_.toByte)
- "Corrupted fields record generation" should {
+ "Corrupt fields record generation" should {
"work when the option is turned on" in {
val expectedSchema =
"""root
@@ -51,7 +51,7 @@ class Test41CorruptedFieldsSpec extends AnyWordSpec with SparkTestBase with Bina
| |-- F3: integer (nullable = true)
| |-- F4: array (nullable = true)
| | |-- element: integer (containsNull = true)
- | |-- _corrupted_fields: array (nullable = false)
+ | |-- _corrupt_fields: array (nullable = false)
| | |-- element: struct (containsNull = false)
| | | |-- field_name: string (nullable = false)
| | | |-- raw_value: binary (nullable = false)
@@ -64,13 +64,13 @@ class Test41CorruptedFieldsSpec extends AnyWordSpec with SparkTestBase with Bina
| "F2" : 5,
| "F3" : 6,
| "F4" : [ 1, 2, 3 ],
- | "_corrupted_fields" : [ ]
+ | "_corrupt_fields" : [ ]
|}, {
| "ID" : 2,
| "F1" : "1",
| "F3" : 5,
| "F4" : [ 4, 5, 6 ],
- | "_corrupted_fields" : [ {
+ | "_corrupt_fields" : [ {
| "field_name" : "F2",
| "raw_value" : "0w=="
| } ]
@@ -79,19 +79,19 @@ class Test41CorruptedFieldsSpec extends AnyWordSpec with SparkTestBase with Bina
| "F2" : 3,
| "F3" : 61702,
| "F4" : [ 7, 8, 9 ],
- | "_corrupted_fields" : [ ]
+ | "_corrupt_fields" : [ ]
|}, {
| "ID" : 4,
| "F3" : 0,
| "F4" : [ null, null, 0 ],
- | "_corrupted_fields" : [ ]
+ | "_corrupt_fields" : [ ]
|}, {
| "ID" : 5,
| "F1" : "A",
| "F2" : 4,
| "F3" : 160,
| "F4" : [ null, 5, null ],
- | "_corrupted_fields" : [ {
+ | "_corrupt_fields" : [ {
| "field_name" : "F4[0]",
| "raw_value" : "wQ=="
| }, {
@@ -101,8 +101,8 @@ class Test41CorruptedFieldsSpec extends AnyWordSpec with SparkTestBase with Bina
|} ]
|""".stripMargin
- withTempBinFile("corrupted_fields1", ".dat", data) { tmpFileName =>
- val df = getDataFrame(tmpFileName, Map("generate_corrupted_fields" -> "true"))
+ withTempBinFile("corrupt_fields1", ".dat", data) { tmpFileName =>
+ val df = getDataFrame(tmpFileName, Map("generate_corrupt_fields" -> "true"))
val actualSchema = df.schema.treeString
compareTextVertical(actualSchema, expectedSchema)
@@ -115,10 +115,10 @@ class Test41CorruptedFieldsSpec extends AnyWordSpec with SparkTestBase with Bina
"throw an exception when working with a hierarchical data" in {
val ex = intercept[IllegalArgumentException] {
- getDataFrame("/tmp/dummy", Map("generate_corrupted_fields" -> "true", "segment-children:0" -> "COMPANY => DEPT,CUSTOMER"))
+ getDataFrame("/tmp/dummy", Map("generate_corrupt_fields" -> "true", "segment-children:0" -> "COMPANY => DEPT,CUSTOMER"))
}
- assert(ex.getMessage.contains(s"Option '$PARAM_CORRUPTED_FIELDS' cannot be used with 'segment-children:*'"))
+ assert(ex.getMessage.contains(s"Option '$PARAM_CORRUPT_FIELDS=true' cannot be used with 'segment-children:*'"))
}
}
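For context, a minimal end-to-end sketch of enabling the option from Spark (copybook and data paths are hypothetical; the option name is the one added by this PR, the remaining calls follow standard spark-cobol usage):

```scala
import org.apache.spark.sql.functions.size

// Assumes an active SparkSession `spark`.
val df = spark.read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cpy")   // hypothetical path
  .option("generate_corrupt_fields", "true")     // option added in this PR
  .load("/path/to/data")                         // hypothetical path

// Records that failed decoding carry a non-empty `_corrupt_fields` array of
// (field_name, raw_value) structs, as shown in the expected JSON above.
df.filter(size(df("_corrupt_fields")) > 0).show()
```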
From 8a835e555a06be13928f1191dd4fa613175ebbfb Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Wed, 11 Feb 2026 09:41:38 +0100
Subject: [PATCH 7/7] #723 Fix PR nitpicks.
---
README.md | 2 +-
.../za/co/absa/cobrix/cobol/parser/ast/Primitive.scala | 7 ++++---
.../cobol/reader/extractors/record/RecordExtractors.scala | 7 ++++---
.../za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala | 2 +-
.../cobol/source/integration/Test41CorruptFieldsSpec.scala | 4 ++--
5 files changed, 12 insertions(+), 10 deletions(-)
diff --git a/README.md b/README.md
index a16b56bfd..f4cc9bcf0 100644
--- a/README.md
+++ b/README.md
@@ -1952,7 +1952,7 @@ A: Update hadoop dll to version 3.2.2 or newer.
## Changelog
- #### 2.9.8 will be released soon.
- - [#723](https://github.com/AbsaOSS/cobrix/pull/723) Added the option to generate `corrupt_fields` field that contains field names and raw values
+ - [#723](https://github.com/AbsaOSS/cobrix/pull/723) Added the option to generate `_corrupt_fields` field that contains field names and raw values
of fields that Cobrix couldn't decode.
```scala
.option("generate_corrupt_fields", "true")
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala
index 1019dcc7c..794172a2b 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/ast/Primitive.scala
@@ -154,7 +154,8 @@ case class Primitive(
/**
* Checks if a value extracted from a given binary record at a specified offset is considered empty.
- * A value is considered empty if it contains only null bytes or bytes equal to 0x40.
+ * A value is considered empty if it contains only null bytes or bytes equal to a space character
+ * of the underlying encoding (e.g., 0x40 for EBCDIC, 0x20 for ASCII, 0x00 for binary).
*
* @param itOffset The offset within the binary record where the value starts.
* @param record The binary record represented as an array of bytes.
@@ -170,7 +171,7 @@ case class Primitive(
return true
}
} else {
- // Non-string field size should exactly fix the required bytes
+ // Non-string field size should exactly fit the required bytes
if (idx + bytesCount > record.length) {
return true
}
@@ -212,7 +213,7 @@ case class Primitive(
return null
}
} else {
- // Non-string field size should exactly fix the required bytes
+ // Non-string field size should exactly fit the required bytes
if (idx + bytesCount > record.length) {
return null
}
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala
index fdabc52ee..838c0e365 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala
@@ -19,6 +19,7 @@ package za.co.absa.cobrix.cobol.reader.extractors.record
import za.co.absa.cobrix.cobol.parser.CopybookParser.CopybookAST
import za.co.absa.cobrix.cobol.parser.ast.datatype.{AlphaNumeric, COMP4}
import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive, Statement}
+import za.co.absa.cobrix.cobol.parser.common.Constants
import za.co.absa.cobrix.cobol.parser.encoding.RAW
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy
@@ -532,9 +533,9 @@ object RecordExtractors {
private def getCorruptFieldsGroup: Group = {
val corruptFieldsInGroup = new mutable.ArrayBuffer[Statement]
- corruptFieldsInGroup += Primitive(15, "field_name", "field_name", 0, AlphaNumeric("X(50)", 50), decode = null, encode = null)(None)
- corruptFieldsInGroup += Primitive(15, "raw_value", "raw_value", 0, AlphaNumeric("X(50)", 50, enc = Some(RAW), compact = Some(COMP4())), decode = null, encode = null)(None)
+ corruptFieldsInGroup += Primitive(15, Constants.fieldNameColumn, Constants.fieldNameColumn, 0, AlphaNumeric("X(50)", 50), decode = null, encode = null)(None)
+ corruptFieldsInGroup += Primitive(15, Constants.rawValueColumn, Constants.rawValueColumn, 0, AlphaNumeric("X(50)", 50, enc = Some(RAW), compact = Some(COMP4())), decode = null, encode = null)(None)
- Group(10, "_corrupt_fields", "_corrupt_fields", 0, children = corruptFieldsInGroup, occurs = Some(10))(None)
+ Group(10, Constants.corruptFieldsField, Constants.corruptFieldsField, 0, children = corruptFieldsInGroup, occurs = Some(10))(None)
}
}
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala
index 349ba4630..34d1244d8 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala
@@ -906,7 +906,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
assert(sparkSchema.fields(0).name == "file_name")
assert(sparkSchema.fields(1).name == "Seg_Id0")
assert(sparkSchema.fields(2).name == "NUM1")
- assert(sparkSchema.fields(2).dataType.isInstanceOf[StringType])
+ assert(sparkSchema.fields(2).dataType == StringType)
}
}
}
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptFieldsSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptFieldsSpec.scala
index 5a5732419..77dabe73f 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptFieldsSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptFieldsSpec.scala
@@ -102,7 +102,7 @@ class Test41CorruptFieldsSpec extends AnyWordSpec with SparkTestBase with Binary
|""".stripMargin
withTempBinFile("corrupt_fields1", ".dat", data) { tmpFileName =>
- val df = getDataFrame(tmpFileName, Map("generate_corrupt_fields" -> "true"))
+ val df = getDataFrame(tmpFileName, Map(PARAM_CORRUPT_FIELDS -> "true"))
val actualSchema = df.schema.treeString
compareTextVertical(actualSchema, expectedSchema)
@@ -115,7 +115,7 @@ class Test41CorruptFieldsSpec extends AnyWordSpec with SparkTestBase with Binary
"throw an exception when working with a hierarchical data" in {
val ex = intercept[IllegalArgumentException] {
- getDataFrame("/tmp/dummy", Map("generate_corrupt_fields" -> "true", "segment-children:0" -> "COMPANY => DEPT,CUSTOMER"))
+ getDataFrame("/tmp/dummy", Map(PARAM_CORRUPT_FIELDS -> "true", "segment-children:0" -> "COMPANY => DEPT,CUSTOMER"))
}
assert(ex.getMessage.contains(s"Option '$PARAM_CORRUPT_FIELDS=true' cannot be used with 'segment-children:*'"))