diff --git a/src/sqlite_provider.rs b/src/sqlite_provider.rs index 1c453ca..ea083b8 100644 --- a/src/sqlite_provider.rs +++ b/src/sqlite_provider.rs @@ -17,12 +17,15 @@ use std::any::Any; use std::fmt; use std::sync::{Arc, Mutex}; -use arrow_array::builder::{Int32Builder, Int64Builder, ListBuilder, StringBuilder}; +use arrow_array::builder::{ + GenericListBuilder, Int32Builder, Int64Builder, LargeStringBuilder, StringBuilder, + StringViewBuilder, +}; use arrow_array::{ - Array, ArrayRef, Float32Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, - UInt32Array, UInt64Array, + Array, ArrayRef, Float32Array, Float64Array, Int32Array, Int64Array, OffsetSizeTrait, + RecordBatch, StringArray, UInt32Array, UInt64Array, }; -use arrow_schema::{DataType, SchemaRef}; +use arrow_schema::{DataType, FieldRef, SchemaRef, TimeUnit}; use async_trait::async_trait; use datafusion::catalog::{Session, TableProvider}; use datafusion::common::Result as DFResult; @@ -210,6 +213,7 @@ impl SqliteLookupProvider { "pool_size must be at least 1".into(), )); } + validate_payload_schema(&schema)?; let conn = open_conn(db_path)?; let table_exists: bool = conn @@ -333,6 +337,7 @@ impl SqliteSidecarBuilder { value_col_indices.len() ))); } + validate_payload_schema(&schema)?; let (create_sql, insert_sql) = ddl(table_name, &schema); let conn = open_conn(db_path)?; // Manual BEGIN/COMMIT rather than a borrowed `Transaction` so the @@ -978,11 +983,105 @@ fn build_table( // ── Type conversion helpers ─────────────────────────────────────────────────── +/// Single source of truth for which Arrow types this provider can store in the +/// sidecar **and** reconstruct on read-back. Every type that returns `true` here +/// must have a matching arm in all of `arrow_cell_to_sql` (write), `arrow_type_to_sql` +/// (column affinity), and `sql_values_to_arrow` (read). 
`validate_payload_schema` +/// gates every build against this set so an unsupported column fails at index +/// build time — not later at query time, half a sidecar in. +fn payload_type_supported(dt: &DataType) -> bool { + match dt { + DataType::Boolean + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Float32 + | DataType::Float64 + | DataType::Utf8 + | DataType::LargeUtf8 + | DataType::Utf8View + | DataType::Binary + | DataType::LargeBinary + | DataType::Date32 + | DataType::Date64 + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Timestamp(_, _) => true, + DataType::List(item) | DataType::LargeList(item) => { + list_item_type_supported(item.data_type()) + } + _ => false, + } +} + +/// Item types reconstructable by `json_text_to_list`. Lists are serialized to +/// JSON TEXT, so only types with a JSON representation `serialize_list` writes +/// and `json_text_to_list` reads are supported. +fn list_item_type_supported(dt: &DataType) -> bool { + matches!( + dt, + DataType::Utf8 + | DataType::LargeUtf8 + | DataType::Utf8View + | DataType::Int32 + | DataType::Int64 + ) +} + +/// Reject a build whose payload columns include a type the read-back path cannot +/// reconstruct. Called from every build entry point so the failure surfaces at +/// index-creation time with the offending column named, rather than as a +/// `unsupported Arrow type` error on the first query against a sidecar that +/// "built successfully". +/// +/// Field 0 of the *output* schema is always the key column — both `finish()` and +/// `open_or_build` derive the key name from `schema.field(0)` — so it is skipped +/// here and validated separately by `extract_key`. (Note this is the output +/// schema's key position, distinct from `begin`'s `key_col_index`, which indexes +/// the input batch.) 
+fn validate_payload_schema(schema: &SchemaRef) -> DFResult<()> { + for (i, field) in schema.fields().iter().enumerate() { + if i == 0 { + continue; + } + if !payload_type_supported(field.data_type()) { + return Err(DataFusionError::Execution(format!( + "SqliteLookupProvider: payload column '{}' has unsupported type {:?}. \ + Supported types: Boolean, Int8/16/32/64, UInt8/16/32/64, Float32/64, \ + Utf8/LargeUtf8/Utf8View, Binary/LargeBinary, Date32/64, Time32/64, \ + Timestamp, and List/LargeList of {{Utf8, LargeUtf8, Utf8View, Int32, Int64}}.", + field.name(), + field.data_type() + ))); + } + } + Ok(()) +} + fn arrow_type_to_sql(dt: &DataType) -> &'static str { match dt { - DataType::UInt64 | DataType::UInt32 | DataType::Int32 | DataType::Int64 => "INTEGER", + DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::Boolean + | DataType::Date32 + | DataType::Date64 + | DataType::Time32(_) + | DataType::Time64(_) + | DataType::Timestamp(_, _) => "INTEGER", DataType::Float32 | DataType::Float64 => "REAL", - _ => "TEXT", // Utf8, LargeUtf8, List variants → TEXT (JSON for lists) + DataType::Binary | DataType::LargeBinary => "BLOB", + _ => "TEXT", // Utf8, LargeUtf8, Utf8View, List variants → TEXT (JSON for lists) } } @@ -1007,6 +1106,14 @@ fn arrow_cell_to_sql(col: &ArrayRef, row: usize) -> SqlValue { .value(row); SqlValue::Text(v.to_string()) } + DataType::Utf8View => { + let v = col + .as_any() + .downcast_ref::() + .unwrap() + .value(row); + SqlValue::Text(v.to_string()) + } DataType::Int32 => SqlValue::Integer( col.as_any() .downcast_ref::() @@ -1046,7 +1153,127 @@ fn arrow_cell_to_sql(col: &ArrayRef, row: usize) -> SqlValue { .unwrap() .value(row), ), + DataType::Boolean => SqlValue::Integer( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row) as i64, + ), + DataType::Date32 => SqlValue::Integer( + col.as_any() + .downcast_ref::() + 
.unwrap() + .value(row) as i64, + ), + DataType::Date64 => SqlValue::Integer( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row), + ), + // Timestamps are stored as their raw i64 tick count; the unit and time + // zone live in the schema and are restored on read-back. + DataType::Timestamp(unit, _) => { + let v = match unit { + TimeUnit::Second => col + .as_any() + .downcast_ref::() + .unwrap() + .value(row), + TimeUnit::Millisecond => col + .as_any() + .downcast_ref::() + .unwrap() + .value(row), + TimeUnit::Microsecond => col + .as_any() + .downcast_ref::() + .unwrap() + .value(row), + TimeUnit::Nanosecond => col + .as_any() + .downcast_ref::() + .unwrap() + .value(row), + }; + SqlValue::Integer(v) + } + DataType::Int8 => SqlValue::Integer( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row) as i64, + ), + DataType::Int16 => SqlValue::Integer( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row) as i64, + ), + DataType::UInt8 => SqlValue::Integer( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row) as i64, + ), + DataType::UInt16 => SqlValue::Integer( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row) as i64, + ), + // Time-of-day is stored as its raw tick count; the unit is restored from + // the schema on read-back. Time32 only has Second/Millisecond units and + // Time64 only Microsecond/Nanosecond — other combinations are invalid. 
+ DataType::Time32(unit) => { + let v = match unit { + TimeUnit::Second => col + .as_any() + .downcast_ref::() + .unwrap() + .value(row) as i64, + TimeUnit::Millisecond => col + .as_any() + .downcast_ref::() + .unwrap() + .value(row) as i64, + _ => return SqlValue::Null, + }; + SqlValue::Integer(v) + } + DataType::Time64(unit) => { + let v = match unit { + TimeUnit::Microsecond => col + .as_any() + .downcast_ref::() + .unwrap() + .value(row), + TimeUnit::Nanosecond => col + .as_any() + .downcast_ref::() + .unwrap() + .value(row), + _ => return SqlValue::Null, + }; + SqlValue::Integer(v) + } + DataType::Binary => SqlValue::Blob( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row) + .to_vec(), + ), + DataType::LargeBinary => SqlValue::Blob( + col.as_any() + .downcast_ref::() + .unwrap() + .value(row) + .to_vec(), + ), DataType::List(_) | DataType::LargeList(_) => SqlValue::Text(serialize_list(col, row)), + // Unreachable for any schema that passed `validate_payload_schema`; kept + // as a defensive fallback rather than a panic. _ => SqlValue::Null, } } @@ -1085,6 +1312,14 @@ fn serialize_list(col: &ArrayRef, row: usize) -> String { .value(i); JV::String(s.to_string()) } + DataType::Utf8View => { + let s = list_val + .as_any() + .downcast_ref::() + .unwrap() + .value(i); + JV::String(s.to_string()) + } DataType::Int64 => { let v = list_val .as_any() @@ -1109,6 +1344,45 @@ fn serialize_list(col: &ArrayRef, row: usize) -> String { serde_json::to_string(&items).unwrap_or_else(|_| "[]".to_string()) } +/// Reconstruct a `GenericListArray` (regular `List` when `O = i32`, +/// `LargeList` when `O = i64`) from the JSON TEXT that `serialize_list` writes. +/// The serialized form is identical for both offset widths, so the only +/// difference is the outer builder's offset type — hence the generic. +fn json_text_to_list( + item_field: &FieldRef, + values: &[SqlValue], +) -> DFResult { + macro_rules! 
build_list { + ($child:expr, $item:ty) => {{ + let mut b = + GenericListBuilder::::new($child).with_field(item_field.as_ref().clone()); + for v in values { + match v { + SqlValue::Text(s) => { + let items: Vec> = serde_json::from_str(s).unwrap_or_default(); + for item in items { + b.values().append_option(item); + } + b.append(true); + } + _ => b.append(false), + } + } + Arc::new(b.finish()) as ArrayRef + }}; + } + Ok(match item_field.data_type() { + DataType::Utf8 => build_list!(StringBuilder::new(), String), + DataType::LargeUtf8 => build_list!(LargeStringBuilder::new(), String), + DataType::Utf8View => build_list!(StringViewBuilder::new(), String), + DataType::Int64 => build_list!(Int64Builder::new(), i64), + DataType::Int32 => build_list!(Int32Builder::new(), i32), + inner => Err(DataFusionError::Execution(format!( + "SqliteLookupProvider: unsupported list item type {inner:?}" + )))?, + }) +} + fn sql_values_to_arrow(dt: &DataType, values: Vec) -> DFResult { Ok(match dt { DataType::UInt64 => { @@ -1161,65 +1435,175 @@ fn sql_values_to_arrow(dt: &DataType, values: Vec) -> DFResult match item_field.data_type() { - DataType::Utf8 | DataType::LargeUtf8 => { - let mut b = - ListBuilder::new(StringBuilder::new()).with_field(item_field.as_ref().clone()); - for v in &values { - match v { - SqlValue::Text(s) => { - let items: Vec> = - serde_json::from_str(s).unwrap_or_default(); - for item in items { - b.values().append_option(item); - } - b.append(true); - } - _ => b.append(false), - } + DataType::LargeUtf8 => { + let mut b = LargeStringBuilder::with_capacity(values.len(), values.len() * 32); + for v in &values { + match v { + SqlValue::Text(s) => b.append_value(s), + _ => b.append_null(), } - Arc::new(b.finish()) } - DataType::Int64 => { - let mut b = - ListBuilder::new(Int64Builder::new()).with_field(item_field.as_ref().clone()); - for v in &values { - match v { - SqlValue::Text(s) => { - let items: Vec> = - serde_json::from_str(s).unwrap_or_default(); - for item in 
items { - b.values().append_option(item); - } - b.append(true); - } - _ => b.append(false), - } + Arc::new(b.finish()) + } + DataType::Utf8View => { + let mut b = StringViewBuilder::with_capacity(values.len()); + for v in &values { + match v { + SqlValue::Text(s) => b.append_value(s), + _ => b.append_null(), } - Arc::new(b.finish()) } - DataType::Int32 => { - let mut b = - ListBuilder::new(Int32Builder::new()).with_field(item_field.as_ref().clone()); - for v in &values { - match v { - SqlValue::Text(s) => { - let items: Vec> = - serde_json::from_str(s).unwrap_or_default(); - for item in items { - b.values().append_option(item); - } - b.append(true); - } - _ => b.append(false), - } + Arc::new(b.finish()) + } + DataType::Boolean => { + let arr: arrow_array::BooleanArray = values + .iter() + .map(|v| match v { + SqlValue::Integer(i) => Some(*i != 0), + _ => None, + }) + .collect(); + Arc::new(arr) + } + DataType::Date32 => { + let arr: arrow_array::Date32Array = values + .iter() + .map(|v| match v { + SqlValue::Integer(i) => Some(*i as i32), + _ => None, + }) + .collect(); + Arc::new(arr) + } + DataType::Date64 => { + let arr: arrow_array::Date64Array = values + .iter() + .map(|v| match v { + SqlValue::Integer(i) => Some(*i), + _ => None, + }) + .collect(); + Arc::new(arr) + } + // Rebuild the unit-specific array, then restore the schema's time zone so + // the reconstructed column's DataType matches the original exactly. 
+ DataType::Timestamp(unit, tz) => { + let it = values.iter().map(|v| match v { + SqlValue::Integer(i) => Some(*i), + _ => None, + }); + match unit { + TimeUnit::Second => Arc::new( + arrow_array::TimestampSecondArray::from_iter(it).with_timezone_opt(tz.clone()), + ), + TimeUnit::Millisecond => Arc::new( + arrow_array::TimestampMillisecondArray::from_iter(it) + .with_timezone_opt(tz.clone()), + ), + TimeUnit::Microsecond => Arc::new( + arrow_array::TimestampMicrosecondArray::from_iter(it) + .with_timezone_opt(tz.clone()), + ), + TimeUnit::Nanosecond => Arc::new( + arrow_array::TimestampNanosecondArray::from_iter(it) + .with_timezone_opt(tz.clone()), + ), + } + } + DataType::Int8 => { + let arr: arrow_array::Int8Array = values + .iter() + .map(|v| match v { + SqlValue::Integer(i) => Some(*i as i8), + _ => None, + }) + .collect(); + Arc::new(arr) + } + DataType::Int16 => { + let arr: arrow_array::Int16Array = values + .iter() + .map(|v| match v { + SqlValue::Integer(i) => Some(*i as i16), + _ => None, + }) + .collect(); + Arc::new(arr) + } + DataType::UInt8 => { + let arr: arrow_array::UInt8Array = values + .iter() + .map(|v| match v { + SqlValue::Integer(i) => Some(*i as u8), + _ => None, + }) + .collect(); + Arc::new(arr) + } + DataType::UInt16 => { + let arr: arrow_array::UInt16Array = values + .iter() + .map(|v| match v { + SqlValue::Integer(i) => Some(*i as u16), + _ => None, + }) + .collect(); + Arc::new(arr) + } + // Time32 is i32-backed (Second/Millisecond); Time64 is i64-backed + // (Microsecond/Nanosecond). Other unit pairings are invalid Arrow. 
+ DataType::Time32(unit) => { + let it = values.iter().map(|v| match v { + SqlValue::Integer(i) => Some(*i as i32), + _ => None, + }); + match unit { + TimeUnit::Second => Arc::new(arrow_array::Time32SecondArray::from_iter(it)), + TimeUnit::Millisecond => { + Arc::new(arrow_array::Time32MillisecondArray::from_iter(it)) + } + other => Err(DataFusionError::Execution(format!( + "SqliteLookupProvider: invalid Time32 unit {other:?}" + )))?, + } + } + DataType::Time64(unit) => { + let it = values.iter().map(|v| match v { + SqlValue::Integer(i) => Some(*i), + _ => None, + }); + match unit { + TimeUnit::Microsecond => { + Arc::new(arrow_array::Time64MicrosecondArray::from_iter(it)) } - Arc::new(b.finish()) + TimeUnit::Nanosecond => Arc::new(arrow_array::Time64NanosecondArray::from_iter(it)), + other => Err(DataFusionError::Execution(format!( + "SqliteLookupProvider: invalid Time64 unit {other:?}" + )))?, } - inner => Err(DataFusionError::Execution(format!( - "SqliteLookupProvider: unsupported list item type {inner:?}" - )))?, - }, + } + DataType::Binary => { + let mut b = arrow_array::builder::BinaryBuilder::new(); + for v in &values { + match v { + SqlValue::Blob(bytes) => b.append_value(bytes), + _ => b.append_null(), + } + } + Arc::new(b.finish()) + } + DataType::LargeBinary => { + let mut b = arrow_array::builder::LargeBinaryBuilder::new(); + for v in &values { + match v { + SqlValue::Blob(bytes) => b.append_value(bytes), + _ => b.append_null(), + } + } + Arc::new(b.finish()) + } + DataType::List(item_field) => json_text_to_list::(item_field, &values)?, + DataType::LargeList(item_field) => json_text_to_list::(item_field, &values)?, DataType::Float64 => { let arr: Float64Array = values .iter() diff --git a/tests/sqlite_provider_test.rs b/tests/sqlite_provider_test.rs index ee85449..5480ee7 100644 --- a/tests/sqlite_provider_test.rs +++ b/tests/sqlite_provider_test.rs @@ -2,7 +2,9 @@ use std::sync::Arc; -use arrow_array::{Array, Int64Array, RecordBatch, StringArray, 
UInt64Array}; +use arrow_array::{ + Array, Int64Array, LargeStringArray, RecordBatch, StringArray, StringViewArray, UInt64Array, +}; use arrow_schema::{DataType, Field, Schema}; use datafusion::catalog::TableProvider; use datafusion::prelude::SessionContext; @@ -294,6 +296,293 @@ fn test_stream_builder_validation_errors() { assert!(b_text.push_batch(&text_batch).is_err()); } +/// Regression test for hotdata-dev/runtimedb#631: payload columns typed as +/// `LargeUtf8` (what DuckDB/parquet readers emit for strings) or `Utf8View` +/// (what a `::text` cast produces) must round-trip through the sidecar. The +/// write side already mapped both to TEXT, but the read-back path was missing +/// the reconstruction arms and failed with "unsupported Arrow type LargeUtf8". +#[tokio::test] +async fn test_string_view_and_large_utf8_roundtrip() { + let dir = tempdir().unwrap(); + + // key + a LargeUtf8 column + a Utf8View column. + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("large", DataType::LargeUtf8, true), + Field::new("view", DataType::Utf8View, true), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![10_i64, 20, 30])), + Arc::new(LargeStringArray::from(vec![ + Some("alpha"), + None, + Some("gamma"), + ])), + Arc::new(StringViewArray::from(vec![Some("one"), Some("two"), None])), + ], + ) + .unwrap(); + + let db_path = dir.path().join("strings.db"); + let mut builder = SqliteSidecarBuilder::begin( + db_path.to_str().unwrap(), + "models", + 2, + schema, + 0, // key (rowid) is column 0 + vec![1, 2], // columns 1 (large) and 2 (view) are stored values + ) + .unwrap(); + builder.push_batch(&batch).unwrap(); + let provider = builder.finish().unwrap(); + + let batches = provider + .fetch_by_keys(&[10, 30], "rowid", None) + .await + .unwrap(); + assert_eq!(batches.iter().map(|b| b.num_rows()).sum::(), 2); + + // The reconstructed columns must preserve their original Arrow 
types and values. + let mut large: Vec> = Vec::new(); + let mut view: Vec> = Vec::new(); + for b in &batches { + let l = b + .column_by_name("large") + .unwrap() + .as_any() + .downcast_ref::() + .expect("large column should reconstruct as LargeStringArray"); + let v = b + .column_by_name("view") + .unwrap() + .as_any() + .downcast_ref::() + .expect("view column should reconstruct as StringViewArray"); + for i in 0..b.num_rows() { + large.push(l.is_valid(i).then(|| l.value(i).to_string())); + view.push(v.is_valid(i).then(|| v.value(i).to_string())); + } + } + + assert!(large.contains(&Some("alpha".to_string()))); + assert!(large.contains(&Some("gamma".to_string()))); + assert!(view.contains(&Some("one".to_string()))); + // rowid 30 had a null view value, which must survive the round-trip. + assert!(view.contains(&None)); +} + +/// Companion to the scalar regression above: a `List` payload must +/// also round-trip. The write side serializes list elements to JSON TEXT, so a +/// missing `Utf8View` reconstruction arm would write real values then fail on +/// read-back with "unsupported list item type Utf8View". +#[tokio::test] +async fn test_list_utf8view_roundtrip() { + use arrow_array::ListArray; + use arrow_array::builder::{ListBuilder, StringViewBuilder}; + + let dir = tempdir().unwrap(); + + let item_field = Arc::new(Field::new("item", DataType::Utf8View, true)); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("tags", DataType::List(item_field.clone()), true), + ])); + + // Four rows exercising: a populated list with a null element, another + // populated list, a NULL list cell, and an empty list. 
+ let mut lb = ListBuilder::new(StringViewBuilder::new()).with_field(item_field); + lb.values().append_value("red"); + lb.values().append_null(); + lb.append(true); // rowid 1: ["red", null] + lb.values().append_value("blue"); + lb.values().append_value("green"); + lb.append(true); // rowid 2: ["blue", "green"] + lb.append(false); // rowid 3: NULL list + lb.append(true); // rowid 4: [] (empty list) + let tags = lb.finish(); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![1_i64, 2, 3, 4])), + Arc::new(tags), + ], + ) + .unwrap(); + + let db_path = dir.path().join("lists.db"); + let mut builder = + SqliteSidecarBuilder::begin(db_path.to_str().unwrap(), "models", 2, schema, 0, vec![1]) + .unwrap(); + builder.push_batch(&batch).unwrap(); + let provider = builder.finish().unwrap(); + + let batches = provider.fetch_by_keys(&[1], "rowid", None).await.unwrap(); + assert_eq!(batches.iter().map(|b| b.num_rows()).sum::(), 1); + + let list = batches[0] + .column_by_name("tags") + .unwrap() + .as_any() + .downcast_ref::() + .expect("tags should reconstruct as a List"); + let inner = list.value(0); + let strs = inner + .as_any() + .downcast_ref::() + .expect("list items should reconstruct as StringViewArray"); + assert_eq!(strs.len(), 2); + assert_eq!(strs.value(0), "red"); + assert!(strs.is_null(1)); + + // A NULL list cell survives as a null list entry; an empty list survives + // as a present-but-empty list (length 0), not as null. 
+ let nullable = provider + .fetch_by_keys(&[3, 4], "rowid", None) + .await + .unwrap(); + let mut null_lists = 0usize; + let mut empty_lists = 0usize; + for b in &nullable { + let rowid = b + .column_by_name("rowid") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + let list = b + .column_by_name("tags") + .unwrap() + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..b.num_rows() { + match rowid.value(i) { + 3 => { + assert!(list.is_null(i), "rowid 3 should be a NULL list"); + null_lists += 1; + } + 4 => { + assert!(list.is_valid(i), "rowid 4 should be a present empty list"); + assert_eq!(list.value(i).len(), 0, "rowid 4 should be empty"); + empty_lists += 1; + } + other => panic!("unexpected rowid {other}"), + } + } + } + assert_eq!(null_lists, 1); + assert_eq!(empty_lists, 1); +} + +/// A `List` payload must reconstruct with a `LargeStringArray` child. +/// Arrow's `ListBuilder::finish()` panics if the declared item field type does +/// not match the child builder, so `LargeUtf8` lists must not be routed through +/// a `Utf8` `StringBuilder`. 
+#[tokio::test] +async fn test_list_largeutf8_roundtrip() { + use arrow_array::ListArray; + use arrow_array::builder::{LargeStringBuilder, ListBuilder}; + + let dir = tempdir().unwrap(); + + let item_field = Arc::new(Field::new("item", DataType::LargeUtf8, true)); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("tags", DataType::List(item_field.clone()), true), + ])); + + let mut lb = ListBuilder::new(LargeStringBuilder::new()).with_field(item_field); + lb.values().append_value("red"); + lb.values().append_null(); + lb.append(true); + let tags = lb.finish(); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int64Array::from(vec![1_i64])), Arc::new(tags)], + ) + .unwrap(); + + let db_path = dir.path().join("large_lists.db"); + let mut builder = + SqliteSidecarBuilder::begin(db_path.to_str().unwrap(), "models", 2, schema, 0, vec![1]) + .unwrap(); + builder.push_batch(&batch).unwrap(); + let provider = builder.finish().unwrap(); + + let batches = provider.fetch_by_keys(&[1], "rowid", None).await.unwrap(); + let list = batches[0] + .column_by_name("tags") + .unwrap() + .as_any() + .downcast_ref::() + .expect("tags should reconstruct as a List"); + let inner = list.value(0); + let strs = inner + .as_any() + .downcast_ref::() + .expect("list items should reconstruct as LargeStringArray"); + assert_eq!(strs.len(), 2); + assert_eq!(strs.value(0), "red"); + assert!(strs.is_null(1)); +} + +/// A `LargeList` payload must round-trip. The write side serializes both +/// `List` and `LargeList` to JSON TEXT, but the read-back path only reconstructed +/// `List`, so a `LargeList` column failed with "unsupported Arrow type LargeList". 
+#[tokio::test] +async fn test_largelist_roundtrip() { + use arrow_array::LargeListArray; + use arrow_array::builder::{LargeListBuilder, StringBuilder}; + + let dir = tempdir().unwrap(); + + let item_field = Arc::new(Field::new("item", DataType::Utf8, true)); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("tags", DataType::LargeList(item_field.clone()), true), + ])); + + let mut lb = LargeListBuilder::new(StringBuilder::new()).with_field(item_field); + lb.values().append_value("red"); + lb.values().append_null(); + lb.append(true); + let tags = lb.finish(); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int64Array::from(vec![1_i64])), Arc::new(tags)], + ) + .unwrap(); + + let db_path = dir.path().join("largelist.db"); + let mut builder = + SqliteSidecarBuilder::begin(db_path.to_str().unwrap(), "models", 2, schema, 0, vec![1]) + .unwrap(); + builder.push_batch(&batch).unwrap(); + let provider = builder.finish().unwrap(); + + let batches = provider.fetch_by_keys(&[1], "rowid", None).await.unwrap(); + let list = batches[0] + .column_by_name("tags") + .unwrap() + .as_any() + .downcast_ref::() + .expect("tags should reconstruct as a LargeList"); + let inner = list.value(0); + let strs = inner + .as_any() + .downcast_ref::() + .expect("list items should reconstruct as StringArray"); + assert_eq!(strs.len(), 2); + assert_eq!(strs.value(0), "red"); + assert!(strs.is_null(1)); +} + #[tokio::test] async fn test_projection() { let dir = tempdir().unwrap(); @@ -492,3 +781,365 @@ async fn test_custom_key_column_name() { .unwrap(); assert_eq!(key_col.value(0), 1); } + +/// The defect class behind hotdata-dev/runtimedb#631 was that the write side +/// accepted any Arrow type (silent TEXT/NULL fallback) while the read side only +/// reconstructed a subset — so an unsupported payload column built a "successful" +/// sidecar that then exploded on the first query. 
This asserts the failure now +/// surfaces at *build* time (`begin`) with the offending column named, which is +/// where runtimedb's index-create step will catch it. +#[tokio::test] +async fn test_unsupported_payload_type_rejected_at_build() { + let dir = tempdir().unwrap(); + + // Decimal128 is a common parquet type the read-back path cannot reconstruct. + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("price", DataType::Decimal128(10, 2), true), + ])); + + let db_path = dir.path().join("bad.db"); + let err = + SqliteSidecarBuilder::begin(db_path.to_str().unwrap(), "models", 2, schema, 0, vec![1]) + .map(|_| ()) + .expect_err("a Decimal128 payload column must be rejected at build time"); + + let msg = err.to_string(); + assert!( + msg.contains("price") && msg.contains("unsupported type"), + "error should name the offending column and type, got: {msg}" + ); +} + +/// Round-trips the basic scalar types added alongside the validation gate +/// (Boolean, Date32, and a timezone-carrying Timestamp). These are exactly the +/// kinds of columns DuckDB/parquet emit that previously fell through to the +/// "unsupported Arrow type" error on read-back. 
+#[tokio::test] +async fn test_basic_scalar_types_roundtrip() { + use arrow_array::{BooleanArray, Date32Array, TimestampMicrosecondArray}; + use arrow_schema::TimeUnit; + + let dir = tempdir().unwrap(); + + let ts_type = DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())); + let schema = Arc::new(Schema::new(vec![ + Field::new("rowid", DataType::Int64, false), + Field::new("flag", DataType::Boolean, true), + Field::new("day", DataType::Date32, true), + Field::new("ts", ts_type.clone(), true), + ])); + + let ts_array = + TimestampMicrosecondArray::from(vec![Some(1_000_000_i64), None, Some(3_000_000)]) + .with_timezone_opt(Some("UTC".to_string())); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int64Array::from(vec![1_i64, 2, 3])), + Arc::new(BooleanArray::from(vec![Some(true), Some(false), None])), + Arc::new(Date32Array::from(vec![ + Some(19_000_i32), + None, + Some(19_002), + ])), + Arc::new(ts_array), + ], + ) + .unwrap(); + + let db_path = dir.path().join("basics.db"); + let mut builder = SqliteSidecarBuilder::begin( + db_path.to_str().unwrap(), + "models", + 2, + schema, + 0, + vec![1, 2, 3], + ) + .unwrap(); + builder.push_batch(&batch).unwrap(); + let provider = builder.finish().unwrap(); + + let batches = provider + .fetch_by_keys(&[1, 3], "rowid", None) + .await + .unwrap(); + assert_eq!(batches.iter().map(|b| b.num_rows()).sum::(), 2); + + // The timestamp column must reconstruct with its original unit *and* time + // zone, otherwise the column's DataType won't match the declared schema. + for b in &batches { + assert_eq!(b.column_by_name("ts").unwrap().data_type(), &ts_type); + b.column_by_name("flag") + .unwrap() + .as_any() + .downcast_ref::() + .expect("flag should reconstruct as BooleanArray"); + b.column_by_name("day") + .unwrap() + .as_any() + .downcast_ref::() + .expect("day should reconstruct as Date32Array"); + } + + // Spot-check rowid 1's values survived the round-trip. 
+    // NOTE(review): the turbofish type arguments in this tail were lost in
+    // extraction. `BooleanArray` is fixed by the comparison with `true` below;
+    // `Int64Array` (rowid) and `Date32Array` (day, 19_000 days since epoch) are
+    // inferred — confirm against the schema declared earlier in this test.
+    let rowid_one = batches
+        .iter()
+        .flat_map(|b| {
+            let rid = b
+                .column_by_name("rowid")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<Int64Array>()
+                .unwrap();
+            let flag = b
+                .column_by_name("flag")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<BooleanArray>()
+                .unwrap();
+            let day = b
+                .column_by_name("day")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<Date32Array>()
+                .unwrap();
+            (0..b.num_rows())
+                .filter(|&i| rid.value(i) == 1)
+                .map(|i| (flag.value(i), day.value(i)))
+                .collect::<Vec<_>>()
+        })
+        .next()
+        .expect("rowid 1 should be present");
+    assert_eq!(rowid_one, (true, 19_000));
+}
+
+/// Round-trips the remaining "basic" scalar types added in the same pass:
+/// the small integers (Int8/16, UInt8/16), time-of-day (Time32/Time64), and
+/// binary (Binary/LargeBinary). Each column must reconstruct with its original
+/// Arrow type and values, including nulls.
+///
+/// The fixture places exactly one null in every nullable column (rows 2 and 3
+/// between them), so the null checks at the end cover all eight types rather
+/// than a sample of them.
+#[tokio::test]
+async fn test_all_basic_types_roundtrip() {
+    use arrow_array::{
+        BinaryArray, Int8Array, Int16Array, LargeBinaryArray, Time32MillisecondArray,
+        Time64NanosecondArray, UInt8Array, UInt16Array,
+    };
+    use arrow_schema::TimeUnit;
+
+    let dir = tempdir().unwrap();
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("rowid", DataType::Int64, false),
+        Field::new("i8", DataType::Int8, true),
+        Field::new("i16", DataType::Int16, true),
+        Field::new("u8", DataType::UInt8, true),
+        Field::new("u16", DataType::UInt16, true),
+        Field::new("t32", DataType::Time32(TimeUnit::Millisecond), true),
+        Field::new("t64", DataType::Time64(TimeUnit::Nanosecond), true),
+        Field::new("bin", DataType::Binary, true),
+        Field::new("lbin", DataType::LargeBinary, true),
+    ]));
+
+    // Three rows; the `None`s are staggered so that row 1 is fully populated,
+    // row 2 is null in i8/u8/t32/bin and row 3 is null in i16/u16/t64/lbin.
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![
+            Arc::new(Int64Array::from(vec![1_i64, 2, 3])),
+            Arc::new(Int8Array::from(vec![Some(-8_i8), None, Some(127)])),
+            Arc::new(Int16Array::from(vec![Some(-16_i16), Some(300), None])),
+            Arc::new(UInt8Array::from(vec![Some(8_u8), None, Some(255)])),
+            Arc::new(UInt16Array::from(vec![Some(16_u16), Some(65535), None])),
+            Arc::new(Time32MillisecondArray::from(vec![
+                Some(1_000_i32),
+                None,
+                Some(86_399_000),
+            ])),
+            Arc::new(Time64NanosecondArray::from(vec![
+                Some(1_i64),
+                Some(2),
+                None,
+            ])),
+            Arc::new(BinaryArray::from_opt_vec(vec![
+                Some(b"\x00\x01".as_ref()),
+                None,
+                Some(b"\xff".as_ref()),
+            ])),
+            Arc::new(LargeBinaryArray::from_opt_vec(vec![
+                Some(b"abc".as_ref()),
+                Some(b"".as_ref()),
+                None,
+            ])),
+        ],
+    )
+    .unwrap();
+
+    let db_path = dir.path().join("allbasic.db");
+    let mut builder = SqliteSidecarBuilder::begin(
+        db_path.to_str().unwrap(),
+        "models",
+        2,
+        schema,
+        0,
+        vec![1, 2, 3, 4, 5, 6, 7, 8],
+    )
+    .unwrap();
+    builder.push_batch(&batch).unwrap();
+    let provider = builder.finish().unwrap();
+
+    // Fetch rows 1 and 3 and collect them keyed by rowid so assertions don't
+    // depend on row/batch ordering.
+    let batches = provider
+        .fetch_by_keys(&[1, 3], "rowid", None)
+        .await
+        .unwrap();
+    assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
+
+    // Every column must reconstruct with its declared Arrow type.
+    for b in &batches {
+        for f in b.schema().fields() {
+            assert_eq!(
+                b.column_by_name(f.name()).unwrap().data_type(),
+                f.data_type(),
+                "column {} reconstructed with wrong type",
+                f.name()
+            );
+        }
+    }
+
+    // Pull row 1's values out and check them concretely.
+    let (i8v, i16v, u8v, u16v, t32v, t64v, binv, lbinv) = batches
+        .iter()
+        .flat_map(|b| {
+            let rid = b
+                .column_by_name("rowid")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<Int64Array>()
+                .unwrap();
+            let i8a = b
+                .column_by_name("i8")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<Int8Array>()
+                .expect("i8 should reconstruct as Int8Array");
+            let i16a = b
+                .column_by_name("i16")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<Int16Array>()
+                .unwrap();
+            let u8a = b
+                .column_by_name("u8")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<UInt8Array>()
+                .unwrap();
+            let u16a = b
+                .column_by_name("u16")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<UInt16Array>()
+                .unwrap();
+            let t32a = b
+                .column_by_name("t32")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<Time32MillisecondArray>()
+                .expect("t32 should reconstruct as Time32MillisecondArray");
+            let t64a = b
+                .column_by_name("t64")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<Time64NanosecondArray>()
+                .unwrap();
+            let bina = b
+                .column_by_name("bin")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<BinaryArray>()
+                .expect("bin should reconstruct as BinaryArray");
+            let lbina = b
+                .column_by_name("lbin")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<LargeBinaryArray>()
+                .unwrap();
+            (0..b.num_rows())
+                .filter(|&i| rid.value(i) == 1)
+                .map(|i| {
+                    (
+                        i8a.value(i),
+                        i16a.value(i),
+                        u8a.value(i),
+                        u16a.value(i),
+                        t32a.value(i),
+                        t64a.value(i),
+                        bina.value(i).to_vec(),
+                        lbina.value(i).to_vec(),
+                    )
+                })
+                .collect::<Vec<_>>()
+        })
+        .next()
+        .expect("rowid 1 should be present");
+
+    assert_eq!(i8v, -8);
+    assert_eq!(i16v, -16);
+    assert_eq!(u8v, 8);
+    assert_eq!(u16v, 16);
+    assert_eq!(t32v, 1_000);
+    assert_eq!(t64v, 1);
+    assert_eq!(binv, vec![0x00, 0x01]);
+    assert_eq!(lbinv, b"abc".to_vec());
+
+    // Null cells must survive as nulls. Each nullable column is null in exactly
+    // one of rows 2 and 3, so checking both rows covers every type:
+    //   row 2: i8, u8, t32, bin
+    //   row 3: i16, u16, t64, lbin
+    for (key, null_cols) in [
+        (2, ["i8", "u8", "t32", "bin"]),
+        (3, ["i16", "u16", "t64", "lbin"]),
+    ] {
+        let fetched = provider
+            .fetch_by_keys(&[key], "rowid", None)
+            .await
+            .unwrap();
+        // Don't assume the row lands in `fetched[0]`: take the first batch that
+        // actually holds a row, and fail with a clear message if none does.
+        let b = fetched
+            .iter()
+            .find(|b| b.num_rows() > 0)
+            .unwrap_or_else(|| panic!("rowid {key} should be present"));
+        for name in null_cols {
+            assert!(
+                b.column_by_name(name).unwrap().is_null(0),
+                "row {key} {name} should be null"
+            );
+        }
+    }
+}
+
+/// The output schema's key is always field 0, but `begin`'s `key_col_index`
+/// indexes the *input batch* and may be non-zero (e.g. a storage engine whose
+/// key is not the first batch column). Validation must still key off output
+/// field 0 — otherwise a non-zero `key_col_index` would skip a real payload
+/// column (leaving its unsupported type to fail at query time) and validate the
+/// key column instead. This locks that in.
+#[tokio::test]
+async fn test_unsupported_payload_rejected_with_nonzero_key_index() {
+    let dir = tempdir().unwrap();
+
+    // Output schema: key at field 0, an unsupported Decimal payload at field 1.
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("rowid", DataType::Int64, false),
+        Field::new("price", DataType::Decimal128(10, 2), true),
+    ]));
+
+    // key_col_index = 1 (key is the second column of the input batch). A bug that
+    // skipped output field `key_col_index` would skip "price" and wrongly pass.
+    let db_path = dir.path().join("nonzero_key.db");
+    let err =
+        SqliteSidecarBuilder::begin(db_path.to_str().unwrap(), "models", 2, schema, 1, vec![0])
+            .map(|_| ())
+            .expect_err("unsupported payload must be rejected regardless of key_col_index");
+
+    let msg = err.to_string();
+    assert!(
+        msg.contains("price") && msg.contains("unsupported type"),
+        "error should name the payload column, got: {msg}"
+    );
+}