[AURON #1850] Add FlinkArrowUtils for Flink-Arrow type conversion #1959
Part 1 of Flink RowData to Arrow conversion implementation. This PR adds the foundational type conversion utilities:
- FlinkArrowUtils: Bidirectional conversion between Flink LogicalType and Arrow types
- Support for all common Flink types including primitives, temporal, and complex types
- Comprehensive unit tests for type conversion
Pull request overview
This PR introduces foundational type conversion utilities for Flink-Arrow integration as part 1 of a 3-part implementation series. It adds FlinkArrowUtils with methods to convert Flink LogicalTypes to Arrow types, supporting primitives, temporal types, and complex structures (arrays, maps, rows).
Changes:
- Added `FlinkArrowUtils` class with one-way conversion from Flink LogicalType to Arrow types
- Added comprehensive unit tests covering all supported type conversions
- Added Arrow and Flink table dependencies to support the conversion utilities
Reviewed changes
Copilot reviewed 3 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java | Core utility class implementing Flink-to-Arrow type conversion with methods for types, fields, and schemas, plus a shared ROOT_ALLOCATOR for Arrow memory management |
| auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java | Comprehensive test suite covering basic types, complex types (arrays, rows, maps), temporal types, and error handling |
| auron-flink-extension/auron-flink-runtime/pom.xml | Added Arrow dependencies (arrow-c-data, arrow-memory-unsafe, arrow-vector) and test dependencies (junit-jupiter-api) |
| .gitignore | Added LSP-related files (*.prefs) to ignore list |
…ache/auron/flink/arrow/FlinkArrowUtils.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
ShreyeshArangath left a comment
Overall LGTM, just a couple minor comments
Also, from the description, should this not be built with flink instead of Spark?
./auron-build.sh --pre --sparkver 3.5 --scalaver 2.12 -DskipBuildNative
- Mark class as final to prevent subclassing
- Add null check for logicalType parameter to throw IllegalArgumentException instead of NPE
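The two changes in this commit can be sketched in plain Java. This is a minimal illustration under stated assumptions, not the PR's code: `LogicalTypeStub`, `TypeUtilsSketch`, and `toArrowTypeName` are hypothetical stand-ins chosen so the example compiles without Flink or Arrow on the classpath, and the returned strings are descriptive labels rather than real Arrow type objects.

```java
/** Hypothetical stand-in for Flink's LogicalType; not the real class. */
enum LogicalTypeStub { INT, BIGINT, VARCHAR }

/** Utility holder: final with a private constructor, so it cannot be subclassed or instantiated. */
final class TypeUtilsSketch {
    private TypeUtilsSketch() {}

    /** Returns an illustrative Arrow type label; rejects null explicitly instead of failing later with an NPE. */
    static String toArrowTypeName(LogicalTypeStub logicalType) {
        if (logicalType == null) {
            throw new IllegalArgumentException("logicalType cannot be null");
        }
        switch (logicalType) {
            case INT:
                return "Int(32, signed)";
            case BIGINT:
                return "Int(64, signed)";
            case VARCHAR:
                return "Utf8";
            default:
                throw new UnsupportedOperationException("Unsupported type: " + logicalType);
        }
    }
}
```

Failing fast with `IllegalArgumentException` gives callers a message that names the offending parameter, which an implicit `NullPointerException` raised deeper in the conversion would not.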
Force-pushed from e6be0b8 to 9f6cfc6
Thank you for your review. Yes, that's the way it should be done; I have made the changes.
Pull request overview
Copilot reviewed 3 out of 4 changed files in this pull request and generated 2 comments.
- Add test case for NullType to Arrow conversion
- Add assertion to verify Decimal bitWidth is 128
ShreyeshArangath left a comment
LGTM, thanks for adding this
Tartarus0zm left a comment
Thanks for your contribution!
Overall LGTM, just a couple minor comments.
- Remove redundant nullable parameter from toArrowField method
- Use logicalType.isNullable() to determine field nullability
- Improve TimeType conversion with precision-based Arrow type selection
- Update tests to reflect API changes
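The nullability change in this commit can be illustrated as follows. This is a sketch only: `TypeStub`, `FieldStub`, and `FieldSketch` are hypothetical stand-ins for Flink's LogicalType and Arrow's Field, and the exact signature of the PR's `toArrowField` is not shown in this conversation.

```java
/** Hypothetical stand-in for a Flink LogicalType that carries its own nullability. */
final class TypeStub {
    final String name;
    private final boolean nullable;

    TypeStub(String name, boolean nullable) {
        this.name = name;
        this.nullable = nullable;
    }

    boolean isNullable() {
        return nullable;
    }
}

/** Hypothetical stand-in for an Arrow Field: a name, a type label, and a nullability flag. */
final class FieldStub {
    final String fieldName;
    final String typeName;
    final boolean nullable;

    FieldStub(String fieldName, String typeName, boolean nullable) {
        this.fieldName = fieldName;
        this.typeName = typeName;
        this.nullable = nullable;
    }
}

final class FieldSketch {
    private FieldSketch() {}

    /**
     * No separate nullable argument: the flag is read from the type itself,
     * so a caller cannot pass a value that disagrees with the type.
     */
    static FieldStub toArrowField(String fieldName, TypeStub type) {
        return new FieldStub(fieldName, type.name, type.isNullable());
    }
}
```

Deriving the flag from the type keeps a single source of truth for nullability, which is the motivation the commit message gives for dropping the parameter.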
…lZonedTimestampType

Reference Flink's ArrowUtils implementation to select the appropriate Arrow TimeUnit based on timestamp precision:
- precision 0: SECOND
- precision 1-3: MILLISECOND
- precision 4-6: MICROSECOND
- precision 7+: NANOSECOND
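The precision-to-unit mapping listed in this commit message can be written as a small function. This is a minimal sketch: `TimeUnitSketch` is a local enum that mirrors the four unit names from the commit message so the example needs no Arrow dependency, and `timeUnitFor` is a hypothetical name. The rejection of negative precision is an assumption added for the example and is not stated in the PR.

```java
/** Local mirror of the four time units named in the commit message. */
enum TimeUnitSketch { SECOND, MILLISECOND, MICROSECOND, NANOSECOND }

final class TimestampPrecisionSketch {
    private TimestampPrecisionSketch() {}

    /** Picks the coarsest unit that can still represent the given fractional-second precision. */
    static TimeUnitSketch timeUnitFor(int precision) {
        if (precision < 0) {
            // Assumption for this sketch: negative precision is treated as a caller error.
            throw new IllegalArgumentException("precision must be >= 0, got " + precision);
        }
        if (precision == 0) {
            return TimeUnitSketch.SECOND;
        }
        if (precision <= 3) {
            return TimeUnitSketch.MILLISECOND;
        }
        if (precision <= 6) {
            return TimeUnitSketch.MICROSECOND;
        }
        return TimeUnitSketch.NANOSECOND;
    }
}
```

For example, a `TIMESTAMP(3)` column would map to `MILLISECOND` and a `TIMESTAMP(9)` column to `NANOSECOND` under this mapping.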
Tartarus0zm left a comment
Thanks for your contribution!
LGTM
@x-tong Thanks for your contribution! Merged!
…es (#2079)

# Which issue does this PR close?

Partially addresses #1850 (Part 2a of the Flink RowData to Arrow conversion).

# Rationale for this change

Per AIP-1, the Flink integration data path requires converting Flink `RowData` into Arrow `VectorSchemaRoot` for export to the native engine (DataFusion/Rust). This PR implements the writer layer for basic types, following Flink's official `flink-python` Arrow implementation as requested during Part 1 review (#1959).

# What changes are included in this PR?

## Commit 1: ArrowFieldWriter base class + 12 type writers (16 files, +2181 lines)

- **`ArrowFieldWriter<IN>`**: Generic abstract base class using the template method pattern (`write()` → `doWrite()` + count++), aligned with Flink's `flink-python` `ArrowFieldWriter`.
- **12 concrete writers** in the `writers/` sub-package, each with `forRow()`/`forArray()` dual-mode factory methods:
  - Numeric: `IntWriter`, `TinyIntWriter`, `SmallIntWriter`, `BigIntWriter`, `FloatWriter`, `DoubleWriter`
  - Non-numeric: `BooleanWriter`, `VarCharWriter`, `VarBinaryWriter`, `DecimalWriter`, `DateWriter`, `NullWriter`
- **Key design**: Each writer (except `NullWriter`) has two `public static final` inner classes (`XxxWriterForRow` / `XxxWriterForArray`) because Flink's `RowData` and `ArrayData` have no common getter interface.
- **Special cases**:
  - `NullWriter`: No inner classes needed; `doWrite()` is empty (NullVector values are inherently null)
  - `DecimalWriter`: Takes precision/scale parameters and includes `fitBigDecimal()` validation before writing (aligned with Flink's `fromBigDecimal` logic)
- **Unit tests**: `IntWriterTest` (5), `BasicWritersTest` (20), `NonNumericWritersTest` (12), 37 tests

## Commit 2: FlinkArrowWriter orchestrator + factory methods (3 files, +482 lines)

- **`FlinkArrowWriter`**: Orchestrates per-column `ArrowFieldWriter<RowData>[]` to write Flink `RowData` into Arrow `VectorSchemaRoot`. Lifecycle: `create()` → `write(row)*` → `finish()` → `reset()`.
- **Factory methods in `FlinkArrowUtils`**: `createArrowFieldWriterForRow()`/`createArrowFieldWriterForArray()` dispatch writer creation based on Arrow vector type (instanceof chain). Both are package-private.
- **Integration tests**: `FlinkArrowWriterTest` (7): all-types write, null handling, multi-row batches, reset, empty batch, zero columns, unsupported type.

Total: **53 tests, all passing**.

# Scope

This PR covers basic types only. Time, Timestamp, and complex types (Array/Map/Row) will be in Part 2b.

# Are there any user-facing changes?

No. Internal API for Flink integration.

# How was this patch tested?

53 tests across 4 test classes:

```bash
./build/mvn test -Pflink-1.18 -Pspark-3.5 -Pscala-2.12 \
  -pl auron-flink-extension/auron-flink-runtime -am -DskipBuildNative
```

Result: 53 pass, 0 failures.
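The template method pattern described for `ArrowFieldWriter<IN>` (`write()` calls `doWrite()`, then increments a count) can be sketched without Flink or Arrow on the classpath. This is an illustration under stated assumptions, not the PR's code: `FieldWriterSketch` and `IntListWriter` are hypothetical names, an `int[]` stands in for a row, and a `List<Integer>` stands in for an Arrow vector.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the base class: write() is the fixed template, doWrite() is the per-type hook. */
abstract class FieldWriterSketch<IN> {
    private int count;

    /** Template method: delegate the type-specific write, then advance the row counter. */
    final void write(IN row, int ordinal) {
        doWrite(row, ordinal);
        count++;
    }

    /** Hook implemented by each concrete writer. */
    abstract void doWrite(IN row, int ordinal);

    /** Number of values written since the last reset. */
    int getCount() {
        return count;
    }

    /** Clears the counter so the writer can be reused for the next batch. */
    void reset() {
        count = 0;
    }
}

/** Toy writer: reads one int column from an int[] "row" into a list that stands in for a vector. */
final class IntListWriter extends FieldWriterSketch<int[]> {
    final List<Integer> values = new ArrayList<>();

    @Override
    void doWrite(int[] row, int ordinal) {
        values.add(row[ordinal]);
    }

    @Override
    void reset() {
        super.reset();
        values.clear();
    }
}
```

Because `write()` is `final`, every concrete writer gets the same counting behavior and only supplies the type-specific `doWrite()`, which is the point of the pattern.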
Summary
Part 1/3 of Flink RowData to Arrow conversion implementation (split from #1930).
This PR adds the foundational type conversion utilities:
- FlinkArrowUtils: Conversion from Flink RowData to Arrow types
Follow-up PRs
Test plan
./auron-build.sh --pre --sparkver 3.5 --scalaver 2.12 -Pflink-1.18 -DskipBuildNative
./dev/reformat
Related: #1850