
[AURON #1850] Add FlinkArrowUtils for Flink-Arrow type conversion #1959

Merged
Tartarus0zm merged 8 commits into apache:master from x-tong:feature/issue-1850-flink-arrow-part1
Feb 4, 2026

Conversation

Contributor

@x-tong x-tong commented Jan 26, 2026

Summary

Part 1/3 of Flink RowData to Arrow conversion implementation (split from #1930).

This PR adds the foundational type conversion utilities:

  • FlinkArrowUtils: Conversion from Flink LogicalType to Arrow types
  • Support for all common Flink types including primitives, temporal, and complex types
  • Comprehensive unit tests for type conversion
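As a rough illustration of the kind of one-way mapping this utility provides, here is a minimal, self-contained sketch. Plain strings stand in for Flink's LogicalType roots and Arrow's ArrowType; the real FlinkArrowUtils dispatches on the actual type classes, so all names below are illustrative only:

```java
// Hypothetical, simplified stand-in for the LogicalType -> ArrowType dispatch.
// The real utility works on org.apache.flink.table.types.logical.LogicalType
// and org.apache.arrow.vector.types.pojo.ArrowType instances.
final class TypeMappingSketch {
    static String toArrowTypeName(String flinkTypeRoot) {
        switch (flinkTypeRoot) {
            case "BOOLEAN": return "Bool";
            case "INTEGER": return "Int(32, signed)";
            case "BIGINT":  return "Int(64, signed)";
            case "DOUBLE":  return "FloatingPoint(DOUBLE)";
            case "VARCHAR": return "Utf8";
            default:
                // Mirrors the error-handling path exercised by the unit tests.
                throw new UnsupportedOperationException(
                        "Unsupported Flink type: " + flinkTypeRoot);
        }
    }
}
```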

Follow-up PRs

  • Part 2: FlinkArrowFieldWriter + FlinkArrowWriter
  • Part 3: FlinkArrowFFIExporter

Test plan

  • Unit tests for FlinkArrowUtils type conversion
  • Build passes with ./auron-build.sh --pre --sparkver 3.5 --scalaver 2.12 -Pflink-1.18 -DskipBuildNative
  • Code formatted with ./dev/reformat

Related: #1850

Contributor

Copilot AI left a comment


Pull request overview

This PR introduces foundational type conversion utilities for Flink-Arrow integration as part 1 of a 3-part implementation series. It adds FlinkArrowUtils with methods to convert Flink LogicalTypes to Arrow types, supporting primitives, temporal types, and complex structures (arrays, maps, rows).

Changes:

  • Added FlinkArrowUtils class with one-way conversion from Flink LogicalType to Arrow types
  • Added comprehensive unit tests covering all supported type conversions
  • Added Arrow and Flink table dependencies to support the conversion utilities

Reviewed changes

Copilot reviewed 3 out of 4 changed files in this pull request and generated 2 comments.

Reviewed files:

  • auron-flink-extension/auron-flink-runtime/src/main/java/org/apache/auron/flink/arrow/FlinkArrowUtils.java: Core utility class implementing Flink-to-Arrow type conversion with methods for types, fields, and schemas, plus a shared ROOT_ALLOCATOR for Arrow memory management
  • auron-flink-extension/auron-flink-runtime/src/test/java/org/apache/auron/flink/arrow/FlinkArrowUtilsTest.java: Comprehensive test suite covering basic types, complex types (arrays, rows, maps), temporal types, and error handling
  • auron-flink-extension/auron-flink-runtime/pom.xml: Added Arrow dependencies (arrow-c-data, arrow-memory-unsafe, arrow-vector) and test dependencies (junit-jupiter-api)
  • .gitignore: Added LSP-related files (*.prefs) to the ignore list


x-tong and others added 2 commits January 26, 2026 22:11
…ache/auron/flink/arrow/FlinkArrowUtils.java

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Contributor

@ShreyeshArangath ShreyeshArangath left a comment


Overall LGTM, just a couple minor comments

Also, from the description, should this not be built with flink instead of Spark?

./auron-build.sh --pre --sparkver 3.5 --scalaver 2.12 -DskipBuildNative

- Mark class as final to prevent subclassing
- Add null check for logicalType parameter to throw IllegalArgumentException instead of NPE
@x-tong force-pushed the feature/issue-1850-flink-arrow-part1 branch from e6be0b8 to 9f6cfc6 on January 27, 2026 at 18:09
Contributor Author

x-tong commented Jan 27, 2026

Overall LGTM, just a couple minor comments

Also, from the description, should this not be built with flink instead of Spark?

./auron-build.sh --pre --sparkver 3.5 --scalaver 2.12 -DskipBuildNative

Thank you for your review. Yes, it should be built with the Flink profile; I have made the change.

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 3 out of 4 changed files in this pull request and generated 2 comments.



- Add test case for NullType to Arrow conversion
- Add assertion to verify Decimal bitWidth is 128
Contributor

@ShreyeshArangath ShreyeshArangath left a comment


LGTM, thanks for adding this

Contributor

@Tartarus0zm Tartarus0zm left a comment


Thanks for your contribution!
Overall LGTM, just a couple minor comments.

x-tong and others added 2 commits January 30, 2026 17:55
- Remove redundant nullable parameter from toArrowField method
- Use logicalType.isNullable() to determine field nullability
- Improve TimeType conversion with precision-based Arrow type selection
- Update tests to reflect API changes
…lZonedTimestampType

Reference Flink's ArrowUtils implementation to select appropriate Arrow
TimeUnit based on timestamp precision:
- precision 0: SECOND
- precision 1-3: MILLISECOND
- precision 4-6: MICROSECOND
- precision 7+: NANOSECOND
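The precision buckets above can be sketched as a small helper. The returned strings mirror the names of Arrow's TimeUnit enum values, and the class name is assumed for illustration; the real code returns the actual org.apache.arrow.vector.types.TimeUnit constants:

```java
// Sketch of the precision -> Arrow TimeUnit selection described above.
// Strings stand in for Arrow's TimeUnit enum so the example is self-contained.
final class TimeUnitSketch {
    static String timeUnitFor(int precision) {
        if (precision == 0) return "SECOND";
        if (precision <= 3) return "MILLISECOND"; // precision 1-3
        if (precision <= 6) return "MICROSECOND"; // precision 4-6
        return "NANOSECOND";                      // precision 7+
    }
}
```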
Contributor

@Tartarus0zm Tartarus0zm left a comment


Thanks for your contribution!
LGTM

@Tartarus0zm Tartarus0zm merged commit a9adc70 into apache:master Feb 4, 2026
187 of 188 checks passed
@Tartarus0zm
Contributor

@x-tong thanks for your contribution! Merged!

@x-tong x-tong deleted the feature/issue-1850-flink-arrow-part1 branch February 27, 2026 17:41
Tartarus0zm pushed a commit that referenced this pull request Mar 10, 2026
…es (#2079)

# Which issue does this PR close?

Partially addresses #1850 (Part 2a of the Flink RowData to Arrow
conversion).

# Rationale for this change

Per AIP-1, the Flink integration data path requires converting Flink
`RowData` into Arrow `VectorSchemaRoot` for export to the native engine
(DataFusion/Rust). This PR implements the writer layer for basic types,
following Flink's official `flink-python` Arrow implementation as
requested during Part 1 review (#1959).

# What changes are included in this PR?

## Commit 1: ArrowFieldWriter base class + 12 type writers (16 files,
+2181 lines)

- **`ArrowFieldWriter<IN>`** — Generic abstract base class using
template method pattern (`write()` → `doWrite()` + count++), aligned
with Flink's `flink-python` `ArrowFieldWriter`.
- **12 concrete writers** in `writers/` sub-package, each with
`forRow()`/`forArray()` dual-mode factory methods:
- Numeric: `IntWriter`, `TinyIntWriter`, `SmallIntWriter`,
`BigIntWriter`, `FloatWriter`, `DoubleWriter`
- Non-numeric: `BooleanWriter`, `VarCharWriter`, `VarBinaryWriter`,
`DecimalWriter`, `DateWriter`, `NullWriter`
- **Key design**: Each writer (except `NullWriter`) has two `public
static final` inner classes (`XxxWriterForRow` / `XxxWriterForArray`)
because Flink's `RowData` and `ArrayData` have no common getter
interface.
- **Special cases**:
- `NullWriter`: No inner classes needed, `doWrite()` is empty
(NullVector values are inherently null)
- `DecimalWriter`: Takes precision/scale parameters, includes
`fitBigDecimal()` validation before writing (aligned with Flink's
`fromBigDecimal` logic)
- **Unit tests**: `IntWriterTest` (5), `BasicWritersTest` (20),
`NonNumericWritersTest` (12) — 37 tests
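The template-method structure described in Commit 1 can be sketched in a self-contained form. The "Sketch" class names are hypothetical, and a plain List stands in for the Arrow ValueVector the real writers append to:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the template-method pattern: write() is the fixed skeleton
// (doWrite() + count++), and each concrete writer supplies doWrite().
abstract class ArrowFieldWriterSketch<IN> {
    private int count;

    public final void write(IN row, int ordinal) {
        doWrite(row, ordinal);
        count++;
    }

    protected abstract void doWrite(IN row, int ordinal);

    public int getCount() { return count; }
}

// Example concrete writer: records integers into a list instead of a vector.
final class IntWriterSketch extends ArrowFieldWriterSketch<int[]> {
    final List<Integer> values = new ArrayList<>();

    @Override
    protected void doWrite(int[] row, int ordinal) {
        values.add(row[ordinal]);
    }
}
```

The forRow()/forArray() split described above would correspond to two such concrete subclasses per type, differing only in how doWrite() reads its input.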

## Commit 2: FlinkArrowWriter orchestrator + factory methods (3 files,
+482 lines)

- **`FlinkArrowWriter`** — Orchestrates per-column
`ArrowFieldWriter<RowData>[]` to write Flink `RowData` into Arrow
`VectorSchemaRoot`. Lifecycle: `create()` → `write(row)*` → `finish()` →
`reset()`.
- **Factory methods in `FlinkArrowUtils`** —
`createArrowFieldWriterForRow()`/`createArrowFieldWriterForArray()`
dispatch writer creation based on Arrow vector type (instanceof chain).
Both are package-private.
- **Integration tests**: `FlinkArrowWriterTest` (7) — all-types write,
null handling, multi-row batches, reset, empty batch, zero columns,
unsupported type. Total: **53 tests, all passing**.
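The create() → write(row)* → finish() → reset() lifecycle can be illustrated with a minimal, self-contained sketch. All names are hypothetical; plain Object[] rows and a row counter stand in for Flink RowData and the Arrow VectorSchemaRoot the real class fills:

```java
// Sketch of the orchestrator lifecycle described above.
final class FlinkArrowWriterSketch {
    private final int numColumns;
    private int rowCount;

    private FlinkArrowWriterSketch(int numColumns) {
        this.numColumns = numColumns;
    }

    static FlinkArrowWriterSketch create(int numColumns) {
        return new FlinkArrowWriterSketch(numColumns);
    }

    void write(Object[] row) {
        if (row.length != numColumns) {
            throw new IllegalArgumentException("row arity != schema arity");
        }
        // The real code delegates each column to an ArrowFieldWriter here.
        rowCount++;
    }

    int finish() {
        return rowCount; // The real code sets the batch row count here.
    }

    void reset() {
        rowCount = 0; // The real code also resets each field writer.
    }
}
```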

# Scope

This PR covers basic types only. Time, Timestamp, and complex types
(Array/Map/Row) will be in Part 2b.

# Are there any user-facing changes?

No. Internal API for Flink integration.

# How was this patch tested?

53 tests across 4 test classes:

```bash
./build/mvn test -Pflink-1.18 -Pspark-3.5 -Pscala-2.12 \
  -pl auron-flink-extension/auron-flink-runtime -am -DskipBuildNative
```

Result: 53 pass, 0 failures.
