
[AURON #1850] Add ArrowFieldWriter and FlinkArrowWriter for basic types #2079

Merged
Tartarus0zm merged 2 commits into apache:master from x-tong:feature/issue-1850-flink-arrow-part2a
Mar 10, 2026

Conversation

@x-tong x-tong commented Mar 9, 2026

Which issue does this PR close?

Partially addresses #1850 (Part 2a of the Flink RowData to Arrow conversion).

Rationale for this change

Per AIP-1, the Flink integration data path requires converting Flink RowData into Arrow VectorSchemaRoot for export to the native engine (DataFusion/Rust). This PR implements the writer layer for basic types, following Flink's official flink-python Arrow implementation as requested during Part 1 review (#1959).

What changes are included in this PR?

Commit 1: ArrowFieldWriter base class + 12 type writers (16 files, +2181 lines)

  • ArrowFieldWriter<IN>: Generic abstract base class using the template method pattern (write() → doWrite() + count++), aligned with Flink's flink-python ArrowFieldWriter.
  • 12 concrete writers in writers/ sub-package, each with forRow()/forArray() dual-mode factory methods:
    • Numeric: IntWriter, TinyIntWriter, SmallIntWriter, BigIntWriter, FloatWriter, DoubleWriter
    • Non-numeric: BooleanWriter, VarCharWriter, VarBinaryWriter, DecimalWriter, DateWriter, NullWriter
  • Key design: Each writer (except NullWriter) has two public static final inner classes (XxxWriterForRow / XxxWriterForArray) because Flink's RowData and ArrayData have no common getter interface.
  • Special cases:
    • NullWriter: No inner classes needed, doWrite() is empty (NullVector values are inherently null)
    • DecimalWriter: Takes precision/scale parameters, includes fitBigDecimal() validation before writing (aligned with Flink's fromBigDecimal logic)
  • Unit tests: IntWriterTest (5), BasicWritersTest (20), NonNumericWritersTest (12) — 37 tests
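
The DecimalWriter validation mentioned above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the class name `DecimalFitSketch` and method name `fit` are hypothetical, and the behavior (rescale with HALF_UP, return null when the value still exceeds the declared precision) is an assumption based on how Flink's `fromBigDecimal` is generally understood to work. The actual `fitBigDecimal()` may differ in details.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical helper; the name and the null-on-overflow contract are assumptions.
final class DecimalFitSketch {
    // Returns the value adjusted to the target scale, or null if it cannot
    // be represented within the given precision.
    static BigDecimal fit(BigDecimal value, int precision, int scale) {
        if (value.scale() != scale || value.precision() > precision) {
            // Rescale first; rounding can change the number of digits.
            value = value.setScale(scale, RoundingMode.HALF_UP);
            if (value.precision() > precision) {
                return null; // overflow: the caller would write a null slot
            }
        }
        return value;
    }
}
```

For a DECIMAL(5, 2) column, `fit(new BigDecimal("12.345"), 5, 2)` yields 12.35, while `fit(new BigDecimal("12345.6"), 5, 2)` yields null because 12345.60 needs seven digits.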

Commit 2: FlinkArrowWriter orchestrator + factory methods (3 files, +482 lines)

  • FlinkArrowWriter: Orchestrates per-column ArrowFieldWriter<RowData>[] to write Flink RowData into an Arrow VectorSchemaRoot. Lifecycle: create() → write(row)* → finish() → reset().
  • Factory methods in FlinkArrowUtils: createArrowFieldWriterForRow() / createArrowFieldWriterForArray() dispatch writer creation based on Arrow vector type (instanceof chain). Both are package-private.
  • Integration tests: FlinkArrowWriterTest (7) — all-types write, null handling, multi-row batches, reset, empty batch, zero columns, unsupported type. Total: 53 tests, all passing.
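
The create/write/finish/reset lifecycle described above might look like the sketch below. It avoids Flink and Arrow dependencies so it can run standalone: `ColumnSketch` and `BatchWriterSketch` are hypothetical stand-ins for a per-column field writer and for FlinkArrowWriter, and rows are plain `Object[]` rather than RowData. That finish() publishes the row count is an assumption (in the real class this would presumably go through `VectorSchemaRoot.setRowCount`).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one field writer together with its vector.
final class ColumnSketch {
    final List<Object> values = new ArrayList<>();
    void write(Object[] row, int ordinal) { values.add(row[ordinal]); }
    void reset() { values.clear(); }
}

// Hypothetical stand-in for the orchestrator.
final class BatchWriterSketch {
    private final ColumnSketch[] fieldWriters;
    private int written;   // rows written since the last reset()
    private int rowCount;  // row count published by finish() (assumption)

    private BatchWriterSketch(ColumnSketch[] fieldWriters) {
        this.fieldWriters = fieldWriters;
    }

    static BatchWriterSketch create(int columnCount) {
        ColumnSketch[] writers = new ColumnSketch[columnCount];
        for (int i = 0; i < columnCount; i++) {
            writers[i] = new ColumnSketch();
        }
        return new BatchWriterSketch(writers);
    }

    // One call per row: each column writer reads its own ordinal.
    void write(Object[] row) {
        for (int i = 0; i < fieldWriters.length; i++) {
            fieldWriters[i].write(row, i);
        }
        written++;
    }

    // Seals the batch so a consumer can read rowCount rows.
    void finish() { rowCount = written; }

    // Clears all columns so the same writer can build the next batch.
    void reset() {
        for (ColumnSketch w : fieldWriters) {
            w.reset();
        }
        written = 0;
        rowCount = 0;
    }

    int getRowCount() { return rowCount; }
    List<Object> column(int i) { return fieldWriters[i].values; }
}
```

A caller would invoke `create(n)` once, then loop over `write(row)`, `finish()`, export, and `reset()` for each batch.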

Scope

This PR covers basic types only. Time, Timestamp, and complex types (Array/Map/Row) will be in Part 2b.

Are there any user-facing changes?

No. Internal API for Flink integration.

How was this patch tested?

53 tests across 4 test classes:

./build/mvn test -Pflink-1.18 -Pspark-3.5 -Pscala-2.12 \
  -pl auron-flink-extension/auron-flink-runtime -am -DskipBuildNative

Result: 53 pass, 0 failures.

x-tong added 2 commits March 5, 2026 23:01
…for Flink Arrow conversion

Introduce the generic ArrowFieldWriter<T> base class and 12 concrete
type writers that convert Flink RowData/ArrayData fields to Arrow vectors.

The design follows Flink's official flink-python module (ArrowFieldWriter +
forRow/forArray dual-mode pattern), as requested during Part 1 review.

Writers added:
- Numeric: Int, TinyInt, SmallInt, BigInt, Float, Double
- Non-numeric: Boolean, VarChar, VarBinary, Decimal, Date, Null

Each writer (except NullWriter) has two inner classes for RowData and
ArrayData access, instantiated via static forRow()/forArray() factory
methods.
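
A dependency-free sketch of the pattern described above, for illustration only. `RowLike`, `ArrayLike`, and `IntColumn` are hypothetical stand-ins for Flink's RowData, ArrayData, and Arrow's IntVector, and the `*Sketch` class names are invented here; the PR's real classes have more responsibilities (for example finish/reset handling) that are omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for RowData and ArrayData: same getters, but
// deliberately no shared interface, which is what forces two inner classes.
interface RowLike { boolean isNullAt(int pos); int getInt(int pos); }
interface ArrayLike { boolean isNullAt(int pos); int getInt(int pos); }

// Hypothetical stand-in for an Arrow IntVector; a null entry marks a null slot.
final class IntColumn {
    final List<Integer> values = new ArrayList<>();
}

// Template method: write() is final, calls doWrite(), then advances the count.
abstract class FieldWriterSketch<IN> {
    protected final IntColumn column;
    private int count;

    FieldWriterSketch(IntColumn column) { this.column = column; }

    protected abstract void doWrite(IN in, int ordinal);

    public final void write(IN in, int ordinal) {
        doWrite(in, ordinal);
        count++;
    }

    public int getCount() { return count; }
}

abstract class IntWriterSketch<IN> extends FieldWriterSketch<IN> {
    static IntWriterSketch<RowLike> forRow(IntColumn column) { return new ForRow(column); }
    static IntWriterSketch<ArrayLike> forArray(IntColumn column) { return new ForArray(column); }

    private IntWriterSketch(IntColumn column) { super(column); }

    // The only parts that differ between row mode and array mode.
    abstract boolean isNullAt(IN in, int ordinal);
    abstract int readInt(IN in, int ordinal);

    @Override
    protected void doWrite(IN in, int ordinal) {
        if (isNullAt(in, ordinal)) {
            column.values.add(null);
        } else {
            column.values.add(readInt(in, ordinal));
        }
    }

    public static final class ForRow extends IntWriterSketch<RowLike> {
        private ForRow(IntColumn column) { super(column); }
        @Override boolean isNullAt(RowLike in, int ordinal) { return in.isNullAt(ordinal); }
        @Override int readInt(RowLike in, int ordinal) { return in.getInt(ordinal); }
    }

    public static final class ForArray extends IntWriterSketch<ArrayLike> {
        private ForArray(IntColumn column) { super(column); }
        @Override boolean isNullAt(ArrayLike in, int ordinal) { return in.isNullAt(ordinal); }
        @Override int readInt(ArrayLike in, int ordinal) { return in.getInt(ordinal); }
    }
}

// Tiny fixture so the sketch can be exercised without Flink on the classpath.
final class BoxedInts implements RowLike, ArrayLike {
    private final Integer[] fields;
    BoxedInts(Integer... fields) { this.fields = fields; }
    @Override public boolean isNullAt(int pos) { return fields[pos] == null; }
    @Override public int getInt(int pos) { return fields[pos]; }
}
```

The shared `doWrite()` body lives once in the outer class, and the two inner classes only adapt the getter calls, which keeps the row and array paths from drifting apart.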

Includes unit tests: IntWriterTest (5), BasicWritersTest (20),
NonNumericWritersTest (12) — 37 tests total.
…hods

FlinkArrowWriter orchestrates per-column ArrowFieldWriters to write
Flink RowData into an Arrow VectorSchemaRoot. Lifecycle: create() ->
write(row)* -> finish() -> reset().

Add createArrowFieldWriterForRow/ForArray factory methods in
FlinkArrowUtils that dispatch writer creation based on Arrow vector
type (instanceof chain). Both are package-private as they are only
used by FlinkArrowWriter.create().
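
The instanceof-chain dispatch could be sketched as below. All class names are hypothetical stand-ins for Arrow vector classes, the factory returns a writer name instead of a real writer to stay self-contained, and the exception type for unsupported vectors is an assumption rather than something stated in this PR.

```java
// Hypothetical stand-ins for Arrow vector classes.
abstract class VectorSketch {}
final class IntVectorSketch extends VectorSketch {}
final class VarCharVectorSketch extends VectorSketch {}
final class TimestampVectorSketch extends VectorSketch {} // deliberately unhandled

final class WriterDispatchSketch {
    // Picks a writer by concrete vector class; returns the name of the
    // factory method that would be called in a real implementation.
    static String writerForRow(VectorSketch vector) {
        if (vector instanceof IntVectorSketch) {
            return "IntWriter.forRow";
        } else if (vector instanceof VarCharVectorSketch) {
            return "VarCharWriter.forRow";
        }
        // Types outside the supported set fail fast (exception type assumed).
        throw new UnsupportedOperationException(
                "Unsupported vector type: " + vector.getClass().getSimpleName());
    }
}
```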

Includes integration tests (7): all-types write, null handling,
multi-row batches, reset, empty batch, zero columns, unsupported type.

Total: 53 tests across 4 test classes, all passing.
@github-actions github-actions bot added the flink label Mar 9, 2026
@Tartarus0zm Tartarus0zm self-requested a review March 10, 2026 02:34

@Tartarus0zm Tartarus0zm left a comment

Thanks for your contribution!
LGTM

@Tartarus0zm Tartarus0zm merged commit 64de43f into apache:master Mar 10, 2026
117 checks passed

2 participants