[FLINK-38023][formats] Fix GenericRecord Avro state migration#28161

Open: avi-sanwal wants to merge 4 commits into apache:master from avi-sanwal:FLINK-38023-generic-record-avro-state-restore

@avi-sanwal avi-sanwal commented May 14, 2026

What is the purpose of the change

This pull request fixes GenericRecord Avro state migration when compatible schema evolution changes the record shape, for example by adding a field with a default.

During migration, Flink can deserialize old state bytes with the restored previous serializer. For GenericRecord, the deserialized value can still carry the old Avro schema, while the new serializer must write bytes using the new schema. Writing the old-shaped GenericRecord directly with the new GenericDatumWriter can fail outright or read fields from the wrong index.
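
To make the failure mode concrete, here is a small toy model of index-based record access. It is not Avro's API: `ToyRecord` and `writeByPosition` are hypothetical names invented for this illustration, and the field names (`id`, `email`, `name`) are assumed. It only sketches what can go wrong when a writer walks the new schema's positions over a record that was laid out for the old schema.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of index-based record access; this is NOT Avro's API. */
final class PositionalWriteDemo {

    /** A record that stores values by position, in the order of its own field list. */
    static final class ToyRecord {
        final List<String> fieldNames;
        final List<Object> values;

        ToyRecord(List<String> fieldNames, List<Object> values) {
            this.fieldNames = fieldNames;
            this.values = values;
        }
    }

    /** Walks the target schema's positions and ignores the record's own field list. */
    static List<String> writeByPosition(List<String> targetFieldNames, ToyRecord record) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < targetFieldNames.size(); i++) {
            // Throws IndexOutOfBoundsException when the record has fewer fields than the target.
            out.add(targetFieldNames.get(i) + "=" + record.values.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> oldSchema = Arrays.asList("id", "name");
        // Assumed evolution: "email" is inserted in the middle of the record.
        List<String> newSchema = Arrays.asList("id", "email", "name");

        ToyRecord restored = new ToyRecord(oldSchema, Arrays.<Object>asList(42, "alice"));

        // Position 1 means "email" in the new schema but holds the old "name" value.
        Object misread = restored.values.get(newSchema.indexOf("email"));
        System.out.println("email read by position: " + misread);

        try {
            writeByPosition(newSchema, restored);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("write failed: record has no position 2");
        }
    }
}
```

In this toy, the old `name` value is picked up as `email` before the write runs off the end of the record, which corresponds to the two symptoms described above.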

Brief change log

  • When serializing GenericRecord values, keep the existing same-schema fast path.
  • If the GenericRecord value schema differs from the serializer runtime schema, resolve the value through Avro writer-schema to reader-schema conversion before writing it with the new serializer.
  • Add serializer snapshot regression coverage for restoring old GenericRecord bytes and then writing them with the evolved serializer, including inserting a field in the middle of the record.
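
The resolution step in the change log can be sketched with a toy model as well. This is a simplified illustration, not the code in this PR and not Avro's schema resolution: `resolve`, the list-based "schemas", and the `defaults` map are all hypothetical. It shows the same-schema fast path, and otherwise a by-name mapping from the value's fields to the target fields, with defaults filling in added fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of writer-to-reader resolution by field name; this is NOT Avro's API. */
final class ResolveBeforeWriteDemo {

    /**
     * Returns the values laid out in targetFields order.
     * Fields missing from the value are filled from defaults; anything else is an error.
     */
    static List<Object> resolve(
            List<String> valueFields,
            List<Object> values,
            List<String> targetFields,
            Map<String, Object> defaults) {
        if (valueFields.equals(targetFields)) {
            // Same-schema fast path: nothing to convert.
            return values;
        }
        List<Object> resolved = new ArrayList<>();
        for (String name : targetFields) {
            int oldPos = valueFields.indexOf(name);
            if (oldPos >= 0) {
                // Field exists in the old layout: copy it by name, not by position.
                resolved.add(values.get(oldPos));
            } else if (defaults.containsKey(name)) {
                // Field was added by the evolution: use its declared default.
                resolved.add(defaults.get(name));
            } else {
                throw new IllegalStateException("No value or default for field: " + name);
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        List<String> oldFields = Arrays.asList("id", "name");
        List<Object> oldValues = Arrays.<Object>asList(42, "alice");
        // Assumed evolution: "email" with a default is inserted in the middle.
        List<String> newFields = Arrays.asList("id", "email", "name");
        Map<String, Object> defaults = new HashMap<>();
        defaults.put("email", "unknown");

        System.out.println(resolve(oldFields, oldValues, newFields, defaults));
        // prints [42, unknown, alice]
    }
}
```

The inserted-in-the-middle case is the interesting one: a positional copy would shift `name` into the `email` slot, while the by-name mapping keeps each value with its field.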

Verifying this change

This change added tests and can be verified as follows:

  • Added AvroSerializerSnapshotTest#migratedGenericRecordShouldBeSerializedWithNewSchema.
  • Added AvroSerializerSnapshotTest#migratedGenericRecordShouldBeSerializedWithNewSchemaWhenFieldIsInsertedInMiddle.
  • Ran ./mvnw -pl flink-formats/flink-avro spotless:apply -Dspotless.check.skip=false.
  • Ran ./mvnw -pl flink-formats/flink-avro -am -Dtest=AvroSerializerSnapshotTest -DfailIfNoTests=false -Dsurefire.failIfNoSpecifiedTests=false -Dcheckstyle.skip -Drat.skip -Dspotless.check.skip test.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: yes
  • The runtime per-record code paths (performance sensitive): yes, only GenericRecord serialization when the value schema differs from the serializer schema
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes, state restore/migration
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

Was generative AI tooling used to co-author this PR?
  • Yes: OpenAI Codex GPT-5

Generated-by: OpenAI Codex GPT-5

Resolve GenericRecord values with an older record schema to the serializer runtime schema before writing them during state migration.

Generated-by: OpenAI Codex GPT-5

flinkbot commented May 14, 2026

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

Align GenericRecord restore behavior with SpecificRecord by constructing the restored reader with previous and current schemas instead of rewriting records at serialize time.

Generated-by: OpenAI Codex GPT-5

Handle the nullable previous schema inside the GenericRecord factory helper instead of passing Optional as a parameter.

Generated-by: OpenAI Codex GPT-5