From 6dba210c8166bc577861178988ba9a6de9f29ace Mon Sep 17 00:00:00 2001 From: Jared Yu Date: Mon, 18 May 2026 20:02:55 -0700 Subject: [PATCH 1/2] docs: add documentation for MAP dtype (#556) --- website/docs/user-guide/rust/api-reference.md | 14 +++++++++++ website/docs/user-guide/rust/data-types.md | 23 +++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md index 5d3068b5..7ef34ef3 100644 --- a/website/docs/user-guide/rust/api-reference.md +++ b/website/docs/user-guide/rust/api-reference.md @@ -466,6 +466,7 @@ Implements the `InternalRow` trait (see below). | `fn get_binary(&self, idx: usize, length: usize) -> Result<&[u8]>` | Get fixed-length binary value | | `fn get_char(&self, idx: usize, length: usize) -> Result<&str>` | Get fixed-length char value | | `fn get_array(&self, idx: usize) -> Result` | Get array value | +| `fn get_map(&self, idx: usize, key_type: &DataType, value_type: &DataType) -> Result` | Get map value | ## `FlussArray` @@ -479,6 +480,19 @@ Implements the `InternalRow` trait (see below). Element getters mirror `InternalRow` typed getters and return `Result`. For example, use `get_int()`, `get_long()`, and `get_double()` for primitive elements, and `get_string()`, `get_binary()`, `get_decimal()`, `get_timestamp_ntz()`, `get_timestamp_ltz()`, and `get_array()` for variable-length or nested elements. +## `FlussMap` + +`FlussMap` is the Rust row representation for `MAP` values. You usually obtain it from `InternalRow::get_map()`. 
+ +| Method | Description | +|--------|-------------| +| `fn size(&self) -> usize` | Number of entries in the map | +| `fn as_bytes(&self) -> &[u8]` | Get encoded bytes of the map | +| `fn key_array(&self) -> &FlussArray` | Get the key array | +| `fn value_array(&self) -> &FlussArray` | Get the value array | + +Key and value arrays are returned as `&FlussArray`, allowing you to read entries by retrieving keys and values at the same index positions. + ## `ChangeType` | Value | Short String | Description | diff --git a/website/docs/user-guide/rust/data-types.md b/website/docs/user-guide/rust/data-types.md index 63b7fa62..ad14028b 100644 --- a/website/docs/user-guide/rust/data-types.md +++ b/website/docs/user-guide/rust/data-types.md @@ -22,6 +22,7 @@ sidebar_position: 3 | `BYTES` | `&[u8]` | `get_bytes()` | `set_field(idx, &[u8])` | | `BINARY(n)` | `&[u8]` | `get_binary(idx, length)` | `set_field(idx, &[u8])` | | `ARRAY` | `FlussArray` | `get_array()` | `set_field(idx, FlussArray)` | +| `MAP` | `FlussMap` | `get_map(idx, key_type, value_type)` | `set_field(idx, FlussMap)` | ## Constructing Special Types @@ -83,6 +84,28 @@ row.set_field(0, Datum::Array(arr)); `ARRAY` is supported for row values and nested row fields. For key encoding, Rust follows Java parity: `ARRAY` can be encoded by the compacted key encoder, while table-level key constraints are validated by the server (which may reject unsupported key types). +## Maps + +Use `DataTypes::map(key_type, value_type)` in schema definitions. At runtime, read maps with `row.get_map(idx, &key_type, &value_type)?`. 
+ +To construct map values for writes, build a `FlussMap` using `FlussMapWriter` and wrap it with `Datum::Map`: + +```rust +use fluss::metadata::DataTypes; +use fluss::row::binary_map::FlussMapWriter; +use fluss::row::{Datum, GenericRow}; + +let mut writer = FlussMapWriter::new(2, &DataTypes::string(), &DataTypes::int()); +writer.write_entry("key1".into(), 100.into())?; +writer.write_entry("key2".into(), Datum::Null)?; +let map = writer.complete()?; + +let mut row = GenericRow::new(1); +row.set_field(0, Datum::Map(map)); +``` + +`MAP` keys cannot be null. `MAP` is supported for row values and nested row fields. Like arrays, `MAP` follows Java parity for key encoding and can be encoded by the compacted key encoder, while table-level key constraints are validated by the server. + ## Reading Row Data ```rust From b05b7bc858209d9235c4b633d0d932849c4d7f6b Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sat, 23 May 2026 20:27:09 +0100 Subject: [PATCH 2/2] [chore] improve ergonomics for MAP dt, docs additions --- bindings/cpp/src/types.rs | 4 +- crates/fluss/src/row/binary_array.rs | 11 +- crates/fluss/src/row/binary_map.rs | 153 +++++++++++++++++- crates/fluss/src/row/column.rs | 42 ++--- crates/fluss/src/row/column_writer.rs | 102 ++++++++---- .../fluss/src/row/compacted/compacted_row.rs | 20 +-- crates/fluss/src/row/datum.rs | 11 +- crates/fluss/src/row/field_getter.rs | 6 +- crates/fluss/src/row/lookup_row.rs | 5 +- crates/fluss/src/row/mod.rs | 12 +- crates/fluss/src/row/projected_row.rs | 5 +- crates/fluss/tests/integration/log_table.rs | 14 +- website/docs/user-guide/rust/api-reference.md | 23 ++- website/docs/user-guide/rust/data-types.md | 60 ++++++- 14 files changed, 349 insertions(+), 119 deletions(-) diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs index 9ef8fe72..23ac636d 100644 --- a/bindings/cpp/src/types.rs +++ b/bindings/cpp/src/types.rs @@ -589,9 +589,7 @@ pub fn compacted_row_to_owned( Datum::Blob(Cow::Owned(row.get_binary(i, 
dt.length())?.to_vec())) } fcore::metadata::DataType::Array(_) => Datum::Array(row.get_array(i)?), - fcore::metadata::DataType::Map(mt) => { - Datum::Map(row.get_map(i, mt.key_type(), mt.value_type())?) - } + fcore::metadata::DataType::Map(_) => Datum::Map(row.get_map(i)?), other => return Err(anyhow!("Unsupported data type for column {i}: {other:?}")), }; diff --git a/crates/fluss/src/row/binary_array.rs b/crates/fluss/src/row/binary_array.rs index d4fab762..b987cec8 100644 --- a/crates/fluss/src/row/binary_array.rs +++ b/crates/fluss/src/row/binary_array.rs @@ -832,8 +832,15 @@ impl InternalRow for FlussArray { self.get_array(pos) } - fn get_map(&self, pos: usize, key_type: &DataType, value_type: &DataType) -> Result { - self.get_map(pos, key_type, value_type) + fn get_map(&self, pos: usize) -> Result { + // FlussArray carries no schema; nested map reads must go through the + // inherent FlussArray::get_map(pos, key_type, value_type). + Err(IllegalArgument { + message: format!( + "InternalRow::get_map is not supported on FlussArray (pos {pos}); \ + use FlussArray::get_map(pos, key_type, value_type) directly" + ), + }) } } diff --git a/crates/fluss/src/row/binary_map.rs b/crates/fluss/src/row/binary_map.rs index 46e82899..57201b84 100644 --- a/crates/fluss/src/row/binary_map.rs +++ b/crates/fluss/src/row/binary_map.rs @@ -26,7 +26,7 @@ use crate::error::Error::IllegalArgument; use crate::error::Result; use crate::metadata::DataType; use crate::row::binary_array::{FlussArray, FlussArrayWriter}; -use crate::row::datum::Datum; +use crate::row::datum::{Datum, read_datum_from_fluss_array}; use bytes::Bytes; use serde::Serialize; use std::fmt; @@ -41,6 +41,8 @@ pub struct FlussMap { data: Bytes, key_array: FlussArray, value_array: FlussArray, + key_type: DataType, + value_type: DataType, } impl fmt::Debug for FlussMap { @@ -190,6 +192,8 @@ impl FlussMap { data: Bytes::copy_from_slice(data), key_array, value_array, + key_type: key_type.clone(), + value_type: 
value_type.clone(), }) } @@ -204,13 +208,20 @@ impl FlussMap { data, key_array, value_array, + key_type: key_type.clone(), + value_type: value_type.clone(), }) } /// Creates a FlussMap by combining a key array and a value array. /// /// Copies both arrays into a new contiguous buffer. - pub fn from_arrays(key_array: &FlussArray, value_array: &FlussArray) -> Result { + pub fn from_arrays( + key_array: &FlussArray, + value_array: &FlussArray, + key_type: &DataType, + value_type: &DataType, + ) -> Result { if key_array.size() != value_array.size() { return Err(IllegalArgument { message: format!( @@ -239,6 +250,8 @@ impl FlussMap { data, key_array: key_array.clone(), value_array: value_array.clone(), + key_type: key_type.clone(), + value_type: value_type.clone(), }) } @@ -261,8 +274,61 @@ impl FlussMap { pub fn value_array(&self) -> &FlussArray { &self.value_array } + + pub fn key_type(&self) -> &DataType { + &self.key_type + } + + pub fn value_type(&self) -> &DataType { + &self.value_type + } + + pub fn entries(&self) -> Entries<'_> { + Entries { + map: self, + index: 0, + } + } + + /// O(n) linear scan; the binary format carries no key index. 
+ pub fn get<'a>(&'a self, key: &Datum<'_>) -> Result>> { + for entry in self.entries() { + let (k, v) = entry?; + if &k == key { + return Ok(Some(v)); + } + } + Ok(None) + } +} + +pub struct Entries<'a> { + map: &'a FlussMap, + index: usize, } +impl<'a> Iterator for Entries<'a> { + type Item = Result<(Datum<'a>, Datum<'a>)>; + + fn next(&mut self) -> Option { + if self.index >= self.map.size() { + return None; + } + let i = self.index; + self.index += 1; + let key = read_datum_from_fluss_array(&self.map.key_array, i, &self.map.key_type); + let value = read_datum_from_fluss_array(&self.map.value_array, i, &self.map.value_type); + Some(key.and_then(|k| value.map(|v| (k, v)))) + } + + fn size_hint(&self) -> (usize, Option) { + let remaining = self.map.size() - self.index; + (remaining, Some(remaining)) + } +} + +impl ExactSizeIterator for Entries<'_> {} + /// Writer for building a `FlussMap` entry by entry. pub struct FlussMapWriter { key_writer: FlussArrayWriter, @@ -284,6 +350,18 @@ impl FlussMapWriter { } } + pub fn extend<'a, I, K, V>(&mut self, entries: I) -> Result<()> + where + I: IntoIterator, + K: Into>, + V: Into>, + { + for (k, v) in entries { + self.write_entry(k.into(), v.into())?; + } + Ok(()) + } + /// Writes a key-value entry into the map. 
/// /// # Errors @@ -315,7 +393,7 @@ impl FlussMapWriter { pub fn complete(self) -> Result { let key_array = self.key_writer.complete()?; let value_array = self.value_writer.complete()?; - FlussMap::from_arrays(&key_array, &value_array) + FlussMap::from_arrays(&key_array, &value_array, &self.key_type, &self.value_type) } fn write_datum( @@ -480,7 +558,13 @@ mod tests { let value_writer = FlussArrayWriter::new(2, &DataTypes::string()); let value_array = value_writer.complete().unwrap(); - let err = FlussMap::from_arrays(&key_array, &value_array).unwrap_err(); + let err = FlussMap::from_arrays( + &key_array, + &value_array, + &DataTypes::int(), + &DataTypes::string(), + ) + .unwrap_err(); assert!(err.to_string().contains("does not match value array size")); } @@ -520,7 +604,13 @@ mod tests { value_writer.write_int(0, 100); let value_array = value_writer.complete().unwrap(); - let map = FlussMap::from_arrays(&key_array, &value_array).unwrap(); + let map = FlussMap::from_arrays( + &key_array, + &value_array, + &DataTypes::int(), + &DataTypes::int(), + ) + .unwrap(); let bytes = map.as_bytes(); // Valid bytes should pass @@ -545,7 +635,13 @@ mod tests { value_writer.write_int(0, 100); let value_array = value_writer.complete().unwrap(); - let err = FlussMap::from_arrays(&key_array, &value_array).unwrap_err(); + let err = FlussMap::from_arrays( + &key_array, + &value_array, + &DataTypes::int(), + &DataTypes::int(), + ) + .unwrap_err(); assert!(err.to_string().contains("keys cannot be null")); let key_bytes = key_array.as_bytes(); @@ -558,4 +654,49 @@ mod tests { let err = FlussMap::from_bytes(&data, &DataTypes::int(), &DataTypes::int()).unwrap_err(); assert!(err.to_string().contains("keys cannot be null")); } + + #[test] + fn entries_yields_typed_pairs_including_nulls() { + let mut writer = FlussMapWriter::new(3, &DataTypes::string(), &DataTypes::int()); + writer.write_entry("a".into(), 1.into()).unwrap(); + writer.write_entry("b".into(), Datum::Null).unwrap(); + 
writer.write_entry("c".into(), 3.into()).unwrap(); + let map = writer.complete().unwrap(); + + let collected: Vec<(Datum, Datum)> = map + .entries() + .collect::>>() + .expect("entries should decode cleanly"); + + assert_eq!(collected.len(), 3); + assert_eq!(collected[0], (Datum::from("a"), Datum::from(1i32))); + assert_eq!(collected[1].0, Datum::from("b")); + assert_eq!(collected[1].1, Datum::Null); + assert_eq!(collected[2], (Datum::from("c"), Datum::from(3i32))); + } + + #[test] + fn get_finds_present_key_and_returns_none_for_absent() { + let mut writer = FlussMapWriter::new(2, &DataTypes::string(), &DataTypes::int()); + writer.write_entry("a".into(), 10.into()).unwrap(); + writer.write_entry("b".into(), 20.into()).unwrap(); + let map = writer.complete().unwrap(); + + let v = map.get(&Datum::from("b")).unwrap(); + assert_eq!(v, Some(Datum::from(20i32))); + + let missing = map.get(&Datum::from("z")).unwrap(); + assert!(missing.is_none()); + } + + #[test] + fn writer_extend_from_iterator_round_trips() { + let src: Vec<(&str, i32)> = vec![("a", 1), ("b", 2), ("c", 3)]; + let mut writer = FlussMapWriter::new(src.len(), &DataTypes::string(), &DataTypes::int()); + writer.extend(src).unwrap(); + let map = writer.complete().unwrap(); + + assert_eq!(map.size(), 3); + assert_eq!(map.get(&Datum::from("b")).unwrap(), Some(Datum::from(2i32))); + } } diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs index 8b23423b..f4da6b3f 100644 --- a/crates/fluss/src/row/column.rs +++ b/crates/fluss/src/row/column.rs @@ -690,7 +690,17 @@ impl InternalRow for ColumnarRow { writer.complete() } - fn get_map(&self, pos: usize, key_type: &DataType, value_type: &DataType) -> Result { + fn get_map(&self, pos: usize) -> Result { + let expected_type = self.row_type.fields()[pos].data_type(); + let map_type = match expected_type { + DataType::Map(m) => m, + _ => { + return Err(IllegalArgument { + message: format!("expected Map type at position {pos}, got 
{expected_type:?}"), + }); + } + }; + let column = self.column(pos)?; let map_arr = column @@ -703,7 +713,11 @@ impl InternalRow for ColumnarRow { ), })?; - arrow_map_entry_to_fluss_map(&map_arr.value(self.row_id), key_type, value_type) + arrow_map_entry_to_fluss_map( + &map_arr.value(self.row_id), + map_type.key_type(), + map_type.value_type(), + ) } fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> { @@ -799,7 +813,7 @@ fn arrow_map_entry_to_fluss_map( write_arrow_values_to_fluss_array(&**values_arrow, value_type, &mut value_writer)?; let value_array = value_writer.complete()?; - FlussMap::from_arrays(&key_array, &value_array) + FlussMap::from_arrays(&key_array, &value_array, key_type, value_type) } /// Downcast to a primitive Arrow array type, then loop with null checks calling a writer method. @@ -1560,16 +1574,12 @@ mod tests { Arc::new(RecordBatch::try_new(schema, vec![Arc::new(map_arr)]).expect("record batch")); let map_type = DataTypes::map(DataTypes::int(), DataTypes::string()); - let row_type = Arc::new(RowType::with_data_types(vec![map_type.clone()])); + let row_type = Arc::new(RowType::with_data_types(vec![map_type])); let row = ColumnarRow::new(batch, row_type, 0, None); - let (k, v) = match &map_type { - crate::metadata::DataType::Map(m) => (m.key_type(), m.value_type()), - _ => unreachable!(), - }; let fluss_map = row - .get_map(0, k, v) - .expect("get_map should accept non-nullable key from MapType"); + .get_map(0) + .expect("get_map should succeed on ColumnarRow"); assert_eq!(fluss_map.size(), 1); assert_eq!(fluss_map.key_array().get_int(0).unwrap(), 1); assert_eq!(fluss_map.value_array().get_string(0).unwrap(), "a"); @@ -1628,9 +1638,7 @@ mod tests { .get_row(0) .expect("reading row with Map field must succeed"); assert_eq!(nested.get_int(0).unwrap(), 10); - let inner_map = nested - .get_map(1, &DataTypes::string(), &DataTypes::int()) - .expect("nested map should be accessible"); + let inner_map = nested.get_map(1).expect("nested map should 
be accessible"); assert_eq!(inner_map.size(), 1); assert_eq!(inner_map.key_array().get_string(0).unwrap(), "k1"); assert_eq!(inner_map.value_array().get_int(0).unwrap(), 42); @@ -1639,9 +1647,7 @@ mod tests { row.set_row_id(1); let nested = row.get_row(0).expect("row 1 must read"); assert_eq!(nested.get_int(0).unwrap(), 20); - let inner_map = nested - .get_map(1, &DataTypes::string(), &DataTypes::int()) - .unwrap(); + let inner_map = nested.get_map(1).unwrap(); assert_eq!(inner_map.key_array().get_string(0).unwrap(), "k2"); assert_eq!(inner_map.value_array().get_int(0).unwrap(), 7); } @@ -1726,9 +1732,7 @@ mod tests { )])); let row = ColumnarRow::new(batch, row_type, 0, None); - let err = row - .get_map(0, &DataTypes::string(), &DataTypes::string()) - .expect_err("type mismatch must error"); + let err = row.get_map(0).expect_err("type mismatch must error"); let msg = err.to_string(); assert!( msg.contains("does not match expected Fluss type"), diff --git a/crates/fluss/src/row/column_writer.rs b/crates/fluss/src/row/column_writer.rs index bbd28767..94777faf 100644 --- a/crates/fluss/src/row/column_writer.rs +++ b/crates/fluss/src/row/column_writer.rs @@ -22,6 +22,7 @@ use crate::error::Error::RowConvertError; use crate::error::{Error, Result}; use crate::metadata::{DataType, RowType}; +use crate::row::FlussMap; use crate::row::InternalRow; use crate::row::datum::{ MICROS_PER_MILLI, MILLIS_PER_SECOND, NANOS_PER_MILLI, append_decimal_to_builder, @@ -833,30 +834,52 @@ impl ColumnWriter { } => { let array = row.get_array(pos)?; let size = array.size(); - if let TypedWriter::Struct { - field_writers, - validity: child_validity, - row_type, - .. - } = &mut element_writer.inner - { - for i in 0..size { - if array.is_null_at(i) { - for child in field_writers.iter_mut() { - child.append_null(); + match &mut element_writer.inner { + TypedWriter::Struct { + field_writers, + validity: child_validity, + row_type, + .. 
+ } => { + for i in 0..size { + if array.is_null_at(i) { + for child in field_writers.iter_mut() { + child.append_null(); + } + child_validity.push(false); + } else { + let nested = array.get_row(i, row_type)?; + for (j, child) in field_writers.iter_mut().enumerate() { + child.write_field_at(&nested, j)?; + } + child_validity.push(true); } - child_validity.push(false); - } else { - let nested = array.get_row(i, row_type)?; - for (j, child) in field_writers.iter_mut().enumerate() { - child.write_field_at(&nested, j)?; + } + } + TypedWriter::Map { + key_writer, + value_writer, + key_type, + value_type, + offsets: child_offsets, + validity: child_validity, + } => { + for i in 0..size { + if array.is_null_at(i) { + child_validity.push(false); + let last = *child_offsets.last().unwrap(); + child_offsets.push(last); + } else { + let map = array.get_map(i, key_type, value_type)?; + write_map_into(map, key_writer, value_writer, child_offsets)?; + child_validity.push(true); } - child_validity.push(true); } } - } else { - for i in 0..size { - element_writer.write_field_at(&array, i)?; + _ => { + for i in 0..size { + element_writer.write_field_at(&array, i)?; + } } } let last = *offsets.last().unwrap(); @@ -871,24 +894,12 @@ impl ColumnWriter { TypedWriter::Map { key_writer, value_writer, - key_type, - value_type, offsets, validity, + .. 
} => { - let map = row.get_map(pos, key_type, value_type)?; - let key_array = map.key_array(); - let value_array = map.value_array(); - for i in 0..map.size() { - key_writer.write_field_at(key_array, i)?; - value_writer.write_field_at(value_array, i)?; - } - let last = *offsets.last().unwrap(); - offsets.push( - last + i32::try_from(map.size()).map_err(|_| RowConvertError { - message: format!("Map size {} exceeds i32 range", map.size()), - })?, - ); + let map = row.get_map(pos)?; + write_map_into(map, key_writer, value_writer, offsets)?; validity.push(true); Ok(()) } @@ -908,6 +919,27 @@ impl ColumnWriter { } } +fn write_map_into( + map: FlussMap, + key_writer: &mut ColumnWriter, + value_writer: &mut ColumnWriter, + offsets: &mut Vec, +) -> Result<()> { + let key_array = map.key_array(); + let value_array = map.value_array(); + for i in 0..map.size() { + key_writer.write_field_at(key_array, i)?; + value_writer.write_field_at(value_array, i)?; + } + let last = *offsets.last().unwrap(); + offsets.push( + last + i32::try_from(map.size()).map_err(|_| RowConvertError { + message: format!("Map size {} exceeds i32 range", map.size()), + })?, + ); + Ok(()) +} + fn finish_struct_array( fields: arrow_schema::Fields, child_arrays: Vec, diff --git a/crates/fluss/src/row/compacted/compacted_row.rs b/crates/fluss/src/row/compacted/compacted_row.rs index 2463e479..7f2b5c04 100644 --- a/crates/fluss/src/row/compacted/compacted_row.rs +++ b/crates/fluss/src/row/compacted/compacted_row.rs @@ -18,7 +18,7 @@ use crate::client::WriteFormat; use crate::error::Error::IllegalArgument; use crate::error::Result; -use crate::metadata::{DataType, RowType}; +use crate::metadata::RowType; use crate::row::binary_array::FlussArray; use crate::row::binary_map::FlussMap; use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer, CompactedRowReader}; @@ -174,8 +174,8 @@ impl<'a> InternalRow for CompactedRow<'a> { self.decoded_row()?.get_array(pos) } - fn get_map(&self, pos: usize, 
key_type: &DataType, value_type: &DataType) -> Result { - self.decoded_row()?.get_map(pos, key_type, value_type) + fn get_map(&self, pos: usize) -> Result { + self.decoded_row()?.get_map(pos) } fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> { @@ -508,9 +508,7 @@ mod tests { let bytes = writer.to_bytes(); let row = CompactedRow::from_bytes(&row_type, bytes.as_ref()); - let read_map = row - .get_map(0, &DataTypes::int(), &DataTypes::string()) - .unwrap(); + let read_map = row.get_map(0).unwrap(); assert_eq!(read_map.size(), 2); assert_eq!(read_map.key_array().get_int(0).unwrap(), 1); assert_eq!(read_map.value_array().get_string(0).unwrap(), "a"); @@ -549,9 +547,7 @@ mod tests { let row2 = CompactedRow::from_bytes(&row_type, bytes2.as_ref()); assert_eq!(row2.get_int(0).unwrap(), 99); assert!(!row2.is_null_at(1).unwrap()); - let read_map = row2 - .get_map(1, &DataTypes::int(), &DataTypes::string()) - .unwrap(); + let read_map = row2.get_map(1).unwrap(); assert_eq!(read_map.size(), 1); assert_eq!(read_map.key_array().get_int(0).unwrap(), 7); assert_eq!(read_map.value_array().get_string(0).unwrap(), "hello"); @@ -593,7 +589,7 @@ mod tests { let bytes = writer.to_bytes(); let row = CompactedRow::from_bytes(&row_type, bytes.as_ref()); - let read_map = row.get_map(0, &DataTypes::string(), &array_type).unwrap(); + let read_map = row.get_map(0).unwrap(); assert_eq!(read_map.size(), 2); assert_eq!(read_map.key_array().get_string(0).unwrap(), "a"); assert_eq!(read_map.key_array().get_string(1).unwrap(), "b"); @@ -622,9 +618,7 @@ mod tests { let bytes = writer.to_bytes(); let row = CompactedRow::from_bytes(&row_type, bytes.as_ref()); - let read_map = row - .get_map(0, &DataTypes::int(), &DataTypes::string()) - .unwrap(); + let read_map = row.get_map(0).unwrap(); assert_eq!(read_map.size(), 0); } } diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs index b1595b31..e6a67394 100644 --- a/crates/fluss/src/row/datum.rs +++ 
b/crates/fluss/src/row/datum.rs @@ -681,7 +681,7 @@ fn append_fluss_map_to_map_builder( Ok(()) } -fn read_datum_from_fluss_array<'a>( +pub(crate) fn read_datum_from_fluss_array<'a>( arr: &FlussArray, pos: usize, element_type: &crate::metadata::DataType, @@ -693,6 +693,15 @@ fn read_datum_from_fluss_array<'a>( )?))); } + // FlussArray has no attached schema; use the typed inherent accessor. + if let DataType::Map(map_type) = element_type { + return Ok(Datum::Map(arr.get_map( + pos, + map_type.key_type(), + map_type.value_type(), + )?)); + } + let getter = FieldGetter::create(element_type, pos); Ok(getter.get_field(arr)?.into_owned()) } diff --git a/crates/fluss/src/row/field_getter.rs b/crates/fluss/src/row/field_getter.rs index 41322f54..3c2c7ce1 100644 --- a/crates/fluss/src/row/field_getter.rs +++ b/crates/fluss/src/row/field_getter.rs @@ -196,11 +196,7 @@ impl InnerFieldGetter { Datum::TimestampLtz(row.get_timestamp_ltz(*pos, *precision)?) } InnerFieldGetter::Array { pos } => Datum::Array(row.get_array(*pos)?), - InnerFieldGetter::Map { - pos, - key_type, - value_type, - } => Datum::Map(row.get_map(*pos, key_type, value_type)?), + InnerFieldGetter::Map { pos, .. 
} => Datum::Map(row.get_map(*pos)?), InnerFieldGetter::Row { pos } => Datum::Row(Box::new(row.get_row(*pos)?.clone())), }) } diff --git a/crates/fluss/src/row/lookup_row.rs b/crates/fluss/src/row/lookup_row.rs index fd3db4fc..6271a7eb 100644 --- a/crates/fluss/src/row/lookup_row.rs +++ b/crates/fluss/src/row/lookup_row.rs @@ -21,7 +21,6 @@ use crate::client::WriteFormat; use crate::error::Result; -use crate::metadata::DataType; use crate::row::compacted::CompactedRow; use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz}; use crate::row::projected_row::ProjectedRow; @@ -117,8 +116,8 @@ impl<'a> InternalRow for LookupRow<'a> { fn get_array(&self, pos: usize) -> Result { delegate!(self, get_array, pos) } - fn get_map(&self, pos: usize, key_type: &DataType, value_type: &DataType) -> Result { - delegate!(self, get_map, pos, key_type, value_type) + fn get_map(&self, pos: usize) -> Result { + delegate!(self, get_map, pos) } fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> { delegate!(self, get_row, pos) diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs index 2456ee4d..1e045b2d 100644 --- a/crates/fluss/src/row/mod.rs +++ b/crates/fluss/src/row/mod.rs @@ -33,7 +33,6 @@ mod projected_row; mod row_decoder; use crate::client::WriteFormat; -use crate::metadata::DataType; pub use binary_array::{FlussArray, FlussArrayWriter}; pub use binary_map::{FlussMap, FlussMapWriter}; use bytes::Bytes; @@ -135,9 +134,9 @@ pub trait InternalRow: Send + Sync { fn get_array(&self, pos: usize) -> Result; /// Returns the map value at the given position - fn get_map(&self, pos: usize, key_type: &DataType, value_type: &DataType) -> Result; + fn get_map(&self, pos: usize) -> Result; - /// Returns the nested row value at the given position + /// Returns the nested row value at the given position fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> { Err(IllegalArgument { message: format!("get_row not supported at position {pos}"), @@ -309,12 +308,7 @@ 
impl<'a> InternalRow for GenericRow<'a> { } } - fn get_map( - &self, - pos: usize, - _key_type: &DataType, - _value_type: &DataType, - ) -> Result { + fn get_map(&self, pos: usize) -> Result { match self.get_value(pos)? { Datum::Map(m) => Ok(m.clone()), other => Err(IllegalArgument { diff --git a/crates/fluss/src/row/projected_row.rs b/crates/fluss/src/row/projected_row.rs index 0075f0b8..f08778cc 100644 --- a/crates/fluss/src/row/projected_row.rs +++ b/crates/fluss/src/row/projected_row.rs @@ -21,7 +21,6 @@ use crate::client::WriteFormat; use crate::error::Error::IllegalArgument; use crate::error::Result; -use crate::metadata::DataType; use crate::metadata::UNEXIST_MAPPING; use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz}; use crate::row::{Decimal, FlussArray, FlussMap, GenericRow, InternalRow}; @@ -143,8 +142,8 @@ impl InternalRow for ProjectedRow { project!(self, get_array, pos) } - fn get_map(&self, pos: usize, key_type: &DataType, value_type: &DataType) -> Result { - project!(self, get_map, pos, key_type, value_type) + fn get_map(&self, pos: usize) -> Result { + project!(self, get_map, pos) } fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> { diff --git a/crates/fluss/tests/integration/log_table.rs b/crates/fluss/tests/integration/log_table.rs index 11c5b3e9..fc8f8376 100644 --- a/crates/fluss/tests/integration/log_table.rs +++ b/crates/fluss/tests/integration/log_table.rs @@ -1102,18 +1102,8 @@ mod table_test { let found_row = records[0].row(); assert_eq!(found_row.get_int(0).unwrap(), 1); - // 4. Assert Map. Look the types up from `map_type` rather than reusing - // the locally-stashed `key_type`/`value_type`: `MapType::with_nullable` - // forces the stored key non-nullable, which exercises the same - // (non-nullable schema) vs (Arrow-derived nullable) comparison realistic - // callers hit. 
- let (mt_key, mt_value) = match &map_type { - fluss::metadata::DataType::Map(m) => (m.key_type(), m.value_type()), - _ => unreachable!("map_type is a MAP"), - }; - let decoded_map = found_row - .get_map(1, mt_key, mt_value) - .expect("Failed to get map"); + // 4. Assert Map + let decoded_map = found_row.get_map(1).expect("Failed to get map"); assert_eq!(decoded_map.size(), 3); let decoded_keys = decoded_map.key_array(); diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md index 7ef34ef3..52999e8c 100644 --- a/website/docs/user-guide/rust/api-reference.md +++ b/website/docs/user-guide/rust/api-reference.md @@ -466,7 +466,7 @@ Implements the `InternalRow` trait (see below). | `fn get_binary(&self, idx: usize, length: usize) -> Result<&[u8]>` | Get fixed-length binary value | | `fn get_char(&self, idx: usize, length: usize) -> Result<&str>` | Get fixed-length char value | | `fn get_array(&self, idx: usize) -> Result` | Get array value | -| `fn get_map(&self, idx: usize, key_type: &DataType, value_type: &DataType) -> Result` | Get map value | +| `fn get_map(&self, idx: usize) -> Result` | Get map value | ## `FlussArray` @@ -488,10 +488,25 @@ Element getters mirror `InternalRow` typed getters and return `Result`. 
For e |--------|-------------| | `fn size(&self) -> usize` | Number of entries in the map | | `fn as_bytes(&self) -> &[u8]` | Get encoded bytes of the map | -| `fn key_array(&self) -> &FlussArray` | Get the key array | -| `fn value_array(&self) -> &FlussArray` | Get the value array | +| `fn key_type(&self) -> &DataType` | Schema-declared type of keys | +| `fn value_type(&self) -> &DataType` | Schema-declared type of values | +| `fn entries(&self) -> Entries<'_>` | Iterator yielding `Result<(Datum, Datum)>` pairs | +| `fn get(&self, key: &Datum) -> Result<Option<Datum>>` | Linear-scan lookup by key (`O(n)`) | +| `fn key_array(&self) -> &FlussArray` | Parallel keys array (zero-copy view) | +| `fn value_array(&self) -> &FlussArray` | Parallel values array (zero-copy view) | -Key and value arrays are returned as `&FlussArray`, allowing you to read entries by retrieving keys and values at the same index positions. +Most user code should prefer `entries()` (iteration) and `get()` (lookup). The `key_array()` / `value_array()` views are for serdes and Arrow-adapter code that needs zero-copy access to the underlying parallel-array layout. + +## `FlussMapWriter` + +`FlussMapWriter` builds a `FlussMap` for write paths. 
+ +| Method | Description | +|--------|-------------| +| `fn new(capacity: usize, key_type: &DataType, value_type: &DataType) -> Self` | Create a writer sized for `capacity` entries | +| `fn write_entry(&mut self, key: Datum, value: Datum) -> Result<()>` | Append a single entry; rejects null keys and type mismatches | +| `fn extend<I, K, V>(&mut self, entries: I) -> Result<()>` | Append every pair from `entries: IntoIterator<Item = (K, V)>` | +| `fn complete(self) -> Result<FlussMap>` | Finalize the writer and produce the `FlussMap` | ## `ChangeType` diff --git a/website/docs/user-guide/rust/data-types.md b/website/docs/user-guide/rust/data-types.md index ad14028b..54188391 100644 --- a/website/docs/user-guide/rust/data-types.md +++ b/website/docs/user-guide/rust/data-types.md @@ -22,7 +22,7 @@ sidebar_position: 3 | `BYTES` | `&[u8]` | `get_bytes()` | `set_field(idx, &[u8])` | | `BINARY(n)` | `&[u8]` | `get_binary(idx, length)` | `set_field(idx, &[u8])` | | `ARRAY` | `FlussArray` | `get_array()` | `set_field(idx, FlussArray)` | -| `MAP` | `FlussMap` | `get_map(idx, key_type, value_type)` | `set_field(idx, FlussMap)` | +| `MAP` | `FlussMap` | `get_map(idx)` | `set_field(idx, FlussMap)` | ## Constructing Special Types @@ -86,9 +86,11 @@ row.set_field(0, Datum::Array(arr)); ## Maps -Use `DataTypes::map(key_type, value_type)` in schema definitions. At runtime, read maps with `row.get_map(idx, &key_type, &value_type)?`. +Use `DataTypes::map(key_type, value_type)` in schema definitions. At runtime, read maps with `row.get_map(idx)?` — the row knows its schema, so no extra type arguments are needed. -To construct map values for writes, build a `FlussMap` using `FlussMapWriter` and wrap it with `Datum::Map`: +### Writing + +Build a `FlussMap` entry-by-entry, then wrap it with `Datum::Map`: ```rust use fluss::metadata::DataTypes; @@ -104,7 +106,57 @@ let mut row = GenericRow::new(1); row.set_field(0, Datum::Map(map)); ``` -`MAP` keys cannot be null. `MAP` is supported for row values and nested row fields. 
Like arrays, `MAP` follows Java parity for key encoding and can be encoded by the compacted key encoder, while table-level key constraints are validated by the server. +For bulk writes from any iterator of `(key, value)` pairs (including a `HashMap`), use `extend`: + +```rust +use std::collections::HashMap; + +let entries: HashMap<&str, i32> = HashMap::from([("a", 1), ("b", 2)]); +let mut writer = FlussMapWriter::new(entries.len(), &DataTypes::string(), &DataTypes::int()); +writer.extend(entries)?; +let map = writer.complete()?; +``` + +### Reading + +The `entries()` iterator yields `(key, value)` pairs as schema-typed `Datum`s, folding the null check in: + +```rust +use fluss::row::InternalRow; + +let m = row.get_map(0)?; +for entry in m.entries() { + let (k, v) = entry?; + println!("{k:?} => {v:?}"); // Datum's Debug handles null +} +``` + +For point lookups, `get(&key)` does a linear scan and returns `Result<Option<Datum>>`: + +```rust +use fluss::row::Datum; + +if let Some(v) = m.get(&Datum::from("attr_size"))? { + println!("size = {v:?}"); +} +``` + +Lookup is `O(n)` — the binary MAP layout has no key index. If you need repeated lookups against the same map, collect the entries once: + +```rust +use std::collections::HashMap; + +let snapshot: HashMap<String, Datum<'_>> = m + .entries() + .map(|e| e.map(|(k, v)| (format!("{k:?}"), v))) + .collect::<Result<_>>()?; +``` + +For raw access to the underlying parallel-array representation (zero-copy, used by serdes / Arrow adapters), `m.key_array()` and `m.value_array()` are still available. + +### Constraints + +`MAP` keys cannot be null. `MAP` is supported for row values and nested row fields. `MAP` cannot be used as a primary key or bucket key column — the Rust client rejects it at the compacted key encoder, and the Fluss server bans `MAP` (along with `ARRAY` and `ROW`) from key columns. ## Reading Row Data